From e9ffd7aa1c65708fa48f949cb04105f7371645fa Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Wed, 9 Oct 2019 10:12:33 +0200 Subject: [PATCH 1/3] Also handle/resolve orphan TXs when parents appear in a block --- src/net.h | 2 + src/net_processing.cpp | 149 +++++++++++++++++----------- test/functional/invalidtxrequest.py | 36 ++++++- 3 files changed, 124 insertions(+), 63 deletions(-) diff --git a/src/net.h b/src/net.h index 2032a3aefe..5b2cbd658d 100755 --- a/src/net.h +++ b/src/net.h @@ -884,6 +884,8 @@ class CNode // If true, we will send him all quorum related messages, even if he is not a member of our quorums std::atomic qwatch{false}; + std::set orphan_work_set; + CNode(NodeId id, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress &addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress &addrBindIn, const std::string &addrNameIn = "", bool fInboundIn = false); ~CNode(); diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 456e33365b..6cca6d6c3a 100755 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -774,6 +774,8 @@ unsigned int LimitOrphanTxSize(unsigned int nMaxOrphansSize) return nEvicted; } +void static ProcessOrphanTx(CConnman* connman, std::set& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans); + // Requires cs_main. void Misbehaving(NodeId pnode, int howmuch) { @@ -843,13 +845,23 @@ PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, CScheduler &schedu } void PeerLogicValidation::BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindex, const std::vector& vtxConflicted) { - LOCK(g_cs_orphans); + LOCK2(cs_main, g_cs_orphans); std::vector vOrphanErase; + std::set orphanWorkSet; for (const CTransactionRef& ptx : pblock->vtx) { const CTransaction& tx = *ptx; + // Which orphan pool entries we should reprocess and potentially try to accept into mempool again? + for (size_t i = 0; i < tx.vin.size(); i++) { + auto itByPrev = mapOrphanTransactionsByPrev.find(COutPoint(tx.GetHash(), (uint32_t)i)); + if (itByPrev == mapOrphanTransactionsByPrev.end()) continue; + for (const auto& elem : itByPrev->second) { + orphanWorkSet.insert(elem->first); + } + } + // Which orphan pool entries must we evict? for (const auto& txin : tx.vin) { auto itByPrev = mapOrphanTransactionsByPrev.find(txin.prevout); @@ -871,6 +883,11 @@ void PeerLogicValidation::BlockConnected(const std::shared_ptr& pb LogPrint(BCLog::MEMPOOL, "Erased %d orphan tx included or conflicted by block\n", nErased); } + while (!orphanWorkSet.empty()) { + LogPrint(BCLog::MEMPOOL, "Trying to process %d orphans\n", orphanWorkSet.size()); + ProcessOrphanTx(g_connman.get(), orphanWorkSet); + } + g_last_tip_update = GetTime(); } @@ -1649,6 +1666,64 @@ bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::ve return true; } +void static ProcessOrphanTx(CConnman* connman, std::set& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans) +{ + AssertLockHeld(cs_main); + AssertLockHeld(g_cs_orphans); + std::set setMisbehaving; + bool done = false; + while (!done && !orphan_work_set.empty()) { + const uint256 orphanHash = *orphan_work_set.begin(); + orphan_work_set.erase(orphan_work_set.begin()); + + auto orphan_it = mapOrphanTransactions.find(orphanHash); + if (orphan_it == mapOrphanTransactions.end()) continue; + + const CTransactionRef porphanTx = orphan_it->second.tx; + const CTransaction& orphanTx = *porphanTx; + NodeId fromPeer = orphan_it->second.fromPeer; + bool fMissingInputs2 = false; + // Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan + // resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get + // anyone relaying LegitTxX banned) + CValidationState stateDummy; + + if (setMisbehaving.count(fromPeer)) continue; + if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, true, &fMissingInputs2)) { + LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString()); + connman->RelayTransaction(orphanTx); + for (unsigned int i = 0; i < orphanTx.vout.size(); i++) { + auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(orphanHash, i)); + if (it_by_prev != mapOrphanTransactionsByPrev.end()) { + for (const auto& elem : it_by_prev->second) { + orphan_work_set.insert(elem->first); + } + } + } + EraseOrphanTx(orphanHash); + done = true; + } else if (!fMissingInputs2) { + int nDos = 0; + if (stateDummy.IsInvalid(nDos) && nDos > 0) { + // Punish peer that gave us an invalid orphan tx + Misbehaving(fromPeer, nDos); + setMisbehaving.insert(fromPeer); + LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString()); + } + // Has inputs but not accepted to mempool + // Probably non-standard or insufficient fee + LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString()); + if (!stateDummy.CorruptionPossible()) { + assert(recentRejects); + recentRejects->insert(orphanHash); + } + EraseOrphanTx(orphanHash); + done = true; + } + mempool.check(pcoinsTip); + } +} + bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, const std::atomic& interruptMsgProc) { LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->GetId()); @@ -2341,8 +2416,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr return true; } - std::deque vWorkQueue; - std::vector vEraseQueue; CTransactionRef ptx; CPrivateSendBroadcastTx dstx; int nInvType = MSG_TX; @@ -2418,7 +2491,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr mempool.check(pcoinsTip); connman->RelayTransaction(tx); for (unsigned int i = 0; i < tx.vout.size(); i++) { - vWorkQueue.emplace_back(inv.hash, i); + auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(inv.hash, i)); + if (it_by_prev != mapOrphanTransactionsByPrev.end()) { + for (const auto& elem : it_by_prev->second) { + pfrom->orphan_work_set.insert(elem->first); + } + } } pfrom->nLastTXTime = GetTime(); @@ -2429,62 +2507,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr mempool.size(), mempool.DynamicMemoryUsage() / 1000); // Recursively process any orphan transactions that depended on this one - std::set setMisbehaving; - while (!vWorkQueue.empty()) { - auto itByPrev = mapOrphanTransactionsByPrev.find(vWorkQueue.front()); - vWorkQueue.pop_front(); - if (itByPrev == mapOrphanTransactionsByPrev.end()) - continue; - for (auto mi = itByPrev->second.begin(); - mi != itByPrev->second.end(); - ++mi) - { - const CTransactionRef& porphanTx = (*mi)->second.tx; - const CTransaction& orphanTx = *porphanTx; - const uint256& orphanHash = orphanTx.GetHash(); - NodeId fromPeer = (*mi)->second.fromPeer; - bool fMissingInputs2 = false; - // Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan - // resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get - // anyone relaying LegitTxX banned) - CValidationState stateDummy; - - - if (setMisbehaving.count(fromPeer)) - continue; - if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, true, &fMissingInputs2)) { - LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString()); - connman->RelayTransaction(orphanTx); - for (unsigned int i = 0; i < orphanTx.vout.size(); i++) { - vWorkQueue.emplace_back(orphanHash, i); - } - vEraseQueue.push_back(orphanHash); - } - else if (!fMissingInputs2) - { - int nDos = 0; - if (stateDummy.IsInvalid(nDos) && nDos > 0) - { - // Punish peer that gave us an invalid orphan tx - Misbehaving(fromPeer, nDos); - setMisbehaving.insert(fromPeer); - LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString()); - } - // Has inputs but not accepted to mempool - // Probably non-standard or insufficient fee - LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString()); - vEraseQueue.push_back(orphanHash); - if (!stateDummy.CorruptionPossible()) { - assert(recentRejects); - recentRejects->insert(orphanHash); - } - } - mempool.check(pcoinsTip); - } - } - - for (uint256 hash : vEraseQueue) - EraseOrphanTx(hash); + ProcessOrphanTx(connman, pfrom->orphan_work_set); } else if (fMissingInputs) { @@ -3203,11 +3226,17 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic& inter if (!pfrom->vRecvGetData.empty()) ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); + if (!pfrom->orphan_work_set.empty()) { + LOCK2(cs_main, g_cs_orphans); + ProcessOrphanTx(connman, pfrom->orphan_work_set); + } + if (pfrom->fDisconnect) return false; // this maintains the order of responses if (!pfrom->vRecvGetData.empty()) return true; + if (!pfrom->orphan_work_set.empty()) return true; // Don't bother if send buffer is too full to respond anyway if (pfrom->fPauseSend) diff --git a/test/functional/invalidtxrequest.py b/test/functional/invalidtxrequest.py index 8d0e4f10e6..5792af2abe 100755 --- a/test/functional/invalidtxrequest.py +++ b/test/functional/invalidtxrequest.py @@ -62,7 +62,16 @@ def run_test(self): # Save the coinbase for later block1 = block tip = block.sha256 - node.p2p.send_blocks_and_test([block], node, success=True) + + # Create a second one to test orphan resolution via block receival + height += 1 + block_time += 1 + block = create_block(tip, create_coinbase(height), block_time) + block.solve() + # Save the coinbase for later + block2 = block + tip = block.sha256 + node.p2p.send_blocks_and_test([block1, block2], node, success=True) self.log.info("Mature the block.") self.nodes[0].generate(100) @@ -80,10 +89,18 @@ def run_test(self): self.reconnect_p2p(num_connections=2) self.log.info('Test orphan transaction handling ... ') + self.test_orphan_tx_handling(block1.vtx[0].sha256, False) + node.generate(1) # Clear mempool + self.reconnect_p2p(num_connections=2) + self.test_orphan_tx_handling(block2.vtx[0].sha256, True) + + def test_orphan_tx_handling(self, base_tx, resolve_via_block): + node = self.nodes[0] # convenience reference to the node + # Create a root transaction that we withold until all dependend transactions # are sent out and in the orphan cache tx_withhold = CTransaction() - tx_withhold.vin.append(CTxIn(outpoint=COutPoint(block1.vtx[0].sha256, 0))) + tx_withhold.vin.append(CTxIn(outpoint=COutPoint(base_tx, 0))) tx_withhold.vout.append(CTxOut(nValue=50 * COIN - 12000, scriptPubKey=b'\x51')) tx_withhold.calc_sha256() @@ -119,7 +136,17 @@ def run_test(self): assert_equal(2, len(node.getpeerinfo())) # p2ps[1] is still connected self.log.info('Send the withhold tx ... ') - node.p2p.send_txs_and_test([tx_withhold], node, success=True) + if resolve_via_block: + # Test orphan handling/resolution by publishing the withhold TX via a mined block + prev_block = node.getblockheader(node.getbestblockhash()) + block = create_block(int(prev_block['hash'], 16), create_coinbase(prev_block['height'] + 1), prev_block["time"] + 1) + block.vtx.append(tx_withhold) + block.hashMerkleRoot = block.calc_merkle_root() + block.solve() + node.p2p.send_blocks_and_test([block], node, success=True) + else: + # Test orphan handling/resolution by publishing the withhold TX via the mempool + node.p2p.send_txs_and_test([tx_withhold], node, success=True) # Transactions that should end up in the mempool expected_mempool = { @@ -133,6 +160,9 @@ def run_test(self): # Transactions that do not end up in the mempool # tx_orphan_no_fee, because it has too low fee (p2ps[0] is not disconnected for relaying that tx) # tx_orphan_invaid, because it has negative fee (p2ps[1] is disconnected for relaying that tx) + if resolve_via_block: + # This TX has appeared in a block instead of being broadcasted via the mempool + expected_mempool.remove(tx_withhold.hash) wait_until(lambda: 1 == len(node.getpeerinfo()), timeout=12) # p2ps[1] is no longer connected assert_equal(expected_mempool, set(node.getrawmempool())) From b8d7966f9a6688f4fdbadfb0a8d2557c9dec372c Mon Sep 17 00:00:00 2001 From: -k Date: Thu, 19 Dec 2019 08:46:40 -0800 Subject: [PATCH 2/3] Fixes and refactorings related to using mnsync in tests (#3136) * Drop `get_mnsync_status`, `wait_to_sync` and `sync_masternodes` and introduce `force_finish_mnsync` for MNs only * Use `force_finish_mnsync` from util.py in dip3-deterministicmns.py and drop local unused functions Also move the call, `force_finish_mnsync` should be called before `connect_nodes_bi` --- test/functional/dip3-deterministicmns.py | 21 +-------------- test/functional/invalidblockrequest.py | 3 +-- test/functional/p2p-fullblocktest.py | 1 - test/functional/sporks.py | 3 --- .../test_framework/test_framework.py | 8 ++---- test/functional/test_framework/util.py | 26 +++++++------------ 6 files changed, 13 insertions(+), 49 deletions(-) diff --git a/test/functional/dip3-deterministicmns.py b/test/functional/dip3-deterministicmns.py index b3d20b9e78..5b36d0c313 100755 --- a/test/functional/dip3-deterministicmns.py +++ b/test/functional/dip3-deterministicmns.py @@ -243,12 +243,12 @@ def start_mn(self, mn): self.add_nodes(1) extra_args = ['-masternode=1', '-masternodeblsprivkey=%s' % mn.blsMnkey] self.start_node(mn.idx, extra_args = self.extra_args + extra_args) + force_finish_mnsync(self.nodes[mn.idx]) for i in range(0, len(self.nodes)): if i < len(self.nodes) and self.nodes[i] is not None and self.nodes[i].process is not None and i != mn.idx: connect_nodes_bi(self.nodes, mn.idx, i) mn.node = self.nodes[mn.idx] self.sync_all() - self.force_finish_mnsync(mn.node) def spend_mn_collateral(self, mn, with_dummy_input_output=False): return self.spend_input(mn.collateral_txid, mn.collateral_vout, 1000, with_dummy_input_output) @@ -276,25 +276,6 @@ def test_protx_update_service(self, mn): self.nodes[0].protx('update_service', mn.protx_hash, '127.0.0.1:%d' % mn.p2p_port, mn.blsMnkey, "", mn.fundsAddr) self.nodes[0].generate(1) - def force_finish_mnsync(self, node): - while True: - s = node.mnsync('next') - if s == 'sync updated to MASTERNODE_SYNC_FINISHED': - break - time.sleep(0.1) - - def force_finish_mnsync_list(self, node): - if node.mnsync('status')['AssetName'] == 'MASTERNODE_SYNC_WAITING': - node.mnsync('next') - - while True: - mnlist = node.masternode('list', 'status') - if len(mnlist) != 0: - time.sleep(0.5) - self.force_finish_mnsync(node) - return - time.sleep(0.1) - def assert_mnlists(self, mns): for node in self.nodes: self.assert_mnlist(node, mns) diff --git a/test/functional/invalidblockrequest.py b/test/functional/invalidblockrequest.py index 6d943dc8ac..9ff016a513 100755 --- a/test/functional/invalidblockrequest.py +++ b/test/functional/invalidblockrequest.py @@ -15,7 +15,7 @@ from test_framework.blocktools import create_block, create_coinbase, create_transaction, network_thread_start from test_framework.mininode import P2PDataStore, COIN from test_framework.test_framework import BitcoinTestFramework -from test_framework.util import assert_equal, sync_masternodes +from test_framework.util import assert_equal class InvalidBlockRequestTest(BitcoinTestFramework): @@ -30,7 +30,6 @@ def run_test(self): node.add_p2p_connection(P2PDataStore()) network_thread_start() - sync_masternodes(self.nodes, True) node.p2p.wait_for_verack() best_block = node.getblock(node.getbestblockhash()) diff --git a/test/functional/p2p-fullblocktest.py b/test/functional/p2p-fullblocktest.py index 3d4993b461..aef3793ab7 100755 --- a/test/functional/p2p-fullblocktest.py +++ b/test/functional/p2p-fullblocktest.py @@ -71,7 +71,6 @@ def run_test(self): self.test = TestManager(self, self.options.tmpdir) self.test.add_all_connections(self.nodes) network_thread_start() - sync_masternodes(self.nodes, True) self.test.run() def add_transactions_to_block(self, block, tx_list): diff --git a/test/functional/sporks.py b/test/functional/sporks.py index 10c2ad1135..1484d5022b 100755 --- a/test/functional/sporks.py +++ b/test/functional/sporks.py @@ -63,9 +63,6 @@ def run_test(self): assert(self.get_test_spork_state(self.nodes[0])) assert(self.get_test_spork_state(self.nodes[1])) - # Force finish mnsync node as otherwise it will never send out headers to other peers - wait_to_sync(self.nodes[1], fast_mnsync=True) - # Generate one block to kick off masternode sync, which also starts sporks syncing for node2 self.nodes[1].generate(1) diff --git a/test/functional/test_framework/test_framework.py b/test/functional/test_framework/test_framework.py index 71cbdb2a97..42f3ab226a 100644 --- a/test/functional/test_framework/test_framework.py +++ b/test/functional/test_framework/test_framework.py @@ -30,6 +30,7 @@ connect_nodes, copy_datadir, disconnect_nodes, + force_finish_mnsync, initialize_datadir, log_filename, p2p_port, @@ -37,8 +38,6 @@ satoshi_round, sync_blocks, sync_mempools, - sync_masternodes, - wait_to_sync, wait_until, ) @@ -582,7 +581,7 @@ def do_start(idx): self.start_node(idx + start_idx, extra_args=args) self.mninfo[idx].nodeIdx = idx + start_idx self.mninfo[idx].node = self.nodes[idx + start_idx] - wait_to_sync(self.mninfo[idx].node, True) + force_finish_mnsync(self.mninfo[idx].node) def do_connect(idx): for i in range(0, idx + 1): @@ -608,8 +607,6 @@ def do_connect(idx): job.result() jobs.clear() - sync_masternodes(self.nodes, True) - executor.shutdown() def setup_network(self): @@ -626,7 +623,6 @@ def setup_network(self): self.log.info("Creating and starting %s simple nodes", num_simple_nodes) for i in range(0, num_simple_nodes): self.create_simple_node() - sync_masternodes(self.nodes, True) self.log.info("Activating DIP3") if not self.fast_dip3_enforcement: diff --git a/test/functional/test_framework/util.py b/test/functional/test_framework/util.py index f789dc8455..d0891a4a3d 100644 --- a/test/functional/test_framework/util.py +++ b/test/functional/test_framework/util.py @@ -263,20 +263,6 @@ def get_rpc_proxy(url, node_number, timeout=None, coveragedir=None): return coverage.AuthServiceProxyWrapper(proxy, coverage_logfile) -def get_mnsync_status(node): - result = node.mnsync("status") - return result['IsSynced'] - -def wait_to_sync(node, fast_mnsync=False): - while True: - synced = get_mnsync_status(node) - if synced: - break - time.sleep(0.2) - if fast_mnsync: - # skip mnsync states - node.mnsync("next") - def p2p_port(n): assert(n <= MAX_NODES) return PORT_MIN + n + (MAX_NODES * PortSeed.n) % (PORT_RANGE - 1 - MAX_NODES) @@ -449,9 +435,15 @@ def sync_mempools(rpc_connections, *, wait=1, timeout=60): timeout -= wait raise AssertionError("Mempool sync failed") -def sync_masternodes(rpc_connections, fast_mnsync=False): - for node in rpc_connections: - wait_to_sync(node, fast_mnsync) +def force_finish_mnsync(node): + """ + Masternodes won't accept incoming connections while IsSynced is false. + Force them to switch to this state to speed things up. + """ + while True: + if node.mnsync("status")['IsSynced']: + break + node.mnsync("next") # Transaction/Block functions ############################# From 7a234189a723b058c3e4f5ba8daa5d78b1d2bcef Mon Sep 17 00:00:00 2001 From: -k Date: Thu, 19 Dec 2019 08:47:50 -0800 Subject: [PATCH 3/3] Various fixes for mixing queues (#3138) * Always check for expired queues on masternodes * Check if a queue is too old or too far into the future Instead of only checking that it's to old * Check that no masternode can spam us with dsqs regardless of dsq readiness --- src/privatesend/privatesend-client.cpp | 24 ++++++++-------- src/privatesend/privatesend-server.cpp | 39 ++++++++++++++------------ src/privatesend/privatesend.cpp | 11 ++++++-- src/privatesend/privatesend.h | 4 +-- 4 files changed, 42 insertions(+), 36 deletions(-) diff --git a/src/privatesend/privatesend-client.cpp b/src/privatesend/privatesend-client.cpp index d9ba93d6be..8b7d235b0f 100755 --- a/src/privatesend/privatesend-client.cpp +++ b/src/privatesend/privatesend-client.cpp @@ -54,12 +54,17 @@ void CPrivateSendClientManager::ProcessMessage(CNode* pfrom, const std::string& if (q == dsq) { return; } + if (q.fReady == dsq.fReady && q.masternodeOutpoint == dsq.masternodeOutpoint) { + // no way the same mn can send another dsq with the same readiness this soon + LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- Peer %s is sending WAY too many dsq messages for a masternode with collateral %s\n", pfrom->GetLogString(), dsq.masternodeOutpoint.ToStringShort()); + return; + } } } // cs_vecqueue LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- %s new\n", dsq.ToString()); - if (dsq.IsExpired()) return; + if (dsq.IsTimeOutOfBounds()) return; auto mnList = deterministicMNManager->GetListAtChainTip(); auto dmn = mnList.GetValidMNByCollateral(dsq.masternodeOutpoint); @@ -83,18 +88,6 @@ void CPrivateSendClientManager::ProcessMessage(CNode* pfrom, const std::string& } } } else { - LOCK(cs_deqsessions); // have to lock this first to avoid deadlocks with cs_vecqueue - TRY_LOCK(cs_vecqueue, lockRecv); - if (!lockRecv) return; - - for (const auto& q : vecPrivateSendQueue) { - if (q.masternodeOutpoint == dsq.masternodeOutpoint) { - // no way same mn can send another "not yet ready" dsq this soon - LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- Masternode %s is sending WAY too many dsq messages\n", dmn->pdmnState->ToString()); - return; - } - } - int64_t nLastDsq = mmetaman.GetMetaInfo(dmn->proTxHash)->GetLastDsq(); int nThreshold = nLastDsq + mnList.GetValidMNsCount() / 5; LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- nLastDsq: %d threshold: %d nDsqCount: %d\n", nLastDsq, nThreshold, mmetaman.GetDsqCount()); @@ -107,12 +100,17 @@ void CPrivateSendClientManager::ProcessMessage(CNode* pfrom, const std::string& mmetaman.AllowMixing(dmn->proTxHash); LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- new PrivateSend queue (%s) from masternode %s\n", dsq.ToString(), dmn->pdmnState->addr.ToString()); + + LOCK(cs_deqsessions); for (auto& session : deqSessions) { CDeterministicMNCPtr mnMixing; if (session.GetMixingMasternodeInfo(mnMixing) && mnMixing->collateralOutpoint == dsq.masternodeOutpoint) { dsq.fTried = true; } } + + TRY_LOCK(cs_vecqueue, lockRecv); + if (!lockRecv) return; vecPrivateSendQueue.push_back(dsq); dsq.Relay(connman); } diff --git a/src/privatesend/privatesend-server.cpp b/src/privatesend/privatesend-server.cpp index 1c7720a981..a33088940b 100755 --- a/src/privatesend/privatesend-server.cpp +++ b/src/privatesend/privatesend-server.cpp @@ -97,9 +97,6 @@ void CPrivateSendServer::ProcessMessage(CNode* pfrom, const std::string& strComm } } else if (strCommand == NetMsgType::DSQUEUE) { - TRY_LOCK(cs_vecqueue, lockRecv); - if (!lockRecv) return; - if (pfrom->nVersion < MIN_PRIVATESEND_PEER_PROTO_VERSION) { LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- peer=%d using obsolete version %i\n", pfrom->GetId(), pfrom->nVersion); connman.PushMessage(pfrom, CNetMsgMaker(pfrom->GetSendVersion()).Make(NetMsgType::REJECT, strCommand, REJECT_OBSOLETE, strprintf("Version must be %d or greater", MIN_PRIVATESEND_PEER_PROTO_VERSION))); @@ -109,16 +106,26 @@ void CPrivateSendServer::ProcessMessage(CNode* pfrom, const std::string& strComm CPrivateSendQueue dsq; vRecv >> dsq; - // process every dsq only once - for (const auto& q : vecPrivateSendQueue) { - if (q == dsq) { - return; + { + TRY_LOCK(cs_vecqueue, lockRecv); + if (!lockRecv) return; + + // process every dsq only once + for (const auto& q : vecPrivateSendQueue) { + if (q == dsq) { + return; + } + if (q.fReady == dsq.fReady && q.masternodeOutpoint == dsq.masternodeOutpoint) { + // no way the same mn can send another dsq with the same readiness this soon + LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- Peer %s is sending WAY too many dsq messages for a masternode with collateral %s\n", pfrom->GetLogString(), dsq.masternodeOutpoint.ToStringShort()); + return; + } } - } + } // cs_vecqueue LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- %s new\n", dsq.ToString()); - if (dsq.IsExpired()) return; + if (dsq.IsTimeOutOfBounds()) return; auto mnList = deterministicMNManager->GetListAtChainTip(); auto dmn = mnList.GetValidMNByCollateral(dsq.masternodeOutpoint); @@ -131,14 +138,6 @@ void CPrivateSendServer::ProcessMessage(CNode* pfrom, const std::string& strComm } if (!dsq.fReady) { - for (const auto& q : vecPrivateSendQueue) { - if (q.masternodeOutpoint == dsq.masternodeOutpoint) { - // no way same mn can send another "not yet ready" dsq this soon - LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- Masternode %s is sending WAY too many dsq messages\n", dmn->pdmnState->addr.ToString()); - return; - } - } - int64_t nLastDsq = mmetaman.GetMetaInfo(dmn->proTxHash)->GetLastDsq(); int nThreshold = nLastDsq + mnList.GetValidMNsCount() / 5; LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- nLastDsq: %d threshold: %d nDsqCount: %d\n", nLastDsq, nThreshold, mmetaman.GetDsqCount()); @@ -150,6 +149,9 @@ void CPrivateSendServer::ProcessMessage(CNode* pfrom, const std::string& strComm mmetaman.AllowMixing(dmn->proTxHash); LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- new PrivateSend queue (%s) from masternode %s\n", dsq.ToString(), dmn->pdmnState->addr.ToString()); + + TRY_LOCK(cs_vecqueue, lockRecv); + if (!lockRecv) return; vecPrivateSendQueue.push_back(dsq); dsq.Relay(connman); } @@ -524,10 +526,11 @@ void CPrivateSendServer::ConsumeCollateral(CConnman& connman, const CTransaction void CPrivateSendServer::CheckTimeout(CConnman& connman) { if (!fMasternodeMode) return; - if (nState == POOL_STATE_IDLE) return; CheckQueue(); + if (nState == POOL_STATE_IDLE) return; + int nTimeout = (nState == POOL_STATE_SIGNING) ? PRIVATESEND_SIGNING_TIMEOUT : PRIVATESEND_QUEUE_TIMEOUT; bool fTimeout = GetTime() - nTimeLastSuccessfulStep >= nTimeout; diff --git a/src/privatesend/privatesend.cpp b/src/privatesend/privatesend.cpp index 254baa9723..2a94380f32 100755 --- a/src/privatesend/privatesend.cpp +++ b/src/privatesend/privatesend.cpp @@ -82,6 +82,11 @@ bool CPrivateSendQueue::Relay(CConnman& connman) return true; } +bool CPrivateSendQueue::IsTimeOutOfBounds() const +{ + return GetAdjustedTime() - nTime > PRIVATESEND_QUEUE_TIMEOUT || nTime - GetAdjustedTime() > PRIVATESEND_QUEUE_TIMEOUT; +} + uint256 CPrivateSendBroadcastTx::GetSignatureHash() const { return SerializeHash(*this); @@ -173,8 +178,8 @@ void CPrivateSendBaseManager::CheckQueue() // check mixing queue objects for timeouts auto it = vecPrivateSendQueue.begin(); while (it != vecPrivateSendQueue.end()) { - if ((*it).IsExpired()) { - LogPrint(BCLog::PRIVATESEND, "CPrivateSendBaseManager::%s -- Removing expired queue (%s)\n", __func__, (*it).ToString()); + if ((*it).IsTimeOutOfBounds()) { + LogPrint(BCLog::PRIVATESEND, "CPrivateSendBaseManager::%s -- Removing a queue (%s)\n", __func__, (*it).ToString()); it = vecPrivateSendQueue.erase(it); } else { ++it; @@ -189,7 +194,7 @@ bool CPrivateSendBaseManager::GetQueueItemAndTry(CPrivateSendQueue& dsqRet) for (auto& dsq : vecPrivateSendQueue) { // only try each queue once - if (dsq.fTried || dsq.IsExpired()) continue; + if (dsq.fTried || dsq.IsTimeOutOfBounds()) continue; dsq.fTried = true; dsqRet = dsq; return true; diff --git a/src/privatesend/privatesend.h b/src/privatesend/privatesend.h index ca589e83a5..894f91155d 100755 --- a/src/privatesend/privatesend.h +++ b/src/privatesend/privatesend.h @@ -271,8 +271,8 @@ class CPrivateSendQueue bool Relay(CConnman& connman); - /// Is this queue expired? - bool IsExpired() { return GetAdjustedTime() - nTime > PRIVATESEND_QUEUE_TIMEOUT; } + /// Check if a queue is too old or too far into the future + bool IsTimeOutOfBounds() const; std::string ToString() const {