diff --git a/app/app.go b/app/app.go index 1b7c7bdd83..5fe4bd6075 100644 --- a/app/app.go +++ b/app/app.go @@ -1057,6 +1057,14 @@ func (app *App) ProcessTxConcurrent( metrics.IncrTxProcessTypeCounter(metrics.CONCURRENT) } +type ProcessBlockConcurrentFunction func( + ctx sdk.Context, + txs [][]byte, + completionSignalingMap map[int]acltypes.MessageCompletionSignalMapping, + blockingSignalsMap map[int]acltypes.MessageCompletionSignalMapping, + txMsgAccessOpMapping map[int]acltypes.MsgIndexToAccessOpMapping, +) ([]*abci.ExecTxResult, bool) + func (app *App) ProcessBlockConcurrent( ctx sdk.Context, txs [][]byte, @@ -1121,9 +1129,44 @@ func (app *App) ProcessBlockConcurrent( return txResults, ok } +func (app *App) ProcessTxs( + ctx sdk.Context, + txs [][]byte, + dependencyDag *acltypes.Dag, + processBlockConcurrentFunction ProcessBlockConcurrentFunction, +) []*abci.ExecTxResult { + // Only run concurrently if no error + // Branch off the current context and pass a cached context to the concurrent delivered TXs that are shared. 
+ // runTx will write to this ephemeral CacheMultiStore, after the process block is done, Write() is called on this + // CacheMultiStore where it writes the data to the parent store (DeliverState) in sorted Key order to maintain + // deterministic ordering between validators in the case of concurrent deliverTXs + processBlockCtx, processBlockCache := app.CacheContext(ctx) + concurrentResults, ok := processBlockConcurrentFunction( + processBlockCtx, + txs, + dependencyDag.CompletionSignalingMap, + dependencyDag.BlockingSignalsMap, + dependencyDag.TxMsgAccessOpMapping, + ) + if ok { + // Write the results back to the concurrent contexts - if concurrent execution fails, + // this should not be called and the state is rolled back and retried with synchronous execution + processBlockCache.Write() + return concurrentResults + } + + ctx.Logger().Error("Concurrent Execution failed, retrying with Synchronous") + // Clear the memcache context from the previous state as it failed, we no longer need to commit the data + ctx.ContextMemCache().Clear() + + txResults := app.ProcessBlockSynchronous(ctx, txs) + processBlockCache.Write() + return txResults +} + func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequest, lastCommit abci.CommitInfo) ([]abci.Event, []*abci.ExecTxResult, abci.ResponseEndBlock, error) { goCtx := app.decorateContextWithDexMemState(ctx.Context()) - ctx = ctx.WithContext(goCtx).WithContextMemCache(sdk.NewContextMemCache()) + ctx = ctx.WithContext(goCtx) events := []abci.Event{} beginBlockReq := abci.RequestBeginBlock{ @@ -1151,48 +1194,15 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequ beginBlockResp := app.BeginBlock(ctx, beginBlockReq) events = append(events, beginBlockResp.Events...)
- // typedTxs := []sdk.Tx{} - // for _, tx := range req.GetTxs() { - // typedTx, err := app.txDecoder(tx) - // if err != nil { - // typedTxs = append(typedTxs, nil) - // } else { - // typedTxs = append(typedTxs, typedTx) - // } - // } - // app.batchVerifier.VerifyTxs(ctx, typedTxs) - dependencyDag, err := app.AccessControlKeeper.BuildDependencyDag(ctx, app.txDecoder, app.GetAnteDepGenerator(), txs) var txResults []*abci.ExecTxResult switch err { case nil: - // Only run concurrently if no error - // Branch off the current context and pass a cached context to the concurrent delivered TXs that are shared. - // runTx will write to this ephermeral CacheMultiStore, after the process block is done, Write() is called on this - // CacheMultiStore where it writes the data to the parent store (DeliverState) in sorted Key order to maintain - // deterministic ordering between validators in the case of concurrent deliverTXs - processBlockCtx, processBlockCache := app.CacheContext(ctx) - concurrentResults, ok := app.ProcessBlockConcurrent( - processBlockCtx, - txs, - dependencyDag.CompletionSignalingMap, - dependencyDag.BlockingSignalsMap, - dependencyDag.TxMsgAccessOpMapping, - ) - if ok { - txResults = concurrentResults - // Write the results back to the concurrent contexts - if concurrent execution fails, - // this should not be called and the state is rolled back and retried with synchronous execution - processBlockCache.Write() - } else { - ctx.Logger().Error("Concurrent Execution failed, retrying with Synchronous") - txResults = app.ProcessBlockSynchronous(ctx, txs) - } - - // Write the results back to the concurrent contexts - processBlockCache.Write() + // Start with a fresh state for the MemCache + ctx = ctx.WithContextMemCache(sdk.NewContextMemCache()) + txResults = app.ProcessTxs(ctx, txs, dependencyDag, app.ProcessBlockConcurrent) case acltypes.ErrGovMsgInBlock: ctx.Logger().Info(fmt.Sprintf("Gov msg found while building DAG, processing synchronously: %s", err)) 
txResults = app.ProcessBlockSynchronous(ctx, txs) @@ -1204,7 +1214,7 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequ } // Finalize all Bank Module Transfers here so that events are included - lazyWriteEvents := app.BankKeeper.WriteDeferredDepositsToModuleAccounts(ctx) + lazyWriteEvents := app.BankKeeper.WriteDeferredOperations(ctx) events = append(events, lazyWriteEvents...) ctx = app.enrichContextWithTxResults(ctx, txResults) diff --git a/app/app_test.go b/app/app_test.go index 706ba6868a..f8dbb845f5 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" + sdk "github.com/cosmos/cosmos-sdk/types" sdkacltypes "github.com/cosmos/cosmos-sdk/types/accesscontrol" acltypes "github.com/cosmos/cosmos-sdk/x/accesscontrol/types" "github.com/k0kubun/pp/v3" @@ -76,3 +77,86 @@ func TestGetChannelsFromSignalMapping(t *testing.T) { require.True(t, len(resultCompletionSignalsMap) > 1) require.True(t, len(resultBlockingSignalsMap) > 1) } + + +// Mock method to fail +func MockProcessBlockConcurrentFunctionFail( + ctx sdk.Context, + txs [][]byte, + completionSignalingMap map[int]acltypes.MessageCompletionSignalMapping, + blockingSignalsMap map[int]acltypes.MessageCompletionSignalMapping, + txMsgAccessOpMapping map[int]acltypes.MsgIndexToAccessOpMapping, +) ([]*abci.ExecTxResult, bool) { + return []*abci.ExecTxResult{}, false +} + +func MockProcessBlockConcurrentFunctionSuccess( + ctx sdk.Context, + txs [][]byte, + completionSignalingMap map[int]acltypes.MessageCompletionSignalMapping, + blockingSignalsMap map[int]acltypes.MessageCompletionSignalMapping, + txMsgAccessOpMapping map[int]acltypes.MsgIndexToAccessOpMapping, +) ([]*abci.ExecTxResult, bool) { + return []*abci.ExecTxResult{}, true +} + + +func TestProcessTxsSuccess(t *testing.T) { + tm := time.Now().UTC() + valPub := secp256k1.GenPrivKey().PubKey() + + + testWrapper := app.NewTestWrapper(t, tm, valPub) + dag 
:= acltypes.NewDag() + + // Set some test context mem cache values + testWrapper.Ctx.ContextMemCache().UpsertDeferredSends("Some Account", sdk.NewCoins(sdk.Coin{ + Denom: "test", + Amount: sdk.NewInt(1), + })) + testWrapper.Ctx.ContextMemCache().UpsertDeferredWithdrawals("Some Other Account", sdk.NewCoins(sdk.Coin{ + Denom: "test", + Amount: sdk.NewInt(1), + })) + require.Equal(t, 1, len(testWrapper.Ctx.ContextMemCache().GetDeferredSends().GetSortedKeys())) + testWrapper.App.ProcessTxs( + testWrapper.Ctx, + [][]byte{}, + &dag, + MockProcessBlockConcurrentFunctionSuccess, + ) + + // On success the mem cache should be retained so the deferred values can still be written + require.Equal(t, 1, len(testWrapper.Ctx.ContextMemCache().GetDeferredWithdrawals().GetSortedKeys())) + require.Equal(t, 1, len(testWrapper.Ctx.ContextMemCache().GetDeferredSends().GetSortedKeys())) +} + + +func TestProcessTxsClearCacheOnFail(t *testing.T) { + tm := time.Now().UTC() + valPub := secp256k1.GenPrivKey().PubKey() + + + testWrapper := app.NewTestWrapper(t, tm, valPub) + dag := acltypes.NewDag() + + // Set some test context mem cache values + testWrapper.Ctx.ContextMemCache().UpsertDeferredSends("Some Account", sdk.NewCoins(sdk.Coin{ + Denom: "test", + Amount: sdk.NewInt(1), + })) + testWrapper.Ctx.ContextMemCache().UpsertDeferredWithdrawals("Some Account", sdk.NewCoins(sdk.Coin{ + Denom: "test", + Amount: sdk.NewInt(1), + })) + testWrapper.App.ProcessTxs( + testWrapper.Ctx, + [][]byte{}, + &dag, + MockProcessBlockConcurrentFunctionFail, + ) + + // It should be reset if it fails to prevent any values from being written + require.Equal(t, 0, len(testWrapper.Ctx.ContextMemCache().GetDeferredWithdrawals().GetSortedKeys())) + require.Equal(t, 0, len(testWrapper.Ctx.ContextMemCache().GetDeferredSends().GetSortedKeys())) +} diff --git a/go.mod b/go.mod index 1c9713bafa..71bb1c22fd 100644 --- a/go.mod +++ b/go.mod @@ -266,7 +266,7 @@ require ( ) replace ( - github.com/cosmos/cosmos-sdk =>
github.com/sei-protocol/sei-cosmos v0.1.318 + github.com/cosmos/cosmos-sdk => github.com/sei-protocol/sei-cosmos v0.1.319 github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 github.com/keybase/go-keychain => github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 github.com/tendermint/tendermint => github.com/sei-protocol/sei-tendermint v0.1.91 diff --git a/go.sum b/go.sum index 8098aa6e46..b65ce8dcb4 100644 --- a/go.sum +++ b/go.sum @@ -1067,8 +1067,8 @@ github.com/seccomp/libseccomp-golang v0.9.2-0.20210429002308-3879420cc921/go.mod github.com/securego/gosec/v2 v2.11.0 h1:+PDkpzR41OI2jrw1q6AdXZCbsNGNGT7pQjal0H0cArI= github.com/securego/gosec/v2 v2.11.0/go.mod h1:SX8bptShuG8reGC0XS09+a4H2BoWSJi+fscA+Pulbpo= github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY= -github.com/sei-protocol/sei-cosmos v0.1.318 h1:hDTSYGuHdrN+nioJ6pWjL2u28BM/WVJC3MlBU/+vYAM= -github.com/sei-protocol/sei-cosmos v0.1.318/go.mod h1:N3OGX0WV/5fdWHZ5BavvrrRBr3SqUUwuwjvHnWinLss= +github.com/sei-protocol/sei-cosmos v0.1.319 h1:gynHm7kA52ElGNu5xYfC5j0OanhNTIp92KcMC7aGyqA= +github.com/sei-protocol/sei-cosmos v0.1.319/go.mod h1:N3OGX0WV/5fdWHZ5BavvrrRBr3SqUUwuwjvHnWinLss= github.com/sei-protocol/sei-tendermint v0.1.91 h1:FXIAAXZoVUOw0srE71giPy5Kt2lI54RjvwZi7pTF5V4= github.com/sei-protocol/sei-tendermint v0.1.91/go.mod h1:Olwbjyagrpoxj5DAUhHxMTWDVEfQ3FYdpypaJ3+6Hs8= github.com/sei-protocol/sei-tm-db v0.0.5 h1:3WONKdSXEqdZZeLuWYfK5hP37TJpfaUa13vAyAlvaQY= diff --git a/x/oracle/keeper/test_utils.go b/x/oracle/keeper/test_utils.go index ffaa5062c1..698e6d88a2 100644 --- a/x/oracle/keeper/test_utils.go +++ b/x/oracle/keeper/test_utils.go @@ -1,4 +1,4 @@ -// nolint +//nolint package keeper import ( diff --git a/x/oracle/simulation/operations.go b/x/oracle/simulation/operations.go index 2028e73625..3988c4b164 100644 --- a/x/oracle/simulation/operations.go +++ b/x/oracle/simulation/operations.go @@ -65,7 +65,7 @@ func 
WeightedOperations( } // SimulateMsgAggregateExchangeRateVote generates a MsgAggregateExchangeRateVote with random values. -// nolint: funlen +//nolint: funlen func SimulateMsgAggregateExchangeRateVote(ak types.AccountKeeper, bk types.BankKeeper, k keeper.Keeper) simtypes.Operation { return func( r *rand.Rand, app *baseapp.BaseApp, ctx sdk.Context, accs []simtypes.Account, chainID string, @@ -122,7 +122,7 @@ func SimulateMsgAggregateExchangeRateVote(ak types.AccountKeeper, bk types.BankK } // SimulateMsgDelegateFeedConsent generates a MsgDelegateFeedConsent with random values. -// nolint: funlen +//nolint: funlen func SimulateMsgDelegateFeedConsent(ak types.AccountKeeper, bk types.BankKeeper, k keeper.Keeper) simtypes.Operation { return func( r *rand.Rand, app *baseapp.BaseApp, ctx sdk.Context, accs []simtypes.Account, chainID string,