diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 19300d911..b47f69d29 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -362,8 +362,8 @@ func (txmp *TxMempool) Flush() { // - Transactions returned are not removed from the mempool transaction // store or indexes. func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { - txmp.mtx.RLock() - defer txmp.mtx.RUnlock() + txmp.mtx.Lock() + defer txmp.mtx.Unlock() var ( totalGas int64 @@ -417,8 +417,8 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { // - Transactions returned are not removed from the mempool transaction // store or indexes. func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs { - txmp.mtx.RLock() - defer txmp.mtx.RUnlock() + txmp.mtx.Lock() + defer txmp.mtx.Unlock() numTxs := txmp.priorityIndex.NumTxs() if max < 0 { diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index 487f00654..76e89c056 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -304,27 +304,43 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) { require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities) } + var wg sync.WaitGroup + // reap by gas capacity only - reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50) - ensurePrioritized(reapedTxs) - require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5690), txmp.SizeBytes()) - require.Len(t, reapedTxs, 50) + wg.Add(1) + go func() { + defer wg.Done() + reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50) + ensurePrioritized(reapedTxs) + require.Equal(t, len(tTxs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.Len(t, reapedTxs, 50) + }() // reap by transaction bytes only - reapedTxs = txmp.ReapMaxBytesMaxGas(1000, -1) - ensurePrioritized(reapedTxs) - require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5690), txmp.SizeBytes()) - require.GreaterOrEqual(t, len(reapedTxs), 16) + wg.Add(1) + go func() { + defer wg.Done() + reapedTxs := txmp.ReapMaxBytesMaxGas(1000, -1) + ensurePrioritized(reapedTxs) + require.Equal(t, len(tTxs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.GreaterOrEqual(t, len(reapedTxs), 16) + }() // Reap by both transaction bytes and gas, where the size yields 31 reaped // transactions and the gas limit reaps 25 transactions. - reapedTxs = txmp.ReapMaxBytesMaxGas(1500, 30) - ensurePrioritized(reapedTxs) - require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5690), txmp.SizeBytes()) - require.Len(t, reapedTxs, 25) + wg.Add(1) + go func() { + defer wg.Done() + reapedTxs := txmp.ReapMaxBytesMaxGas(1500, 30) + ensurePrioritized(reapedTxs) + require.Equal(t, len(tTxs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.Len(t, reapedTxs, 25) + }() + + wg.Wait() } func TestTxMempool_ReapMaxTxs(t *testing.T) { @@ -363,26 +379,42 @@ func TestTxMempool_ReapMaxTxs(t *testing.T) { require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities) } + var wg sync.WaitGroup + // reap all transactions - reapedTxs := txmp.ReapMaxTxs(-1) - ensurePrioritized(reapedTxs) - require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5690), txmp.SizeBytes()) - require.Len(t, reapedTxs, len(tTxs)) + wg.Add(1) + go func() { + defer wg.Done() + reapedTxs := txmp.ReapMaxTxs(-1) + ensurePrioritized(reapedTxs) + require.Equal(t, len(tTxs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.Len(t, reapedTxs, len(tTxs)) + }() // reap a single transaction - reapedTxs = txmp.ReapMaxTxs(1) - ensurePrioritized(reapedTxs) - require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5690), txmp.SizeBytes()) - require.Len(t, reapedTxs, 1) + wg.Add(1) + go func() { + defer wg.Done() + reapedTxs := txmp.ReapMaxTxs(1) + ensurePrioritized(reapedTxs) + require.Equal(t, len(tTxs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.Len(t, reapedTxs, 1) + }() // reap half of the transactions - reapedTxs = txmp.ReapMaxTxs(len(tTxs) / 2) - ensurePrioritized(reapedTxs) - require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5690), txmp.SizeBytes()) - require.Len(t, reapedTxs, len(tTxs)/2) + wg.Add(1) + go func() { + defer wg.Done() + reapedTxs := txmp.ReapMaxTxs(len(tTxs) / 2) + ensurePrioritized(reapedTxs) + require.Equal(t, len(tTxs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.Len(t, reapedTxs, len(tTxs)/2) + }() + + wg.Wait() } func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) {