diff --git a/app/app.go b/app/app.go index 3b6e2bbe2a..d1c212a335 100644 --- a/app/app.go +++ b/app/app.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "strings" + "sync" "time" storetypes "github.com/cosmos/cosmos-sdk/store/types" @@ -30,6 +31,7 @@ import ( servertypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/simapp" sdk "github.com/cosmos/cosmos-sdk/types" + sdkacltypes "github.com/cosmos/cosmos-sdk/types/accesscontrol" "github.com/cosmos/cosmos-sdk/types/module" "github.com/cosmos/cosmos-sdk/version" "github.com/cosmos/cosmos-sdk/x/auth" @@ -38,6 +40,7 @@ import ( aclmodule "github.com/cosmos/cosmos-sdk/x/accesscontrol" aclkeeper "github.com/cosmos/cosmos-sdk/x/accesscontrol/keeper" acltypes "github.com/cosmos/cosmos-sdk/x/accesscontrol/types" + authrest "github.com/cosmos/cosmos-sdk/x/auth/client/rest" authkeeper "github.com/cosmos/cosmos-sdk/x/auth/keeper" authsims "github.com/cosmos/cosmos-sdk/x/auth/simulation" @@ -881,11 +884,11 @@ func (app *App) BuildDependencyDag(ctx sdk.Context, txs [][]byte) (*Dag, error) return nil, err } msgs := tx.GetMsgs() - for _, msg := range msgs { + for messageIndex, msg := range msgs { msgDependencies := app.AccessControlKeeper.GetResourceDependencyMapping(ctx, acltypes.GenerateMessageKey(msg)) - for _, accessOp := range msgDependencies.AccessOps { + for _, accessOp := range msgDependencies.GetAccessOps() { // make a new node in the dependency dag - dependencyDag.AddNodeBuildDependency(txIndex, accessOp) + dependencyDag.AddNodeBuildDependency(messageIndex, txIndex, accessOp) } } @@ -926,6 +929,115 @@ func (app *App) FinalizeBlocker(ctx sdk.Context, req *abci.RequestFinalizeBlock) return &resp, nil } +func (app *App) DeliverTxWithResult(ctx sdk.Context, tx []byte) *abci.ExecTxResult { + deliverTxResp := app.DeliverTx(ctx, abci.RequestDeliverTx{ + Tx: tx, + }) + return &abci.ExecTxResult{ + Code: deliverTxResp.Code, + Data: deliverTxResp.Data, + Log: deliverTxResp.Log, + Info: deliverTxResp.Info, + GasWanted: deliverTxResp.GasWanted, + GasUsed: deliverTxResp.GasUsed, + Events: deliverTxResp.Events, + Codespace: deliverTxResp.Codespace, + } +} + +func (app *App) ProcessBlockSynchronous(ctx sdk.Context, txs [][]byte) []*abci.ExecTxResult { + txResults := []*abci.ExecTxResult{} + for _, tx := range txs { + txResults = append(txResults, app.DeliverTxWithResult(ctx, tx)) + } + return txResults +} + +// Returns a mapping of the accessOperation to the channels +func getChannelsFromSignalMapping(signalMapping MessageCompletionSignalMapping) sdkacltypes.MessageAccessOpsChannelMapping { + channelsMapping := make(sdkacltypes.MessageAccessOpsChannelMapping) + for messageIndex, accessOperationsToSignal := range signalMapping { + for accessOperation, completionSignals := range accessOperationsToSignal { + var channels []chan interface{} + for _, completionSignal := range completionSignals { + channels = append(channels, completionSignal.Channel) + } + channelsMapping[messageIndex][accessOperation] = channels + } + } + return channelsMapping +} + +type ChannelResult struct { + txIndex int + result *abci.ExecTxResult +} + +func (app *App) ProcessTxConcurrent( + ctx sdk.Context, + txIndex int, + txBytes []byte, + wg *sync.WaitGroup, + resultChan chan<- ChannelResult, + txCompletionSignalingMap MessageCompletionSignalMapping, + txBlockingSignalsMap MessageCompletionSignalMapping, +) { + defer wg.Done() + // Store the Channels in the Context Object for each transaction + ctx.WithTxBlockingChannels(getChannelsFromSignalMapping(txBlockingSignalsMap)) + ctx.WithTxCompletionChannels(getChannelsFromSignalMapping(txCompletionSignalingMap)) + + // Deliver the transaction and store the result in the channel + resultChan <- ChannelResult{txIndex, app.DeliverTxWithResult(ctx, txBytes)} +} + +func (app *App) ProcessBlockConcurrent( + ctx sdk.Context, + txs [][]byte, + completionSignalingMap map[int]MessageCompletionSignalMapping, + blockingSignalsMap map[int]MessageCompletionSignalMapping, +) []*abci.ExecTxResult { + var waitGroup sync.WaitGroup + resultChan := make(chan ChannelResult) + txResults := []*abci.ExecTxResult{} + + // If there's no transactions then return empty results + if len(txs) == 0 { + return txResults + } + + // For each transaction, start goroutine and deliver TX + for txIndex, txBytes := range txs { + waitGroup.Add(1) + go app.ProcessTxConcurrent( + ctx, + txIndex, + txBytes, + &waitGroup, + resultChan, + completionSignalingMap[txIndex], + blockingSignalsMap[txIndex], + ) + } + + // Waits for all the transactions to complete + waitGroup.Wait() + + // Gather Results and store it based on txIndex + // Concurrent results may be in different order than the original txIndex + txResultsMap := map[int]*abci.ExecTxResult{} + for result := range resultChan { + txResultsMap[result.txIndex] = result.result + } + + // Gather Results and store in array based on txIndex to preserve ordering + for txIndex := range txs { + txResults = append(txResults, txResultsMap[txIndex]) + } + + return txResults +} + func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequest, lastCommit abci.CommitInfo) ([]abci.Event, []*abci.ExecTxResult, abci.ResponseEndBlock, error) { goCtx := app.decorateContextWithDexMemState(ctx.Context()) ctx = ctx.WithContext(goCtx) @@ -967,30 +1079,18 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequ // } // app.batchVerifier.VerifyTxs(ctx, typedTxs) - dag, err := app.BuildDependencyDag(ctx, txs) + dependencyDag, err := app.BuildDependencyDag(ctx, txs) txResults := []*abci.ExecTxResult{} + + // TODO:: add metrics for async vs sync if err != nil { - // something went wrong in dag, process txs sequentially - for _, tx := range txs { - // ctx = ctx.WithContext(context.WithValue(ctx.Context(), ante.ContextKeyTxIndexKey, i)) - deliverTxResp := app.DeliverTx(ctx, abci.RequestDeliverTx{ - Tx: tx, - }) - txResults = append(txResults, &abci.ExecTxResult{ - Code: deliverTxResp.Code, - Data: deliverTxResp.Data, - Log: deliverTxResp.Log, - Info: deliverTxResp.Info, - GasWanted: deliverTxResp.GasWanted, - GasUsed: deliverTxResp.GasUsed, - Events: deliverTxResp.Events, - Codespace: deliverTxResp.Codespace, - }) + ctx.Logger().Error(fmt.Sprintf("Error while building DAG, processing synchronously: %s", err)) + if err == ErrCycleInDAG { + txResults = app.ProcessBlockSynchronous(ctx, txs) } } else { - // no error, lets process txs concurrently - _, _ = dag.BuildCompletionSignalMaps() - // TODO: create channel map here + completionSignalingMap, blockingSignalsMap := dependencyDag.BuildCompletionSignalMaps() + txResults = app.ProcessBlockConcurrent(ctx, txs, completionSignalingMap, blockingSignalsMap) } endBlockResp := app.EndBlock(ctx, abci.RequestEndBlock{ diff --git a/app/graph.go b/app/graph.go index 3a846cccc2..05c958257d 100644 --- a/app/graph.go +++ b/app/graph.go @@ -3,13 +3,14 @@ package app import ( "fmt" - acltypes "github.com/cosmos/cosmos-sdk/x/accesscontrol/types" + acltypes "github.com/cosmos/cosmos-sdk/types/accesscontrol" ) type DagNodeID int type DagNode struct { NodeID DagNodeID + MessageIndex int TxIndex int AccessOperation acltypes.AccessOperation } @@ -27,6 +28,17 @@ type Dag struct { NextID DagNodeID } +// Alias for mapping MessageIndexId -> AccessOperations -> CompletionSignals +type MessageCompletionSignalMapping = map[int]map[acltypes.AccessOperation][]CompletionSignal + +type CompletionSignal struct { + FromNodeID DagNodeID + ToNodeID DagNodeID + CompletionAccessOperation acltypes.AccessOperation // this is the access operation that must complete in order to send the signal + BlockedAccessOperation acltypes.AccessOperation // this is the access operation that is blocked by the completion access operation + Channel chan interface{} +} + func (dag *Dag) GetCompletionSignal(edge DagEdge) *CompletionSignal { // only if tx indexes are different fromNode := dag.NodeMap[edge.FromNodeID] @@ -39,6 +51,8 @@ func (dag *Dag) GetCompletionSignal(edge DagEdge) *CompletionSignal { ToNodeID: toNode.NodeID, CompletionAccessOperation: fromNode.AccessOperation, BlockedAccessOperation: toNode.AccessOperation, + // channel used for signalling + Channel: make(chan interface{}), } } @@ -68,9 +82,10 @@ func NewDag() Dag { } } -func (dag *Dag) AddNode(txIndex int, accessOp acltypes.AccessOperation) DagNode { +func (dag *Dag) AddNode(messageIndex int, txIndex int, accessOp acltypes.AccessOperation) DagNode { dagNode := DagNode{ NodeID: dag.NextID, + MessageIndex: messageIndex, TxIndex: txIndex, AccessOperation: accessOp, } @@ -101,8 +116,8 @@ func (dag *Dag) AddEdge(fromIndex DagNodeID, toIndex DagNodeID) *DagEdge { // that will allow the dependent goroutines to cordinate execution safely. // // It will also register the new node with AccessOpsMap so that future nodes that amy be dependent on this one can properly identify the dependency. -func (dag *Dag) AddNodeBuildDependency(txIndex int, accessOp acltypes.AccessOperation) { - dagNode := dag.AddNode(txIndex, accessOp) +func (dag *Dag) AddNodeBuildDependency(messageIndex int, txIndex int, accessOp acltypes.AccessOperation) { + dagNode := dag.AddNode(messageIndex, txIndex, accessOp) // if in TxIndexMap, make an edge from the previous node index if lastTxNodeID, ok := dag.TxIndexMap[txIndex]; ok { // TODO: we actually don't necessarily need these edges, but keeping for now so we can first determine that cycles can't be missed if we remove these @@ -186,7 +201,10 @@ func (dag *Dag) GetNodeDependencies(node DagNode) (nodeDependencies []DagNode) { } // returns completion signaling map and blocking signals map -func (dag *Dag) BuildCompletionSignalMaps() (completionSignalingMap map[int]map[acltypes.AccessOperation][]CompletionSignal, blockingSignalsMap map[int]map[acltypes.AccessOperation][]CompletionSignal) { +func (dag *Dag) BuildCompletionSignalMaps() ( + completionSignalingMap map[int]MessageCompletionSignalMapping, + blockingSignalsMap map[int]MessageCompletionSignalMapping, +) { // go through every node for _, node := range dag.NodeMap { // for each node, assign its completion signaling, and also assign blocking signals for the destination nodes @@ -195,11 +213,15 @@ func (dag *Dag) BuildCompletionSignalMaps() (completionSignalingMap map[int]map[ maybeCompletionSignal := dag.GetCompletionSignal(edge) if maybeCompletionSignal != nil { completionSignal := *maybeCompletionSignal + // add it to the right blocking signal in the right txindex toNode := dag.NodeMap[edge.ToNodeID] - blockingSignalsMap[toNode.TxIndex][completionSignal.BlockedAccessOperation] = append(blockingSignalsMap[toNode.TxIndex][completionSignal.BlockedAccessOperation], completionSignal) + prevBlockSignalMapping := blockingSignalsMap[toNode.TxIndex][toNode.MessageIndex][completionSignal.BlockedAccessOperation] + blockingSignalsMap[toNode.TxIndex][toNode.MessageIndex][completionSignal.BlockedAccessOperation] = append(prevBlockSignalMapping, completionSignal) + // add it to the completion signal for the tx index - completionSignalingMap[node.TxIndex][completionSignal.CompletionAccessOperation] = append(completionSignalingMap[node.TxIndex][completionSignal.CompletionAccessOperation], completionSignal) + prevCompletionSignalMapping := completionSignalingMap[node.TxIndex][node.MessageIndex][completionSignal.CompletionAccessOperation] + completionSignalingMap[node.TxIndex][node.MessageIndex][completionSignal.CompletionAccessOperation] = append(prevCompletionSignalMapping, completionSignal) } } @@ -208,11 +230,4 @@ func (dag *Dag) BuildCompletionSignalMaps() (completionSignalingMap map[int]map[ return } -type CompletionSignal struct { - FromNodeID DagNodeID - ToNodeID DagNodeID - CompletionAccessOperation acltypes.AccessOperation // this is the access operation that must complete in order to send the signal - BlockedAccessOperation acltypes.AccessOperation // this is the access operation that is blocked by the completion access operation -} - var ErrCycleInDAG = fmt.Errorf("cycle detected in DAG") diff --git a/app/graph_test.go b/app/graph_test.go index ebc6b24cb4..622705eab1 100644 --- a/app/graph_test.go +++ b/app/graph_test.go @@ -3,7 +3,7 @@ package app_test import ( "testing" - acltypes "github.com/cosmos/cosmos-sdk/x/accesscontrol/types" + acltypes "github.com/cosmos/cosmos-sdk/types/accesscontrol" "github.com/sei-protocol/sei-chain/app" "github.com/stretchr/testify/require" "github.com/yourbasic/graph" @@ -48,17 +48,17 @@ func TestCreateGraph(t *testing.T) { IdentifierTemplate: "ResourceB", } - dag.AddNodeBuildDependency(1, writeAccessA) // node id 0 - dag.AddNodeBuildDependency(1, readAccessB) // node id 1 - dag.AddNodeBuildDependency(1, commitAccessOp) // node id 2 - dag.AddNodeBuildDependency(2, readAccessA) // node id 3 - dag.AddNodeBuildDependency(2, readAccessB) // node id 4 - dag.AddNodeBuildDependency(2, commitAccessOp) // node id 5 - dag.AddNodeBuildDependency(3, readAccessB) // node id 6 - dag.AddNodeBuildDependency(3, readAccessA) // node id 7 - dag.AddNodeBuildDependency(3, commitAccessOp) // node id 8 - dag.AddNodeBuildDependency(4, writeAccessB) // node id 9 - dag.AddNodeBuildDependency(4, commitAccessOp) // node id 10 + dag.AddNodeBuildDependency(1, 1, writeAccessA) // node id 0 + dag.AddNodeBuildDependency(1, 1, readAccessB) // node id 1 + dag.AddNodeBuildDependency(1, 1, commitAccessOp) // node id 2 + dag.AddNodeBuildDependency(1, 2, readAccessA) // node id 3 + dag.AddNodeBuildDependency(1, 2, readAccessB) // node id 4 + dag.AddNodeBuildDependency(1, 2, commitAccessOp) // node id 5 + dag.AddNodeBuildDependency(1, 3, readAccessB) // node id 6 + dag.AddNodeBuildDependency(1, 3, readAccessA) // node id 7 + dag.AddNodeBuildDependency(1, 3, commitAccessOp) // node id 8 + dag.AddNodeBuildDependency(1, 4, writeAccessB) // node id 9 + dag.AddNodeBuildDependency(1, 4, commitAccessOp) // node id 10 require.Equal( t, diff --git a/go.mod b/go.mod index c107a78a31..07529b8eac 100644 --- a/go.mod +++ b/go.mod @@ -130,7 +130,7 @@ require ( ) replace ( - github.com/cosmos/cosmos-sdk => github.com/sei-protocol/sei-cosmos v0.1.64 + github.com/cosmos/cosmos-sdk => github.com/sei-protocol/sei-cosmos v0.1.69 github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 github.com/keybase/go-keychain => github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 github.com/tendermint/tendermint => github.com/sei-protocol/sei-tendermint v0.1.54 diff --git a/go.sum b/go.sum index cc1038221a..8ac94499e1 100644 --- a/go.sum +++ b/go.sum @@ -1095,8 +1095,8 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo= github.com/securego/gosec/v2 v2.11.0/go.mod h1:SX8bptShuG8reGC0XS09+a4H2BoWSJi+fscA+Pulbpo= github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY= -github.com/sei-protocol/sei-cosmos v0.1.64 h1:smRXceJpavcGxG+KUoI5ztBdSvPKbebzTf06Bx0oAAM= -github.com/sei-protocol/sei-cosmos v0.1.64/go.mod h1:8C+uqD5ZyLjoZpQIKtQPpRynlD3fWgsrwFIt5NlKJIE= +github.com/sei-protocol/sei-cosmos v0.1.69 h1:jPFTf6vSg3DQ0FgKpQGw6ZEUuElKL2UJ3P8SBXbYnio= +github.com/sei-protocol/sei-cosmos v0.1.69/go.mod h1:Oaj7toqHCkwEEb+sDIWxtfTkPZxOpMXBXDMvIIqUjpw= github.com/sei-protocol/sei-tendermint v0.1.54 h1:T8snsZ5schPUSMa2STd61a0Mt/yoz1H5MTn3HRTbzj8= github.com/sei-protocol/sei-tendermint v0.1.54/go.mod h1:Olwbjyagrpoxj5DAUhHxMTWDVEfQ3FYdpypaJ3+6Hs8= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= diff --git a/x/oracle/simulation/operations.go b/x/oracle/simulation/operations.go index 6b3dc6c946..e7af90f2a9 100644 --- a/x/oracle/simulation/operations.go +++ b/x/oracle/simulation/operations.go @@ -83,7 +83,7 @@ func WeightedOperations( } // SimulateMsgAggregateExchangeRatePrevote generates a MsgAggregateExchangeRatePrevote with random values. -//nolint: funlen +// nolint: funlen func SimulateMsgAggregateExchangeRatePrevote(ak types.AccountKeeper, bk types.BankKeeper, k keeper.Keeper) simtypes.Operation { return func( r *rand.Rand, app *baseapp.BaseApp, ctx sdk.Context, accs []simtypes.Account, chainID string, @@ -146,7 +146,7 @@ func SimulateMsgAggregateExchangeRatePrevote(ak types.AccountKeeper, bk types.Ba } // SimulateMsgAggregateExchangeRateVote generates a MsgAggregateExchangeRateVote with random values. -//nolint: funlen +// nolint: funlen func SimulateMsgAggregateExchangeRateVote(ak types.AccountKeeper, bk types.BankKeeper, k keeper.Keeper) simtypes.Operation { return func( r *rand.Rand, app *baseapp.BaseApp, ctx sdk.Context, accs []simtypes.Account, chainID string, @@ -213,7 +213,7 @@ func SimulateMsgAggregateExchangeRateVote(ak types.AccountKeeper, bk types.BankK } // SimulateMsgDelegateFeedConsent generates a MsgDelegateFeedConsent with random values. -//nolint: funlen +// nolint: funlen func SimulateMsgDelegateFeedConsent(ak types.AccountKeeper, bk types.BankKeeper, k keeper.Keeper) simtypes.Operation { return func( r *rand.Rand, app *baseapp.BaseApp, ctx sdk.Context, accs []simtypes.Account, chainID string,