@@ -64,6 +64,21 @@ static constexpr int STALE_RELAY_AGE_LIMIT = 30 * 24 * 60 * 60;
6464// / Age after which a block is considered historical for purposes of rate
6565// / limiting block relay. Set to one week, denominated in seconds.
6666static constexpr int HISTORICAL_BLOCK_AGE = 7 * 24 * 60 * 60 ;
67+ /* * Maximum number of in-flight transactions from a peer */
68+ static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100 ;
69+ /* * Maximum number of announced transactions from a peer */
70+ static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ;
71+ /* * How many microseconds to delay requesting transactions from inbound peers */
72+ static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000 ;
73+ /* * How long to wait (in microseconds) before downloading a transaction from an additional peer */
74+ static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000 ;
75+ /* * Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */
76+ static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000 ;
77+ static_assert (INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY,
78+ " To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY" );
79+ /* * Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */
80+ static const unsigned int MAX_GETDATA_SZ = 1000 ;
81+
6782
6883struct COrphanTx {
6984 // When modifying, adapt the copy of this definition in tests/DoS_tests.
@@ -274,6 +289,66 @@ struct CNodeState {
274289 // ! Time of last new block announcement
275290 int64_t m_last_block_announcement;
276291
292+ /*
293+ * State associated with transaction download.
294+ *
295+ * Tx download algorithm:
296+ *
297+ * When inv comes in, queue up (process_time, txid) inside the peer's
298+ * CNodeState (m_tx_process_time) as long as m_tx_announced for the peer
299+ * isn't too big (MAX_PEER_TX_ANNOUNCEMENTS).
300+ *
301+ * The process_time for a transaction is set to nNow for outbound peers,
302+ * nNow + 2 seconds for inbound peers. This is the time at which we'll
303+ * consider trying to request the transaction from the peer in
304+ * SendMessages(). The delay for inbound peers is to allow outbound peers
305+ * a chance to announce before we request from inbound peers, to prevent
306+ * an adversary from using inbound connections to blind us to a
307+ * transaction (InvBlock).
308+ *
309+ * When we call SendMessages() for a given peer,
310+ * we will loop over the transactions in m_tx_process_time, looking
311+ * at the transactions whose process_time <= nNow. We'll request each
312+ * such transaction that we don't have already and that hasn't been
313+ * requested from another peer recently, up until we hit the
314+ * MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update
315+ * g_already_asked_for for each requested txid, storing the time of the
316+ * GETDATA request. We use g_already_asked_for to coordinate transaction
317+ * requests amongst our peers.
318+ *
319+ * For transactions that we still need but we have already recently
320+ * requested from some other peer, we'll reinsert (process_time, txid)
321+ * back into the peer's m_tx_process_time at the point in the future at
322+ * which the most recent GETDATA request would time out (ie
323+ * GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for).
324+ * We add an additional delay for inbound peers, again to prefer
325+ * attempting download from outbound peers first.
326+ * We also add an extra small random delay up to 2 seconds
327+ * to avoid biasing some peers over others. (e.g., due to fixed ordering
328+ * of peer processing in ThreadMessageHandler).
329+ *
330+ * When we receive a transaction from a peer, we remove the txid from the
331+ * peer's m_tx_in_flight set and from their recently announced set
332+ * (m_tx_announced). We also clear g_already_asked_for for that entry, so
333+ * that if somehow the transaction is not accepted but also not added to
334+ * the reject filter, then we will eventually redownload from other
335+ * peers.
336+ */
337+ struct TxDownloadState {
338+ /* Track when to attempt download of announced transactions (process
339+ * time in micros -> txid)
340+ */
341+ std::multimap<int64_t , uint256> m_tx_process_time;
342+
343+ // ! Store all the transactions a peer has recently announced
344+ std::set<uint256> m_tx_announced;
345+
346+ // ! Store transactions which were requested by us
347+ std::set<uint256> m_tx_in_flight;
348+ };
349+
350+ TxDownloadState m_tx_download;
351+
277352 CNodeState (CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) {
278353 fCurrentlyConnected = false ;
279354 nMisbehavior = 0 ;
@@ -301,6 +376,9 @@ struct CNodeState {
301376 }
302377};
303378
379+ // Keeps track of the time (in microseconds) when transactions were requested last time
380+ limitedmap<uint256, int64_t > g_already_asked_for GUARDED_BY (cs_main)(MAX_INV_SZ);
381+
304382/* * Map maintaining per-node state. */
305383static std::map<NodeId, CNodeState> mapNodeState GUARDED_BY (cs_main);
306384
@@ -591,6 +669,58 @@ static void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vec
591669 }
592670}
593671
672+ void EraseTxRequest (const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
673+ {
674+ g_already_asked_for.erase (txid);
675+ }
676+
677+ int64_t GetTxRequestTime (const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
678+ {
679+ auto it = g_already_asked_for.find (txid);
680+ if (it != g_already_asked_for.end ()) {
681+ return it->second ;
682+ }
683+ return 0 ;
684+ }
685+
686+ void UpdateTxRequestTime (const uint256& txid, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
687+ {
688+ auto it = g_already_asked_for.find (txid);
689+ if (it == g_already_asked_for.end ()) {
690+ g_already_asked_for.insert (std::make_pair (txid, request_time));
691+ } else {
692+ g_already_asked_for.update (it, request_time);
693+ }
694+ }
695+
696+
697+ void RequestTx (CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
698+ {
699+ CNodeState::TxDownloadState& peer_download_state = state->m_tx_download ;
700+ if (peer_download_state.m_tx_announced .size () >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_announced .count (txid)) {
701+ // Too many queued announcements from this peer, or we already have
702+ // this announcement
703+ return ;
704+ }
705+ peer_download_state.m_tx_announced .insert (txid);
706+
707+ int64_t process_time;
708+ int64_t last_request_time = GetTxRequestTime (txid);
709+ // First time requesting this tx
710+ if (last_request_time == 0 ) {
711+ process_time = nNow;
712+ } else {
713+ // Randomize the delay to avoid biasing some peers over others (such as due to
714+ // fixed ordering of peer processing in ThreadMessageHandler)
715+ process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand (MAX_GETDATA_RANDOM_DELAY);
716+ }
717+
718+ // We delay processing announcements from non-preferred (eg inbound) peers
719+ if (!state->fPreferredDownload ) process_time += INBOUND_PEER_TX_DELAY;
720+
721+ peer_download_state.m_tx_process_time .emplace (process_time, txid);
722+ }
723+
594724} // namespace
595725
596726// This function is used for testing the stale tip eviction logic, see
@@ -1945,6 +2075,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
19452075 LOCK (cs_main);
19462076
19472077 uint32_t nFetchFlags = GetFetchFlags (pfrom);
2078+ int64_t nNow = GetTimeMicros ();
19482079
19492080 for (CInv &inv : vInv)
19502081 {
@@ -1976,7 +2107,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
19762107 if (fBlocksOnly ) {
19772108 LogPrint (BCLog::NET, " transaction (%s) inv sent in violation of protocol peer=%d\n " , inv.hash .ToString (), pfrom->GetId ());
19782109 } else if (!fAlreadyHave && !fImporting && !fReindex && !IsInitialBlockDownload ()) {
1979- pfrom->AskFor ( inv);
2110+ RequestTx ( State ( pfrom->GetId ()), inv. hash , nNow );
19802111 }
19812112 }
19822113 }
@@ -2211,8 +2342,10 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
22112342 bool fMissingInputs = false ;
22122343 CValidationState state;
22132344
2214- pfrom->setAskFor .erase (inv.hash );
2215- mapAlreadyAskedFor.erase (inv.hash );
2345+ CNodeState* nodestate = State (pfrom->GetId ());
2346+ nodestate->m_tx_download .m_tx_announced .erase (inv.hash );
2347+ nodestate->m_tx_download .m_tx_in_flight .erase (inv.hash );
2348+ EraseTxRequest (inv.hash );
22162349
22172350 std::list<CTransactionRef> lRemovedTxn;
22182351
@@ -2303,10 +2436,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
23032436 }
23042437 if (!fRejectedParents ) {
23052438 uint32_t nFetchFlags = GetFetchFlags (pfrom);
2439+ int64_t nNow = GetTimeMicros ();
2440+
23062441 for (const CTxIn& txin : tx.vin ) {
23072442 CInv _inv (MSG_TX | nFetchFlags, txin.prevout .hash );
23082443 pfrom->AddInventoryKnown (_inv);
2309- if (!AlreadyHave (_inv)) pfrom->AskFor ( _inv);
2444+ if (!AlreadyHave (_inv)) RequestTx ( State ( pfrom->GetId ()), _inv. hash , nNow );
23102445 }
23112446 AddOrphanTx (ptx, pfrom->GetId ());
23122447
@@ -3731,24 +3866,39 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
37313866 //
37323867 // Message: getdata (non-blocks)
37333868 //
3734- while (!pto->mapAskFor .empty () && (*pto->mapAskFor .begin ()).first <= nNow)
3735- {
3736- const CInv& inv = (*pto->mapAskFor .begin ()).second ;
3737- if (!AlreadyHave (inv))
3738- {
3739- LogPrint (BCLog::NET, " Requesting %s peer=%d\n " , inv.ToString (), pto->GetId ());
3740- vGetData.push_back (inv);
3741- if (vGetData.size () >= 1000 )
3742- {
3743- connman->PushMessage (pto, msgMaker.Make (NetMsgType::GETDATA, vGetData));
3744- vGetData.clear ();
3869+ auto & tx_process_time = state.m_tx_download .m_tx_process_time ;
3870+ while (!tx_process_time.empty () && tx_process_time.begin ()->first <= nNow && state.m_tx_download .m_tx_in_flight .size () < MAX_PEER_TX_IN_FLIGHT) {
3871+ const uint256& txid = tx_process_time.begin ()->second ;
3872+ CInv inv (MSG_TX | GetFetchFlags (pto), txid);
3873+ if (!AlreadyHave (inv)) {
3874+ // If this transaction was last requested more than 1 minute ago,
3875+ // then request.
3876+ int64_t last_request_time = GetTxRequestTime (inv.hash );
3877+ if (last_request_time <= nNow - GETDATA_TX_INTERVAL) {
3878+ LogPrint (BCLog::NET, " Requesting %s peer=%d\n " , inv.ToString (), pto->GetId ());
3879+ vGetData.push_back (inv);
3880+ if (vGetData.size () >= MAX_GETDATA_SZ) {
3881+ connman->PushMessage (pto, msgMaker.Make (NetMsgType::GETDATA, vGetData));
3882+ vGetData.clear ();
3883+ }
3884+ UpdateTxRequestTime (inv.hash , nNow);
3885+ state.m_tx_download .m_tx_in_flight .insert (inv.hash );
3886+ } else {
3887+ // This transaction is in flight from someone else; queue
3888+ // up processing to happen after the download times out
3889+ // (with a slight delay for inbound peers, to prefer
3890+ // requests to outbound peers).
3891+ RequestTx (&state, txid, nNow);
37453892 }
37463893 } else {
3747- // If we're not going to ask, don't expect a response.
3748- pto->setAskFor .erase (inv.hash );
3894+ // We have already seen this transaction, no need to download.
3895+ state.m_tx_download .m_tx_announced .erase (inv.hash );
3896+ state.m_tx_download .m_tx_in_flight .erase (inv.hash );
37493897 }
3750- pto-> mapAskFor .erase (pto-> mapAskFor .begin ());
3898+ tx_process_time .erase (tx_process_time .begin ());
37513899 }
3900+
3901+
37523902 if (!vGetData.empty ())
37533903 connman->PushMessage (pto, msgMaker.Make (NetMsgType::GETDATA, vGetData));
37543904
0 commit comments