Skip to content
This repository was archived by the owner on Jan 20, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ func (cli *grpcClient) Info(ctx context.Context, params *types.RequestInfo) (*ty
return cli.client.Info(ctx, types.ToRequestInfo(params).GetInfo(), grpc.WaitForReady(true))
}

func (cli *grpcClient) CheckTx(ctx context.Context, params *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return cli.client.CheckTx(ctx, types.ToRequestCheckTx(params).GetCheckTx(), grpc.WaitForReady(true))
func (cli *grpcClient) CheckTx(ctx context.Context, params *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) {
resCheckTx, err := cli.client.CheckTx(ctx, types.ToRequestCheckTx(params).GetCheckTx(), grpc.WaitForReady(true))
return &types.ResponseCheckTxV2{ResponseCheckTx: resCheckTx}, err
}

func (cli *grpcClient) Query(ctx context.Context, params *types.RequestQuery) (*types.ResponseQuery, error) {
Expand Down
8 changes: 4 additions & 4 deletions abci/client/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,12 @@ func (cli *socketClient) Info(ctx context.Context, req *types.RequestInfo) (*typ
return res.GetInfo(), nil
}

func (cli *socketClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
func (cli *socketClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) {
res, err := cli.doRequest(ctx, types.ToRequestCheckTx(req))
if err != nil {
return nil, err
}
return res.GetCheckTx(), nil
return &types.ResponseCheckTxV2{ResponseCheckTx: res.GetCheckTx()}, nil
}

func (cli *socketClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) {
Expand Down
4 changes: 2 additions & 2 deletions abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ func (app *Application) FinalizeBlock(_ context.Context, req *types.RequestFinal
return &types.ResponseFinalizeBlock{TxResults: respTxs, ValidatorUpdates: app.ValUpdates, AppHash: appHash}, nil
}

func (*Application) CheckTx(_ context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return &types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}, nil
func (*Application) CheckTx(_ context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) {
return &types.ResponseCheckTxV2{ResponseCheckTx: &types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}}, nil
}

func (app *Application) Commit(_ context.Context) (*types.ResponseCommit, error) {
Expand Down
8 changes: 8 additions & 0 deletions abci/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,11 @@ func (app *gRPCApplication) Flush(_ context.Context, req *types.RequestFlush) (*
func (app *gRPCApplication) Commit(ctx context.Context, req *types.RequestCommit) (*types.ResponseCommit, error) {
return app.Application.Commit(ctx)
}

func (app *gRPCApplication) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
resV2, err := app.Application.CheckTx(ctx, req)
if err != nil {
return &types.ResponseCheckTx{}, err
}
return resV2.ResponseCheckTx, nil
}
6 changes: 3 additions & 3 deletions abci/types/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type Application interface {
Query(context.Context, *RequestQuery) (*ResponseQuery, error) // Query for state

// Mempool Connection
CheckTx(context.Context, *RequestCheckTx) (*ResponseCheckTx, error) // Validate a tx for the mempool
CheckTx(context.Context, *RequestCheckTx) (*ResponseCheckTxV2, error) // Validate a tx for the mempool

// Consensus Connection
InitChain(context.Context, *RequestInitChain) (*ResponseInitChain, error) // Initialize blockchain w validators/other info from TendermintCore
Expand Down Expand Up @@ -51,8 +51,8 @@ func (BaseApplication) Info(_ context.Context, req *RequestInfo) (*ResponseInfo,
return &ResponseInfo{}, nil
}

func (BaseApplication) CheckTx(_ context.Context, req *RequestCheckTx) (*ResponseCheckTx, error) {
return &ResponseCheckTx{Code: CodeTypeOK}, nil
func (BaseApplication) CheckTx(_ context.Context, req *RequestCheckTx) (*ResponseCheckTxV2, error) {
return &ResponseCheckTxV2{ResponseCheckTx: &ResponseCheckTx{Code: CodeTypeOK}}, nil
}

func (BaseApplication) Commit(_ context.Context) (*ResponseCommit, error) {
Expand Down
4 changes: 2 additions & 2 deletions abci/types/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ func ToResponseInfo(res *ResponseInfo) *Response {
}
}

func ToResponseCheckTx(res *ResponseCheckTx) *Response {
func ToResponseCheckTx(res *ResponseCheckTxV2) *Response {
return &Response{
Value: &Response_CheckTx{res},
Value: &Response_CheckTx{res.ResponseCheckTx},
}
}

Expand Down
8 changes: 4 additions & 4 deletions abci/types/mocks/application.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions abci/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,21 @@ func MarshalTxResults(r []*ExecTxResult) ([][]byte, error) {
}
return s, nil
}

type PendingTxCheckerResponse int

const (
Accepted PendingTxCheckerResponse = iota
Rejected
Pending
)

type PendingTxChecker func() PendingTxCheckerResponse

// V2 response type contains non-protobuf fields, so non-local ABCI clients will not be able
// to utilize the new fields in V2 type (but still be backwards-compatible)
type ResponseCheckTxV2 struct {
*ResponseCheckTx
IsPendingTransaction bool
Checker PendingTxChecker // must not be nil if IsPendingTransaction is true
}
8 changes: 4 additions & 4 deletions internal/consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,18 +308,18 @@ func (app *CounterApplication) FinalizeBlock(_ context.Context, req *abci.Reques
return res, nil
}

func (app *CounterApplication) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
func (app *CounterApplication) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) {
app.mu.Lock()
defer app.mu.Unlock()

txValue := txAsUint64(req.Tx)
if txValue != uint64(app.mempoolTxCount) {
return &abci.ResponseCheckTx{
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Code: code.CodeTypeBadNonce,
}, nil
}}, nil
}
app.mempoolTxCount++
return &abci.ResponseCheckTx{Code: code.CodeTypeOK}, nil
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{Code: code.CodeTypeOK}}, nil
}

func txAsUint64(tx []byte) uint64 {
Expand Down
45 changes: 37 additions & 8 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ type TxMempool struct {
// index. i.e. older transactions are first.
timestampIndex *WrappedTxList

// pendingTxs stores transactions that are not valid yet but might become valid
// if its checker returns Accepted
pendingTxs *PendingTxs

// A read/write lock is used to safe guard updates, insertions and deletions
// from the mempool. A read-lock is implicitly acquired when executing CheckTx,
// however, a caller must explicitly grab a write-lock via Lock when updating
Expand Down Expand Up @@ -120,6 +124,7 @@ func NewTxMempool(
timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp)
}),
pendingTxs: NewPendingTxs(),
failedCheckTxCounts: map[types.NodeID]uint64{},
peerManager: peerManager,
}
Expand Down Expand Up @@ -286,17 +291,25 @@ func (txmp *TxMempool) CheckTx(
height: txmp.height,
}

// only add new transaction if checkTx passes
if err == nil {
err = txmp.addNewTransaction(wtx, res, txInfo)
// only add new transaction if checkTx passes and is not pending
if !res.IsPendingTransaction {
err = txmp.addNewTransaction(wtx, res.ResponseCheckTx, txInfo)

if err != nil {
return err
if err != nil {
return err
}
} else {
// otherwise add to pending txs store
if res.Checker == nil {
return errors.New("no checker available for pending transaction")
}
txmp.pendingTxs.Insert(wtx, res, txInfo)
}
}

if cb != nil {
cb(res)
cb(res.ResponseCheckTx)
}

return nil
Expand Down Expand Up @@ -470,6 +483,7 @@ func (txmp *TxMempool) Update(
}
}

txmp.handlePendingTransactions()
txmp.purgeExpiredTxs(blockHeight)

// If there any uncommitted transactions left in the mempool, we either
Expand Down Expand Up @@ -633,7 +647,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
//
// This method is NOT executed for the initial CheckTx on a new transaction;
// that case is handled by addNewTransaction instead.
func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckTx) {
func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckTxV2) {
if txmp.recheckCursor == nil {
return
}
Expand Down Expand Up @@ -676,10 +690,11 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckT
if !txmp.txStore.IsTxRemoved(wtx.hash) {
var err error
if txmp.postCheck != nil {
err = txmp.postCheck(tx, res)
err = txmp.postCheck(tx, res.ResponseCheckTx)
}

if res.Code == abci.CodeTypeOK && err == nil {
// we will treat a transaction that turns pending in a recheck as invalid and evict it
if res.Code == abci.CodeTypeOK && err == nil && !res.IsPendingTransaction {
wtx.priority = res.Priority
} else {
txmp.logger.Debug(
Expand Down Expand Up @@ -904,3 +919,17 @@ func (txmp *TxMempool) AppendCheckTxErr(existingLogs string, log string) string
jsonData, _ := json.Marshal(logs)
return string(jsonData)
}

func (txmp *TxMempool) handlePendingTransactions() {
accepted, rejected := txmp.pendingTxs.EvaluatePendingTransactions()
for _, tx := range accepted {
if err := txmp.addNewTransaction(tx.tx, tx.checkTxResponse.ResponseCheckTx, tx.txInfo); err != nil {
txmp.logger.Error(fmt.Sprintf("error adding pending transaction: %s", err))
}
}
if !txmp.config.KeepInvalidTxsInCache {
for _, tx := range rejected {
txmp.cache.Remove(tx.tx.tx)
}
}
}
16 changes: 8 additions & 8 deletions internal/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type testTx struct {
priority int64
}

func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) {
var (
priority int64
sender string
Expand All @@ -48,29 +48,29 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a
if len(parts) == 3 {
v, err := strconv.ParseInt(string(parts[2]), 10, 64)
if err != nil {
return &abci.ResponseCheckTx{
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Priority: priority,
Code: 100,
GasWanted: 1,
}, nil
}}, nil
}

priority = v
sender = string(parts[0])
} else {
return &abci.ResponseCheckTx{
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Priority: priority,
Code: 101,
GasWanted: 1,
}, nil
}}, nil
}

return &abci.ResponseCheckTx{
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Priority: priority,
Sender: sender,
Code: code.CodeTypeOK,
GasWanted: 1,
}, nil
}}, nil
}

func setup(t testing.TB, app abciclient.Client, cacheSize int, options ...TxMempoolOption) *TxMempool {
Expand Down Expand Up @@ -727,4 +727,4 @@ func TestAppendCheckTxErr(t *testing.T) {
require.NoError(t, err)
require.Equal(t, len(data), 1)
require.Equal(t, data[0]["log"], "sample error msg")
}
}
Loading