Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,8 @@ class CNode
// If true, we will send him all quorum related messages, even if he is not a member of our quorums
std::atomic<bool> qwatch{false};

std::set<uint256> orphan_work_set;

CNode(NodeId id, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress &addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress &addrBindIn, const std::string &addrNameIn = "", bool fInboundIn = false);
~CNode();

Expand Down
149 changes: 89 additions & 60 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,8 @@ unsigned int LimitOrphanTxSize(unsigned int nMaxOrphansSize)
return nEvicted;
}

void static ProcessOrphanTx(CConnman* connman, std::set<uint256>& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans);

// Requires cs_main.
void Misbehaving(NodeId pnode, int howmuch)
{
Expand Down Expand Up @@ -843,13 +845,23 @@ PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, CScheduler &schedu
}

void PeerLogicValidation::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex, const std::vector<CTransactionRef>& vtxConflicted) {
LOCK(g_cs_orphans);
LOCK2(cs_main, g_cs_orphans);

std::vector<uint256> vOrphanErase;
std::set<uint256> orphanWorkSet;

for (const CTransactionRef& ptx : pblock->vtx) {
const CTransaction& tx = *ptx;

// Which orphan pool entries we should reprocess and potentially try to accept into mempool again?
for (size_t i = 0; i < tx.vin.size(); i++) {
auto itByPrev = mapOrphanTransactionsByPrev.find(COutPoint(tx.GetHash(), (uint32_t)i));
if (itByPrev == mapOrphanTransactionsByPrev.end()) continue;
for (const auto& elem : itByPrev->second) {
orphanWorkSet.insert(elem->first);
}
}

// Which orphan pool entries must we evict?
for (const auto& txin : tx.vin) {
auto itByPrev = mapOrphanTransactionsByPrev.find(txin.prevout);
Expand All @@ -871,6 +883,11 @@ void PeerLogicValidation::BlockConnected(const std::shared_ptr<const CBlock>& pb
LogPrint(BCLog::MEMPOOL, "Erased %d orphan tx included or conflicted by block\n", nErased);
}

while (!orphanWorkSet.empty()) {
LogPrint(BCLog::MEMPOOL, "Trying to process %d orphans\n", orphanWorkSet.size());
ProcessOrphanTx(g_connman.get(), orphanWorkSet);
}

g_last_tip_update = GetTime();
}

Expand Down Expand Up @@ -1649,6 +1666,64 @@ bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::ve
return true;
}

void static ProcessOrphanTx(CConnman* connman, std::set<uint256>& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans)
{
AssertLockHeld(cs_main);
AssertLockHeld(g_cs_orphans);
std::set<NodeId> setMisbehaving;
bool done = false;
while (!done && !orphan_work_set.empty()) {
const uint256 orphanHash = *orphan_work_set.begin();
orphan_work_set.erase(orphan_work_set.begin());

auto orphan_it = mapOrphanTransactions.find(orphanHash);
if (orphan_it == mapOrphanTransactions.end()) continue;

const CTransactionRef porphanTx = orphan_it->second.tx;
const CTransaction& orphanTx = *porphanTx;
NodeId fromPeer = orphan_it->second.fromPeer;
bool fMissingInputs2 = false;
// Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan
// resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get
// anyone relaying LegitTxX banned)
CValidationState stateDummy;

if (setMisbehaving.count(fromPeer)) continue;
if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, true, &fMissingInputs2)) {
LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString());
connman->RelayTransaction(orphanTx);
for (unsigned int i = 0; i < orphanTx.vout.size(); i++) {
auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(orphanHash, i));
if (it_by_prev != mapOrphanTransactionsByPrev.end()) {
for (const auto& elem : it_by_prev->second) {
orphan_work_set.insert(elem->first);
}
}
}
EraseOrphanTx(orphanHash);
done = true;
} else if (!fMissingInputs2) {
int nDos = 0;
if (stateDummy.IsInvalid(nDos) && nDos > 0) {
// Punish peer that gave us an invalid orphan tx
Misbehaving(fromPeer, nDos);
setMisbehaving.insert(fromPeer);
LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString());
}
// Has inputs but not accepted to mempool
// Probably non-standard or insufficient fee
LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString());
if (!stateDummy.CorruptionPossible()) {
assert(recentRejects);
recentRejects->insert(orphanHash);
}
EraseOrphanTx(orphanHash);
done = true;
}
mempool.check(pcoinsTip);
}
}

bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, const std::atomic<bool>& interruptMsgProc)
{
LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->GetId());
Expand Down Expand Up @@ -2341,8 +2416,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
return true;
}

std::deque<COutPoint> vWorkQueue;
std::vector<uint256> vEraseQueue;
CTransactionRef ptx;
CPrivateSendBroadcastTx dstx;
int nInvType = MSG_TX;
Expand Down Expand Up @@ -2418,7 +2491,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
mempool.check(pcoinsTip);
connman->RelayTransaction(tx);
for (unsigned int i = 0; i < tx.vout.size(); i++) {
vWorkQueue.emplace_back(inv.hash, i);
auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(inv.hash, i));
if (it_by_prev != mapOrphanTransactionsByPrev.end()) {
for (const auto& elem : it_by_prev->second) {
pfrom->orphan_work_set.insert(elem->first);
}
}
}

pfrom->nLastTXTime = GetTime();
Expand All @@ -2429,62 +2507,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
mempool.size(), mempool.DynamicMemoryUsage() / 1000);

// Recursively process any orphan transactions that depended on this one
std::set<NodeId> setMisbehaving;
while (!vWorkQueue.empty()) {
auto itByPrev = mapOrphanTransactionsByPrev.find(vWorkQueue.front());
vWorkQueue.pop_front();
if (itByPrev == mapOrphanTransactionsByPrev.end())
continue;
for (auto mi = itByPrev->second.begin();
mi != itByPrev->second.end();
++mi)
{
const CTransactionRef& porphanTx = (*mi)->second.tx;
const CTransaction& orphanTx = *porphanTx;
const uint256& orphanHash = orphanTx.GetHash();
NodeId fromPeer = (*mi)->second.fromPeer;
bool fMissingInputs2 = false;
// Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan
// resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get
// anyone relaying LegitTxX banned)
CValidationState stateDummy;


if (setMisbehaving.count(fromPeer))
continue;
if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, true, &fMissingInputs2)) {
LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString());
connman->RelayTransaction(orphanTx);
for (unsigned int i = 0; i < orphanTx.vout.size(); i++) {
vWorkQueue.emplace_back(orphanHash, i);
}
vEraseQueue.push_back(orphanHash);
}
else if (!fMissingInputs2)
{
int nDos = 0;
if (stateDummy.IsInvalid(nDos) && nDos > 0)
{
// Punish peer that gave us an invalid orphan tx
Misbehaving(fromPeer, nDos);
setMisbehaving.insert(fromPeer);
LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString());
}
// Has inputs but not accepted to mempool
// Probably non-standard or insufficient fee
LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString());
vEraseQueue.push_back(orphanHash);
if (!stateDummy.CorruptionPossible()) {
assert(recentRejects);
recentRejects->insert(orphanHash);
}
}
mempool.check(pcoinsTip);
}
}

for (uint256 hash : vEraseQueue)
EraseOrphanTx(hash);
ProcessOrphanTx(connman, pfrom->orphan_work_set);
}
else if (fMissingInputs)
{
Expand Down Expand Up @@ -3203,11 +3226,17 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter
if (!pfrom->vRecvGetData.empty())
ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);

if (!pfrom->orphan_work_set.empty()) {
LOCK2(cs_main, g_cs_orphans);
ProcessOrphanTx(connman, pfrom->orphan_work_set);
}

if (pfrom->fDisconnect)
return false;

// this maintains the order of responses
if (!pfrom->vRecvGetData.empty()) return true;
if (!pfrom->orphan_work_set.empty()) return true;

// Don't bother if send buffer is too full to respond anyway
if (pfrom->fPauseSend)
Expand Down
24 changes: 11 additions & 13 deletions src/privatesend/privatesend-client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,17 @@ void CPrivateSendClientManager::ProcessMessage(CNode* pfrom, const std::string&
if (q == dsq) {
return;
}
if (q.fReady == dsq.fReady && q.masternodeOutpoint == dsq.masternodeOutpoint) {
// no way the same mn can send another dsq with the same readiness this soon
LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- Peer %s is sending WAY too many dsq messages for a masternode with collateral %s\n", pfrom->GetLogString(), dsq.masternodeOutpoint.ToStringShort());
return;
}
}
} // cs_vecqueue

LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- %s new\n", dsq.ToString());

if (dsq.IsExpired()) return;
if (dsq.IsTimeOutOfBounds()) return;

auto mnList = deterministicMNManager->GetListAtChainTip();
auto dmn = mnList.GetValidMNByCollateral(dsq.masternodeOutpoint);
Expand All @@ -83,18 +88,6 @@ void CPrivateSendClientManager::ProcessMessage(CNode* pfrom, const std::string&
}
}
} else {
LOCK(cs_deqsessions); // have to lock this first to avoid deadlocks with cs_vecqueue
TRY_LOCK(cs_vecqueue, lockRecv);
if (!lockRecv) return;

for (const auto& q : vecPrivateSendQueue) {
if (q.masternodeOutpoint == dsq.masternodeOutpoint) {
// no way same mn can send another "not yet ready" dsq this soon
LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- Masternode %s is sending WAY too many dsq messages\n", dmn->pdmnState->ToString());
return;
}
}

int64_t nLastDsq = mmetaman.GetMetaInfo(dmn->proTxHash)->GetLastDsq();
int nThreshold = nLastDsq + mnList.GetValidMNsCount() / 5;
LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- nLastDsq: %d threshold: %d nDsqCount: %d\n", nLastDsq, nThreshold, mmetaman.GetDsqCount());
Expand All @@ -107,12 +100,17 @@ void CPrivateSendClientManager::ProcessMessage(CNode* pfrom, const std::string&
mmetaman.AllowMixing(dmn->proTxHash);

LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- new PrivateSend queue (%s) from masternode %s\n", dsq.ToString(), dmn->pdmnState->addr.ToString());

LOCK(cs_deqsessions);
for (auto& session : deqSessions) {
CDeterministicMNCPtr mnMixing;
if (session.GetMixingMasternodeInfo(mnMixing) && mnMixing->collateralOutpoint == dsq.masternodeOutpoint) {
dsq.fTried = true;
}
}

TRY_LOCK(cs_vecqueue, lockRecv);
if (!lockRecv) return;
vecPrivateSendQueue.push_back(dsq);
dsq.Relay(connman);
}
Expand Down
39 changes: 21 additions & 18 deletions src/privatesend/privatesend-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,6 @@ void CPrivateSendServer::ProcessMessage(CNode* pfrom, const std::string& strComm
}

} else if (strCommand == NetMsgType::DSQUEUE) {
TRY_LOCK(cs_vecqueue, lockRecv);
if (!lockRecv) return;

if (pfrom->nVersion < MIN_PRIVATESEND_PEER_PROTO_VERSION) {
LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- peer=%d using obsolete version %i\n", pfrom->GetId(), pfrom->nVersion);
connman.PushMessage(pfrom, CNetMsgMaker(pfrom->GetSendVersion()).Make(NetMsgType::REJECT, strCommand, REJECT_OBSOLETE, strprintf("Version must be %d or greater", MIN_PRIVATESEND_PEER_PROTO_VERSION)));
Expand All @@ -109,16 +106,26 @@ void CPrivateSendServer::ProcessMessage(CNode* pfrom, const std::string& strComm
CPrivateSendQueue dsq;
vRecv >> dsq;

// process every dsq only once
for (const auto& q : vecPrivateSendQueue) {
if (q == dsq) {
return;
{
TRY_LOCK(cs_vecqueue, lockRecv);
if (!lockRecv) return;

// process every dsq only once
for (const auto& q : vecPrivateSendQueue) {
if (q == dsq) {
return;
}
if (q.fReady == dsq.fReady && q.masternodeOutpoint == dsq.masternodeOutpoint) {
// no way the same mn can send another dsq with the same readiness this soon
LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- Peer %s is sending WAY too many dsq messages for a masternode with collateral %s\n", pfrom->GetLogString(), dsq.masternodeOutpoint.ToStringShort());
return;
}
}
}
} // cs_vecqueue

LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- %s new\n", dsq.ToString());

if (dsq.IsExpired()) return;
if (dsq.IsTimeOutOfBounds()) return;

auto mnList = deterministicMNManager->GetListAtChainTip();
auto dmn = mnList.GetValidMNByCollateral(dsq.masternodeOutpoint);
Expand All @@ -131,14 +138,6 @@ void CPrivateSendServer::ProcessMessage(CNode* pfrom, const std::string& strComm
}

if (!dsq.fReady) {
for (const auto& q : vecPrivateSendQueue) {
if (q.masternodeOutpoint == dsq.masternodeOutpoint) {
// no way same mn can send another "not yet ready" dsq this soon
LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- Masternode %s is sending WAY too many dsq messages\n", dmn->pdmnState->addr.ToString());
return;
}
}

int64_t nLastDsq = mmetaman.GetMetaInfo(dmn->proTxHash)->GetLastDsq();
int nThreshold = nLastDsq + mnList.GetValidMNsCount() / 5;
LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- nLastDsq: %d threshold: %d nDsqCount: %d\n", nLastDsq, nThreshold, mmetaman.GetDsqCount());
Expand All @@ -150,6 +149,9 @@ void CPrivateSendServer::ProcessMessage(CNode* pfrom, const std::string& strComm
mmetaman.AllowMixing(dmn->proTxHash);

LogPrint(BCLog::PRIVATESEND, "DSQUEUE -- new PrivateSend queue (%s) from masternode %s\n", dsq.ToString(), dmn->pdmnState->addr.ToString());

TRY_LOCK(cs_vecqueue, lockRecv);
if (!lockRecv) return;
vecPrivateSendQueue.push_back(dsq);
dsq.Relay(connman);
}
Expand Down Expand Up @@ -524,10 +526,11 @@ void CPrivateSendServer::ConsumeCollateral(CConnman& connman, const CTransaction
void CPrivateSendServer::CheckTimeout(CConnman& connman)
{
if (!fMasternodeMode) return;
if (nState == POOL_STATE_IDLE) return;

CheckQueue();

if (nState == POOL_STATE_IDLE) return;

int nTimeout = (nState == POOL_STATE_SIGNING) ? PRIVATESEND_SIGNING_TIMEOUT : PRIVATESEND_QUEUE_TIMEOUT;
bool fTimeout = GetTime() - nTimeLastSuccessfulStep >= nTimeout;

Expand Down
11 changes: 8 additions & 3 deletions src/privatesend/privatesend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ bool CPrivateSendQueue::Relay(CConnman& connman)
return true;
}

bool CPrivateSendQueue::IsTimeOutOfBounds() const
{
return GetAdjustedTime() - nTime > PRIVATESEND_QUEUE_TIMEOUT || nTime - GetAdjustedTime() > PRIVATESEND_QUEUE_TIMEOUT;
}

uint256 CPrivateSendBroadcastTx::GetSignatureHash() const
{
return SerializeHash(*this);
Expand Down Expand Up @@ -173,8 +178,8 @@ void CPrivateSendBaseManager::CheckQueue()
// check mixing queue objects for timeouts
auto it = vecPrivateSendQueue.begin();
while (it != vecPrivateSendQueue.end()) {
if ((*it).IsExpired()) {
LogPrint(BCLog::PRIVATESEND, "CPrivateSendBaseManager::%s -- Removing expired queue (%s)\n", __func__, (*it).ToString());
if ((*it).IsTimeOutOfBounds()) {
LogPrint(BCLog::PRIVATESEND, "CPrivateSendBaseManager::%s -- Removing a queue (%s)\n", __func__, (*it).ToString());
it = vecPrivateSendQueue.erase(it);
} else {
++it;
Expand All @@ -189,7 +194,7 @@ bool CPrivateSendBaseManager::GetQueueItemAndTry(CPrivateSendQueue& dsqRet)

for (auto& dsq : vecPrivateSendQueue) {
// only try each queue once
if (dsq.fTried || dsq.IsExpired()) continue;
if (dsq.fTried || dsq.IsTimeOutOfBounds()) continue;
dsq.fTried = true;
dsqRet = dsq;
return true;
Expand Down
Loading