From d1d4f7960a63d212f9887d4f9bfef1bfde8eebc8 Mon Sep 17 00:00:00 2001 From: Brandon Weng <18161326+BrandonWeng@users.noreply.github.com> Date: Fri, 2 Dec 2022 01:03:59 -0500 Subject: [PATCH 1/4] test --- app/app.go | 85 +++++++++++++++++++++++++++---------------------- app/app_test.go | 83 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+), 38 deletions(-) diff --git a/app/app.go b/app/app.go index 1b7c7bdd83..fba5a862b9 100644 --- a/app/app.go +++ b/app/app.go @@ -1057,6 +1057,14 @@ func (app *App) ProcessTxConcurrent( metrics.IncrTxProcessTypeCounter(metrics.CONCURRENT) } +type ProcessBlockConcurrentFunction func( + ctx sdk.Context, + txs [][]byte, + completionSignalingMap map[int]acltypes.MessageCompletionSignalMapping, + blockingSignalsMap map[int]acltypes.MessageCompletionSignalMapping, + txMsgAccessOpMapping map[int]acltypes.MsgIndexToAccessOpMapping, +) ([]*abci.ExecTxResult, bool) + func (app *App) ProcessBlockConcurrent( ctx sdk.Context, txs [][]byte, @@ -1121,9 +1129,43 @@ func (app *App) ProcessBlockConcurrent( return txResults, ok } +func (app *App) ProcessTxs( + ctx sdk.Context, + txs [][]byte, + dependencyDag *acltypes.Dag, + processBlockConcurrentFunction ProcessBlockConcurrentFunction, +) []*abci.ExecTxResult { + // Only run concurrently if no error + // Branch off the current context and pass a cached context to the concurrent delivered TXs that are shared. + // runTx will write to this ephermeral CacheMultiStore, after the process block is done, Write() is called on this + // CacheMultiStore where it writes the data to the parent store (DeliverState) in sorted Key order to maintain + // deterministic ordering between validators in the case of concurrent deliverTXs + processBlockCtx, processBlockCache := app.CacheContext(ctx) + concurrentResults, ok := processBlockConcurrentFunction( + processBlockCtx, + txs, + dependencyDag.CompletionSignalingMap, + dependencyDag.BlockingSignalsMap, + dependencyDag.TxMsgAccessOpMapping, + ) + if ok { + // Write the results back to the concurrent contexts - if concurrent execution fails, + // this should not be called and the state is rolled back and retried with synchronous execution + processBlockCache.Write() + return concurrentResults + } + + ctx.Logger().Error("Concurrent Execution failed, retrying with Synchronous") + // Clear the memcache context from the previous state as it failed, we no longer need to commit the data + ctx = ctx.WithContextMemCache(sdk.NewContextMemCache()) + txResults := app.ProcessBlockSynchronous(ctx, txs) + processBlockCache.Write() + return txResults +} + func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequest, lastCommit abci.CommitInfo) ([]abci.Event, []*abci.ExecTxResult, abci.ResponseEndBlock, error) { goCtx := app.decorateContextWithDexMemState(ctx.Context()) - ctx = ctx.WithContext(goCtx).WithContextMemCache(sdk.NewContextMemCache()) + ctx = ctx.WithContext(goCtx) events := []abci.Event{} beginBlockReq := abci.RequestBeginBlock{ @@ -1151,48 +1193,15 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequ beginBlockResp := app.BeginBlock(ctx, beginBlockReq) events = append(events, beginBlockResp.Events...) - // typedTxs := []sdk.Tx{} - // for _, tx := range req.GetTxs() { - // typedTx, err := app.txDecoder(tx) - // if err != nil { - // typedTxs = append(typedTxs, nil) - // } else { - // typedTxs = append(typedTxs, typedTx) - // } - // } - // app.batchVerifier.VerifyTxs(ctx, typedTxs) - dependencyDag, err := app.AccessControlKeeper.BuildDependencyDag(ctx, app.txDecoder, app.GetAnteDepGenerator(), txs) var txResults []*abci.ExecTxResult switch err { case nil: - // Only run concurrently if no error - // Branch off the current context and pass a cached context to the concurrent delivered TXs that are shared. - // runTx will write to this ephermeral CacheMultiStore, after the process block is done, Write() is called on this - // CacheMultiStore where it writes the data to the parent store (DeliverState) in sorted Key order to maintain - // deterministic ordering between validators in the case of concurrent deliverTXs - processBlockCtx, processBlockCache := app.CacheContext(ctx) - concurrentResults, ok := app.ProcessBlockConcurrent( - processBlockCtx, - txs, - dependencyDag.CompletionSignalingMap, - dependencyDag.BlockingSignalsMap, - dependencyDag.TxMsgAccessOpMapping, - ) - if ok { - txResults = concurrentResults - // Write the results back to the concurrent contexts - if concurrent execution fails, - // this should not be called and the state is rolled back and retried with synchronous execution - processBlockCache.Write() - } else { - ctx.Logger().Error("Concurrent Execution failed, retrying with Synchronous") - txResults = app.ProcessBlockSynchronous(ctx, txs) - } - - // Write the results back to the concurrent contexts - processBlockCache.Write() + // Start with a fresh state for the MemCache + ctx = ctx.WithContextMemCache(sdk.NewContextMemCache()) + txResults = app.ProcessTxs(ctx, txs, dependencyDag, app.ProcessBlockConcurrent) case acltypes.ErrGovMsgInBlock: ctx.Logger().Info(fmt.Sprintf("Gov msg found while building DAG, processing synchronously: %s", err)) txResults = app.ProcessBlockSynchronous(ctx, txs) @@ -1204,7 +1213,7 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequ } // Finalize all Bank Module Transfers here so that events are included - lazyWriteEvents := app.BankKeeper.WriteDeferredDepositsToModuleAccounts(ctx) + lazyWriteEvents := app.BankKeeper.WriteDeferredOperations(ctx) events = append(events, lazyWriteEvents...) ctx = app.enrichContextWithTxResults(ctx, txResults) diff --git a/app/app_test.go b/app/app_test.go index 706ba6868a..4a05a900ab 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" + sdk "github.com/cosmos/cosmos-sdk/types" sdkacltypes "github.com/cosmos/cosmos-sdk/types/accesscontrol" acltypes "github.com/cosmos/cosmos-sdk/x/accesscontrol/types" "github.com/k0kubun/pp/v3" @@ -76,3 +77,85 @@ func TestGetChannelsFromSignalMapping(t *testing.T) { require.True(t, len(resultCompletionSignalsMap) > 1) require.True(t, len(resultBlockingSignalsMap) > 1) } + + +// Mock method to fail +func MockProcessBlockConcurrentFunctionFail( + ctx sdk.Context, + txs [][]byte, + completionSignalingMap map[int]acltypes.MessageCompletionSignalMapping, + blockingSignalsMap map[int]acltypes.MessageCompletionSignalMapping, + txMsgAccessOpMapping map[int]acltypes.MsgIndexToAccessOpMapping, +) ([]*abci.ExecTxResult, bool) { + return []*abci.ExecTxResult{}, false +} + +func MockProcessBlockConcurrentFunctionSuccess( + ctx sdk.Context, + txs [][]byte, + completionSignalingMap map[int]acltypes.MessageCompletionSignalMapping, + blockingSignalsMap map[int]acltypes.MessageCompletionSignalMapping, + txMsgAccessOpMapping map[int]acltypes.MsgIndexToAccessOpMapping, +) ([]*abci.ExecTxResult, bool) { + return []*abci.ExecTxResult{}, true +} + + +func TestProcessTxsSuccess(t *testing.T) { + tm := time.Now().UTC() + valPub := secp256k1.GenPrivKey().PubKey() + + + testWrapper := app.NewTestWrapper(t, tm, valPub) + dag := acltypes.NewDag() + + // Set some test context mem cache values + testWrapper.Ctx.ContextMemCache().UpsertDeferredSends("Some Account", sdk.NewCoins(sdk.Coin{ + Denom: "test", + Amount: sdk.NewInt(1), + })) + testWrapper.Ctx.ContextMemCache().UpsertDeferredWithdrawals("Some Account", sdk.NewCoins(sdk.Coin{ + Denom: "test", + Amount: sdk.NewInt(1), + })) + testWrapper.App.ProcessTxs( + testWrapper.Ctx, + [][]byte{}, + &dag, + MockProcessBlockConcurrentFunctionSuccess, + ) + + // It should be reset if it fails to prevent any values from being written + require.Equal(t, 1, len(testWrapper.Ctx.ContextMemCache().GetDeferredWithdrawals().GetSortedKeys())) + require.Equal(t, 1, len(testWrapper.Ctx.ContextMemCache().GetDeferredSends().GetSortedKeys())) +} + + +func TestProcessTxsClearCacheOnFail(t *testing.T) { + tm := time.Now().UTC() + valPub := secp256k1.GenPrivKey().PubKey() + + + testWrapper := app.NewTestWrapper(t, tm, valPub) + dag := acltypes.NewDag() + + // Set some test context mem cache values + testWrapper.Ctx.ContextMemCache().UpsertDeferredSends("Some Account", sdk.NewCoins(sdk.Coin{ + Denom: "test", + Amount: sdk.NewInt(1), + })) + testWrapper.Ctx.ContextMemCache().UpsertDeferredWithdrawals("Some Account", sdk.NewCoins(sdk.Coin{ + Denom: "test", + Amount: sdk.NewInt(1), + })) + testWrapper.App.ProcessTxs( + testWrapper.Ctx, + [][]byte{}, + &dag, + MockProcessBlockConcurrentFunctionFail, + ) + + // It should be reset if it fails to prevent any values from being written + require.Equal(t, 0, len(testWrapper.Ctx.ContextMemCache().GetDeferredWithdrawals().GetSortedKeys())) + require.Equal(t, 0, len(testWrapper.Ctx.ContextMemCache().GetDeferredSends().GetSortedKeys())) +} From 88e2b8ff1df4ef778773aa3832598ce70ae323f6 Mon Sep 17 00:00:00 2001 From: Brandon Weng <18161326+BrandonWeng@users.noreply.github.com> Date: Fri, 2 Dec 2022 11:54:08 -0500 Subject: [PATCH 2/4] lint --- app/app.go | 10 +++++----- x/oracle/keeper/test_utils.go | 2 +- x/oracle/simulation/operations.go | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/app/app.go b/app/app.go index fba5a862b9..0090380e00 100644 --- a/app/app.go +++ b/app/app.go @@ -1058,11 +1058,11 @@ func (app *App) ProcessTxConcurrent( } type ProcessBlockConcurrentFunction func( - ctx sdk.Context, - txs [][]byte, - completionSignalingMap map[int]acltypes.MessageCompletionSignalMapping, - blockingSignalsMap map[int]acltypes.MessageCompletionSignalMapping, - txMsgAccessOpMapping map[int]acltypes.MsgIndexToAccessOpMapping, + ctx sdk.Context, + txs [][]byte, + completionSignalingMap map[int]acltypes.MessageCompletionSignalMapping, + blockingSignalsMap map[int]acltypes.MessageCompletionSignalMapping, + txMsgAccessOpMapping map[int]acltypes.MsgIndexToAccessOpMapping, ) ([]*abci.ExecTxResult, bool) func (app *App) ProcessBlockConcurrent( diff --git a/x/oracle/keeper/test_utils.go b/x/oracle/keeper/test_utils.go index ffaa5062c1..698e6d88a2 100644 --- a/x/oracle/keeper/test_utils.go +++ b/x/oracle/keeper/test_utils.go @@ -1,4 +1,4 @@ -// nolint +//nolint package keeper import ( diff --git a/x/oracle/simulation/operations.go b/x/oracle/simulation/operations.go index 2028e73625..3988c4b164 100644 --- a/x/oracle/simulation/operations.go +++ b/x/oracle/simulation/operations.go @@ -65,7 +65,7 @@ func WeightedOperations( } // SimulateMsgAggregateExchangeRateVote generates a MsgAggregateExchangeRateVote with random values. -// nolint: funlen +//nolint: funlen func SimulateMsgAggregateExchangeRateVote(ak types.AccountKeeper, bk types.BankKeeper, k keeper.Keeper) simtypes.Operation { return func( r *rand.Rand, app *baseapp.BaseApp, ctx sdk.Context, accs []simtypes.Account, chainID string, @@ -122,7 +122,7 @@ func SimulateMsgAggregateExchangeRateVote(ak types.AccountKeeper, bk types.BankK } // SimulateMsgDelegateFeedConsent generates a MsgDelegateFeedConsent with random values. -// nolint: funlen +//nolint: funlen func SimulateMsgDelegateFeedConsent(ak types.AccountKeeper, bk types.BankKeeper, k keeper.Keeper) simtypes.Operation { return func( r *rand.Rand, app *baseapp.BaseApp, ctx sdk.Context, accs []simtypes.Account, chainID string, From 184a08acaf2c696b1a22cb1ab361f0cfdcf76680 Mon Sep 17 00:00:00 2001 From: Brandon Weng <18161326+BrandonWeng@users.noreply.github.com> Date: Fri, 2 Dec 2022 12:29:40 -0500 Subject: [PATCH 3/4] Fix unit tests --- app/app.go | 3 ++- app/app_test.go | 4 +++- go.mod | 2 +- go.sum | 4 ++-- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/app/app.go b/app/app.go index 0090380e00..5fe4bd6075 100644 --- a/app/app.go +++ b/app/app.go @@ -1157,7 +1157,8 @@ func (app *App) ProcessTxs( ctx.Logger().Error("Concurrent Execution failed, retrying with Synchronous") // Clear the memcache context from the previous state as it failed, we no longer need to commit the data - ctx = ctx.WithContextMemCache(sdk.NewContextMemCache()) + ctx.ContextMemCache().Clear() + txResults := app.ProcessBlockSynchronous(ctx, txs) processBlockCache.Write() return txResults diff --git a/app/app_test.go b/app/app_test.go index 4a05a900ab..06d3bf97d2 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -114,10 +114,12 @@ func TestProcessTxsSuccess(t *testing.T) { Denom: "test", Amount: sdk.NewInt(1), })) - testWrapper.Ctx.ContextMemCache().UpsertDeferredWithdrawals("Some Account", sdk.NewCoins(sdk.Coin{ + testWrapper.Ctx.ContextMemCache().UpsertDeferredWithdrawals("Some Other Account", sdk.NewCoins(sdk.Coin{ Denom: "test", Amount: sdk.NewInt(1), })) + // It should be reset if it fails to prevent any values from being written + require.Equal(t, 1, len(testWrapper.Ctx.ContextMemCache().GetDeferredSends().GetSortedKeys())) testWrapper.App.ProcessTxs( testWrapper.Ctx, [][]byte{}, diff --git a/go.mod b/go.mod index 1c9713bafa..71bb1c22fd 100644 --- a/go.mod +++ b/go.mod @@ -266,7 +266,7 @@ require ( ) replace ( - github.com/cosmos/cosmos-sdk => github.com/sei-protocol/sei-cosmos v0.1.318 + github.com/cosmos/cosmos-sdk => github.com/sei-protocol/sei-cosmos v0.1.319 github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 github.com/keybase/go-keychain => github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 github.com/tendermint/tendermint => github.com/sei-protocol/sei-tendermint v0.1.91 diff --git a/go.sum b/go.sum index 8098aa6e46..b65ce8dcb4 100644 --- a/go.sum +++ b/go.sum @@ -1067,8 +1067,8 @@ github.com/seccomp/libseccomp-golang v0.9.2-0.20210429002308-3879420cc921/go.mod github.com/securego/gosec/v2 v2.11.0 h1:+PDkpzR41OI2jrw1q6AdXZCbsNGNGT7pQjal0H0cArI= github.com/securego/gosec/v2 v2.11.0/go.mod h1:SX8bptShuG8reGC0XS09+a4H2BoWSJi+fscA+Pulbpo= github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY= -github.com/sei-protocol/sei-cosmos v0.1.318 h1:hDTSYGuHdrN+nioJ6pWjL2u28BM/WVJC3MlBU/+vYAM= -github.com/sei-protocol/sei-cosmos v0.1.318/go.mod h1:N3OGX0WV/5fdWHZ5BavvrrRBr3SqUUwuwjvHnWinLss= +github.com/sei-protocol/sei-cosmos v0.1.319 h1:gynHm7kA52ElGNu5xYfC5j0OanhNTIp92KcMC7aGyqA= +github.com/sei-protocol/sei-cosmos v0.1.319/go.mod h1:N3OGX0WV/5fdWHZ5BavvrrRBr3SqUUwuwjvHnWinLss= github.com/sei-protocol/sei-tendermint v0.1.91 h1:FXIAAXZoVUOw0srE71giPy5Kt2lI54RjvwZi7pTF5V4= github.com/sei-protocol/sei-tendermint v0.1.91/go.mod h1:Olwbjyagrpoxj5DAUhHxMTWDVEfQ3FYdpypaJ3+6Hs8= github.com/sei-protocol/sei-tm-db v0.0.5 h1:3WONKdSXEqdZZeLuWYfK5hP37TJpfaUa13vAyAlvaQY= From c7dbaee297be2ed47e0d5a49e6897e01cc11c77f Mon Sep 17 00:00:00 2001 From: Brandon Weng <18161326+BrandonWeng@users.noreply.github.com> Date: Sat, 3 Dec 2022 17:07:00 -0500 Subject: [PATCH 4/4] Update app_test.go --- app/app_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/app/app_test.go b/app/app_test.go index 06d3bf97d2..f8dbb845f5 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -118,7 +118,6 @@ func TestProcessTxsSuccess(t *testing.T) { Denom: "test", Amount: sdk.NewInt(1), })) - // It should be reset if it fails to prevent any values from being written require.Equal(t, 1, len(testWrapper.Ctx.ContextMemCache().GetDeferredSends().GetSortedKeys())) testWrapper.App.ProcessTxs( testWrapper.Ctx,