Skip to content
This repository was archived by the owner on Jan 20, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion abci/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,13 @@ const (
)

type PendingTxChecker func() PendingTxCheckerResponse
type ExpireTxHandler func()

// V2 response type contains non-protobuf fields, so non-local ABCI clients will not be able
// ResponseCheckTxV2 response type contains non-protobuf fields, so non-local ABCI clients will not be able
// to utilize the new fields in V2 type (but still be backwards-compatible)
type ResponseCheckTxV2 struct {
*ResponseCheckTx
IsPendingTransaction bool
Checker PendingTxChecker // must not be nil if IsPendingTransaction is true
ExpireTxHandler ExpireTxHandler
}
20 changes: 14 additions & 6 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,20 @@ func (txmp *TxMempool) CheckTx(
hash: txHash,
timestamp: time.Now().UTC(),
height: txmp.height,
expiredCallback: func(removeFromCache bool) {
if removeFromCache {
txmp.cache.Remove(tx)
}
if res.ExpireTxHandler != nil {
res.ExpireTxHandler()
}
},
}

if err == nil {
// only add new transaction if checkTx passes and is not pending
if !res.IsPendingTransaction {
err = txmp.addNewTransaction(wtx, res.ResponseCheckTx, txInfo)

if err != nil {
return err
}
Expand Down Expand Up @@ -837,9 +844,7 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {

atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size()))

if removeFromCache {
txmp.cache.Remove(wtx.tx)
}
wtx.expiredCallback(removeFromCache)
}

// purgeExpiredTxs removes all transactions that have exceeded their respective
Expand Down Expand Up @@ -888,10 +893,13 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
}

for _, wtx := range expiredTxs {
txmp.removeTx(wtx, false)
txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
}

txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, txmp.cache.Remove)
// remove pending txs that have expired
txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, func(wtx *WrappedTx) {
wtx.expiredCallback(!txmp.config.KeepInvalidTxsInCache)
})
}

func (txmp *TxMempool) notifyTxsAvailable() {
Expand Down
30 changes: 18 additions & 12 deletions internal/mempool/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type WrappedTx struct {
// transaction in the mempool can be evicted when it is simultaneously having
// a reCheckTx callback executed.
removed bool

// this is the callback that can be called when a transaction expires
expiredCallback func(removeFromCache bool)
}

func (wtx *WrappedTx) Size() int {
Expand Down Expand Up @@ -295,10 +298,10 @@ func (wtl *WrappedTxList) Remove(wtx *WrappedTx) {

type PendingTxs struct {
mtx *sync.RWMutex
txs []PendingTxInfo
txs []TxWithResponse
}

type PendingTxInfo struct {
type TxWithResponse struct {
tx *WrappedTx
checkTxResponse *abci.ResponseCheckTxV2
txInfo TxInfo
Expand All @@ -307,13 +310,12 @@ type PendingTxInfo struct {
func NewPendingTxs() *PendingTxs {
return &PendingTxs{
mtx: &sync.RWMutex{},
txs: []PendingTxInfo{},
txs: []TxWithResponse{},
}
}

func (p *PendingTxs) EvaluatePendingTransactions() (
acceptedTxs []PendingTxInfo,
rejectedTxs []PendingTxInfo,
acceptedTxs []TxWithResponse,
rejectedTxs []TxWithResponse,
) {
poppedIndices := []int{}
p.mtx.Lock()
Expand All @@ -337,7 +339,7 @@ func (p *PendingTxs) popTxsAtIndices(indices []int) {
if len(indices) == 0 {
return
}
newTxs := []PendingTxInfo{}
newTxs := []TxWithResponse{}
start := 0
for _, idx := range indices {
newTxs = append(newTxs, p.txs[start:idx]...)
Expand All @@ -350,14 +352,14 @@ func (p *PendingTxs) popTxsAtIndices(indices []int) {
func (p *PendingTxs) Insert(tx *WrappedTx, resCheckTx *abci.ResponseCheckTxV2, txInfo TxInfo) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.txs = append(p.txs, PendingTxInfo{
p.txs = append(p.txs, TxWithResponse{
tx: tx,
checkTxResponse: resCheckTx,
txInfo: txInfo,
})
}

func (p *PendingTxs) Peek(max int) []PendingTxInfo {
func (p *PendingTxs) Peek(max int) []TxWithResponse {
p.mtx.RLock()
defer p.mtx.RUnlock()
// priority is fifo
Expand All @@ -373,7 +375,7 @@ func (p *PendingTxs) Size() int {
return len(p.txs)
}

func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDuration time.Duration, now time.Time, cb func(types.Tx)) {
func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDuration time.Duration, now time.Time, cb func(wtx *WrappedTx)) {
p.mtx.Lock()
defer p.mtx.Unlock()

Expand All @@ -385,10 +387,12 @@ func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDurat
if ttlNumBlock > 0 {
idxFirstNotExpiredTx := len(p.txs)
for i, ptx := range p.txs {
// once found, we can break because these are ordered
if (blockHeight - ptx.tx.height) <= ttlNumBlock {
idxFirstNotExpiredTx = i
break
} else {
cb(ptx.tx.tx)
cb(ptx.tx)
}
}
p.txs = p.txs[idxFirstNotExpiredTx:]
Expand All @@ -401,10 +405,12 @@ func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDurat
if ttlDuration > 0 {
idxFirstNotExpiredTx := len(p.txs)
for i, ptx := range p.txs {
// once found, we can break because these are ordered
if now.Sub(ptx.tx.timestamp) <= ttlDuration {
idxFirstNotExpiredTx = i
break
} else {
cb(ptx.tx.tx)
cb(ptx.tx)
}
}
p.txs = p.txs[idxFirstNotExpiredTx:]
Expand Down