Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 48 additions & 38 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,14 @@ func (app *App) ProcessTxConcurrent(
metrics.IncrTxProcessTypeCounter(metrics.CONCURRENT)
}

type ProcessBlockConcurrentFunction func(
ctx sdk.Context,
txs [][]byte,
completionSignalingMap map[int]acltypes.MessageCompletionSignalMapping,
blockingSignalsMap map[int]acltypes.MessageCompletionSignalMapping,
txMsgAccessOpMapping map[int]acltypes.MsgIndexToAccessOpMapping,
) ([]*abci.ExecTxResult, bool)

func (app *App) ProcessBlockConcurrent(
ctx sdk.Context,
txs [][]byte,
Expand Down Expand Up @@ -1121,9 +1129,44 @@ func (app *App) ProcessBlockConcurrent(
return txResults, ok
}

func (app *App) ProcessTxs(
ctx sdk.Context,
txs [][]byte,
dependencyDag *acltypes.Dag,
processBlockConcurrentFunction ProcessBlockConcurrentFunction,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we pass the the function instead of directly calling the ProcessBlockConcurrent? is it primarily for the mocking purposes in testing? if so, isn't that an antipattern? (I don't have strong feelings one way or the other on whether we keep it this way, just looking to better understand this & golang best practices.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is mainly for testing, I was following this: https://www.myhatchpad.com/insight/mocking-techniques-for-go/

) []*abci.ExecTxResult {
// Only run concurrently if no error
// Branch off the current context and pass a cached context to the concurrent delivered TXs that are shared.
// runTx will write to this ephermeral CacheMultiStore, after the process block is done, Write() is called on this
// CacheMultiStore where it writes the data to the parent store (DeliverState) in sorted Key order to maintain
// deterministic ordering between validators in the case of concurrent deliverTXs
processBlockCtx, processBlockCache := app.CacheContext(ctx)
concurrentResults, ok := processBlockConcurrentFunction(
processBlockCtx,
txs,
dependencyDag.CompletionSignalingMap,
dependencyDag.BlockingSignalsMap,
dependencyDag.TxMsgAccessOpMapping,
)
if ok {
// Write the results back to the concurrent contexts - if concurrent execution fails,
// this should not be called and the state is rolled back and retried with synchronous execution
processBlockCache.Write()
return concurrentResults
}

ctx.Logger().Error("Concurrent Execution failed, retrying with Synchronous")
// Clear the memcache context from the previous state as it failed, we no longer need to commit the data
ctx.ContextMemCache().Clear()

txResults := app.ProcessBlockSynchronous(ctx, txs)
processBlockCache.Write()
return txResults
}

func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequest, lastCommit abci.CommitInfo) ([]abci.Event, []*abci.ExecTxResult, abci.ResponseEndBlock, error) {
goCtx := app.decorateContextWithDexMemState(ctx.Context())
ctx = ctx.WithContext(goCtx).WithContextMemCache(sdk.NewContextMemCache())
ctx = ctx.WithContext(goCtx)

events := []abci.Event{}
beginBlockReq := abci.RequestBeginBlock{
Expand Down Expand Up @@ -1151,48 +1194,15 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequ
beginBlockResp := app.BeginBlock(ctx, beginBlockReq)
events = append(events, beginBlockResp.Events...)

// typedTxs := []sdk.Tx{}
// for _, tx := range req.GetTxs() {
// typedTx, err := app.txDecoder(tx)
// if err != nil {
// typedTxs = append(typedTxs, nil)
// } else {
// typedTxs = append(typedTxs, typedTx)
// }
// }
// app.batchVerifier.VerifyTxs(ctx, typedTxs)

dependencyDag, err := app.AccessControlKeeper.BuildDependencyDag(ctx, app.txDecoder, app.GetAnteDepGenerator(), txs)

var txResults []*abci.ExecTxResult

switch err {
case nil:
// Only run concurrently if no error
// Branch off the current context and pass a cached context to the concurrent delivered TXs that are shared.
// runTx will write to this ephermeral CacheMultiStore, after the process block is done, Write() is called on this
// CacheMultiStore where it writes the data to the parent store (DeliverState) in sorted Key order to maintain
// deterministic ordering between validators in the case of concurrent deliverTXs
processBlockCtx, processBlockCache := app.CacheContext(ctx)
concurrentResults, ok := app.ProcessBlockConcurrent(
processBlockCtx,
txs,
dependencyDag.CompletionSignalingMap,
dependencyDag.BlockingSignalsMap,
dependencyDag.TxMsgAccessOpMapping,
)
if ok {
txResults = concurrentResults
// Write the results back to the concurrent contexts - if concurrent execution fails,
// this should not be called and the state is rolled back and retried with synchronous execution
processBlockCache.Write()
} else {
ctx.Logger().Error("Concurrent Execution failed, retrying with Synchronous")
txResults = app.ProcessBlockSynchronous(ctx, txs)
}

// Write the results back to the concurrent contexts
processBlockCache.Write()
// Start with a fresh state for the MemCache
ctx = ctx.WithContextMemCache(sdk.NewContextMemCache())
txResults = app.ProcessTxs(ctx, txs, dependencyDag, app.ProcessBlockConcurrent)
case acltypes.ErrGovMsgInBlock:
ctx.Logger().Info(fmt.Sprintf("Gov msg found while building DAG, processing synchronously: %s", err))
txResults = app.ProcessBlockSynchronous(ctx, txs)
Expand All @@ -1204,7 +1214,7 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequ
}

// Finalize all Bank Module Transfers here so that events are included
lazyWriteEvents := app.BankKeeper.WriteDeferredDepositsToModuleAccounts(ctx)
lazyWriteEvents := app.BankKeeper.WriteDeferredOperations(ctx)
events = append(events, lazyWriteEvents...)

ctx = app.enrichContextWithTxResults(ctx, txResults)
Expand Down
84 changes: 84 additions & 0 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkacltypes "github.com/cosmos/cosmos-sdk/types/accesscontrol"
acltypes "github.com/cosmos/cosmos-sdk/x/accesscontrol/types"
"github.com/k0kubun/pp/v3"
Expand Down Expand Up @@ -76,3 +77,86 @@ func TestGetChannelsFromSignalMapping(t *testing.T) {
require.True(t, len(resultCompletionSignalsMap) > 1)
require.True(t, len(resultBlockingSignalsMap) > 1)
}


// Mock method to fail
func MockProcessBlockConcurrentFunctionFail(
ctx sdk.Context,
txs [][]byte,
completionSignalingMap map[int]acltypes.MessageCompletionSignalMapping,
blockingSignalsMap map[int]acltypes.MessageCompletionSignalMapping,
txMsgAccessOpMapping map[int]acltypes.MsgIndexToAccessOpMapping,
) ([]*abci.ExecTxResult, bool) {
return []*abci.ExecTxResult{}, false
}

func MockProcessBlockConcurrentFunctionSuccess(
ctx sdk.Context,
txs [][]byte,
completionSignalingMap map[int]acltypes.MessageCompletionSignalMapping,
blockingSignalsMap map[int]acltypes.MessageCompletionSignalMapping,
txMsgAccessOpMapping map[int]acltypes.MsgIndexToAccessOpMapping,
) ([]*abci.ExecTxResult, bool) {
return []*abci.ExecTxResult{}, true
}


func TestProcessTxsSuccess(t *testing.T) {
tm := time.Now().UTC()
valPub := secp256k1.GenPrivKey().PubKey()


testWrapper := app.NewTestWrapper(t, tm, valPub)
dag := acltypes.NewDag()

// Set some test context mem cache values
testWrapper.Ctx.ContextMemCache().UpsertDeferredSends("Some Account", sdk.NewCoins(sdk.Coin{
Denom: "test",
Amount: sdk.NewInt(1),
}))
testWrapper.Ctx.ContextMemCache().UpsertDeferredWithdrawals("Some Other Account", sdk.NewCoins(sdk.Coin{
Denom: "test",
Amount: sdk.NewInt(1),
}))
require.Equal(t, 1, len(testWrapper.Ctx.ContextMemCache().GetDeferredSends().GetSortedKeys()))
testWrapper.App.ProcessTxs(
testWrapper.Ctx,
[][]byte{},
&dag,
MockProcessBlockConcurrentFunctionSuccess,
)

// It should be reset if it fails to prevent any values from being written
require.Equal(t, 1, len(testWrapper.Ctx.ContextMemCache().GetDeferredWithdrawals().GetSortedKeys()))
require.Equal(t, 1, len(testWrapper.Ctx.ContextMemCache().GetDeferredSends().GetSortedKeys()))
}


func TestProcessTxsClearCacheOnFail(t *testing.T) {
tm := time.Now().UTC()
valPub := secp256k1.GenPrivKey().PubKey()


testWrapper := app.NewTestWrapper(t, tm, valPub)
dag := acltypes.NewDag()

// Set some test context mem cache values
testWrapper.Ctx.ContextMemCache().UpsertDeferredSends("Some Account", sdk.NewCoins(sdk.Coin{
Denom: "test",
Amount: sdk.NewInt(1),
}))
testWrapper.Ctx.ContextMemCache().UpsertDeferredWithdrawals("Some Account", sdk.NewCoins(sdk.Coin{
Denom: "test",
Amount: sdk.NewInt(1),
}))
testWrapper.App.ProcessTxs(
testWrapper.Ctx,
[][]byte{},
&dag,
MockProcessBlockConcurrentFunctionFail,
)

// It should be reset if it fails to prevent any values from being written
require.Equal(t, 0, len(testWrapper.Ctx.ContextMemCache().GetDeferredWithdrawals().GetSortedKeys()))
require.Equal(t, 0, len(testWrapper.Ctx.ContextMemCache().GetDeferredSends().GetSortedKeys()))
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ require (
)

replace (
github.com/cosmos/cosmos-sdk => github.com/sei-protocol/sei-cosmos v0.1.318
github.com/cosmos/cosmos-sdk => github.com/sei-protocol/sei-cosmos v0.1.319
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
github.com/keybase/go-keychain => github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4
github.com/tendermint/tendermint => github.com/sei-protocol/sei-tendermint v0.1.91
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1067,8 +1067,8 @@ github.com/seccomp/libseccomp-golang v0.9.2-0.20210429002308-3879420cc921/go.mod
github.com/securego/gosec/v2 v2.11.0 h1:+PDkpzR41OI2jrw1q6AdXZCbsNGNGT7pQjal0H0cArI=
github.com/securego/gosec/v2 v2.11.0/go.mod h1:SX8bptShuG8reGC0XS09+a4H2BoWSJi+fscA+Pulbpo=
github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY=
github.com/sei-protocol/sei-cosmos v0.1.318 h1:hDTSYGuHdrN+nioJ6pWjL2u28BM/WVJC3MlBU/+vYAM=
github.com/sei-protocol/sei-cosmos v0.1.318/go.mod h1:N3OGX0WV/5fdWHZ5BavvrrRBr3SqUUwuwjvHnWinLss=
github.com/sei-protocol/sei-cosmos v0.1.319 h1:gynHm7kA52ElGNu5xYfC5j0OanhNTIp92KcMC7aGyqA=
github.com/sei-protocol/sei-cosmos v0.1.319/go.mod h1:N3OGX0WV/5fdWHZ5BavvrrRBr3SqUUwuwjvHnWinLss=
github.com/sei-protocol/sei-tendermint v0.1.91 h1:FXIAAXZoVUOw0srE71giPy5Kt2lI54RjvwZi7pTF5V4=
github.com/sei-protocol/sei-tendermint v0.1.91/go.mod h1:Olwbjyagrpoxj5DAUhHxMTWDVEfQ3FYdpypaJ3+6Hs8=
github.com/sei-protocol/sei-tm-db v0.0.5 h1:3WONKdSXEqdZZeLuWYfK5hP37TJpfaUa13vAyAlvaQY=
Expand Down
2 changes: 1 addition & 1 deletion x/oracle/keeper/test_utils.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// nolint
//nolint
package keeper

import (
Expand Down
4 changes: 2 additions & 2 deletions x/oracle/simulation/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func WeightedOperations(
}

// SimulateMsgAggregateExchangeRateVote generates a MsgAggregateExchangeRateVote with random values.
// nolint: funlen
//nolint: funlen
func SimulateMsgAggregateExchangeRateVote(ak types.AccountKeeper, bk types.BankKeeper, k keeper.Keeper) simtypes.Operation {
return func(
r *rand.Rand, app *baseapp.BaseApp, ctx sdk.Context, accs []simtypes.Account, chainID string,
Expand Down Expand Up @@ -122,7 +122,7 @@ func SimulateMsgAggregateExchangeRateVote(ak types.AccountKeeper, bk types.BankK
}

// SimulateMsgDelegateFeedConsent generates a MsgDelegateFeedConsent with random values.
// nolint: funlen
//nolint: funlen
func SimulateMsgDelegateFeedConsent(ak types.AccountKeeper, bk types.BankKeeper, k keeper.Keeper) simtypes.Operation {
return func(
r *rand.Rand, app *baseapp.BaseApp, ctx sdk.Context, accs []simtypes.Account, chainID string,
Expand Down