diff --git a/app/app.go b/app/app.go index a66f9d353a..3b6e2bbe2a 100644 --- a/app/app.go +++ b/app/app.go @@ -12,6 +12,7 @@ import ( "time" storetypes "github.com/cosmos/cosmos-sdk/store/types" + graph "github.com/yourbasic/graph" appparams "github.com/sei-protocol/sei-chain/app/params" "github.com/sei-protocol/sei-chain/utils" @@ -316,7 +317,7 @@ type App struct { optimisticProcessingInfo *OptimisticProcessingInfo // batchVerifier *ante.SR25519BatchVerifier - // txDecoder sdk.TxDecoder + txDecoder sdk.TxDecoder } // New returns a reference to an initialized blockchain app @@ -375,7 +376,7 @@ func New( Tracer: &tr, TracerContext: context.Background(), }, - // txDecoder: encodingConfig.TxConfig.TxDecoder(), + txDecoder: encodingConfig.TxConfig.TxDecoder(), } app.ParamsKeeper = initParamsKeeper(appCodec, cdc, keys[paramstypes.StoreKey], tkeys[paramstypes.TStoreKey]) @@ -871,6 +872,31 @@ func (app *App) ProcessProposalHandler(ctx sdk.Context, req *abci.RequestProcess }, nil } +func (app *App) BuildDependencyDag(ctx sdk.Context, txs [][]byte) (*Dag, error) { + // contains the latest msg index for a specific Access Operation + dependencyDag := NewDag() + for txIndex, txBytes := range txs { + tx, err := app.txDecoder(txBytes) // TODO: results in repetitive decoding for txs with runtx decode (potential optimization) + if err != nil { + return nil, err + } + msgs := tx.GetMsgs() + for _, msg := range msgs { + msgDependencies := app.AccessControlKeeper.GetResourceDependencyMapping(ctx, acltypes.GenerateMessageKey(msg)) + for _, accessOp := range msgDependencies.AccessOps { + // make a new node in the dependency dag + dependencyDag.AddNodeBuildDependency(txIndex, accessOp) + } + } + + } + + if !graph.Acyclic(&dependencyDag) { + return nil, ErrCycleInDAG + } + return &dependencyDag, nil +} + func (app *App) FinalizeBlocker(ctx sdk.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) { startTime := time.Now() defer func() { @@ -941,23 +967,32 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequ // } // app.batchVerifier.VerifyTxs(ctx, typedTxs) + dag, err := app.BuildDependencyDag(ctx, txs) txResults := []*abci.ExecTxResult{} - for _, tx := range txs { - // ctx = ctx.WithContext(context.WithValue(ctx.Context(), ante.ContextKeyTxIndexKey, i)) - deliverTxResp := app.DeliverTx(ctx, abci.RequestDeliverTx{ - Tx: tx, - }) - txResults = append(txResults, &abci.ExecTxResult{ - Code: deliverTxResp.Code, - Data: deliverTxResp.Data, - Log: deliverTxResp.Log, - Info: deliverTxResp.Info, - GasWanted: deliverTxResp.GasWanted, - GasUsed: deliverTxResp.GasUsed, - Events: deliverTxResp.Events, - Codespace: deliverTxResp.Codespace, - }) + if err != nil { + // something went wrong in dag, process txs sequentially + for _, tx := range txs { + // ctx = ctx.WithContext(context.WithValue(ctx.Context(), ante.ContextKeyTxIndexKey, i)) + deliverTxResp := app.DeliverTx(ctx, abci.RequestDeliverTx{ + Tx: tx, + }) + txResults = append(txResults, &abci.ExecTxResult{ + Code: deliverTxResp.Code, + Data: deliverTxResp.Data, + Log: deliverTxResp.Log, + Info: deliverTxResp.Info, + GasWanted: deliverTxResp.GasWanted, + GasUsed: deliverTxResp.GasUsed, + Events: deliverTxResp.Events, + Codespace: deliverTxResp.Codespace, + }) + } + } else { + // no error, lets process txs concurrently + _, _ = dag.BuildCompletionSignalMaps() + // TODO: create channel map here } + endBlockResp := app.EndBlock(ctx, abci.RequestEndBlock{ Height: req.GetHeight(), }) diff --git a/app/graph.go b/app/graph.go new file mode 100644 index 0000000000..3a846cccc2 --- /dev/null +++ b/app/graph.go @@ -0,0 +1,218 @@ +package app + +import ( + "fmt" + + acltypes "github.com/cosmos/cosmos-sdk/x/accesscontrol/types" +) + +type DagNodeID int + +type DagNode struct { + NodeID DagNodeID + TxIndex int + AccessOperation acltypes.AccessOperation +} + +type DagEdge struct { + FromNodeID DagNodeID + ToNodeID DagNodeID +} + +type Dag struct { + NodeMap map[DagNodeID]DagNode + EdgesMap map[DagNodeID][]DagEdge // maps node Id (from node) and contains edge info + AccessOpsMap map[acltypes.AccessOperation][]DagNodeID // tracks latest node to use a specific access op + TxIndexMap map[int]DagNodeID // tracks latest node ID for a tx index + NextID DagNodeID +} + +func (dag *Dag) GetCompletionSignal(edge DagEdge) *CompletionSignal { + // only if tx indexes are different + fromNode := dag.NodeMap[edge.FromNodeID] + toNode := dag.NodeMap[edge.ToNodeID] + if fromNode.TxIndex == toNode.TxIndex { + return nil + } + return &CompletionSignal{ + FromNodeID: fromNode.NodeID, + ToNodeID: toNode.NodeID, + CompletionAccessOperation: fromNode.AccessOperation, + BlockedAccessOperation: toNode.AccessOperation, + } +} + +// Order returns the number of vertices in a graph. +func (dag Dag) Order() int { + return len(dag.NodeMap) +} + +// Visit calls the do function for each neighbor w of vertex v, used by the graph acyclic validator +func (dag Dag) Visit(v int, do func(w int, c int64) (skip bool)) (aborted bool) { + for _, edge := range dag.EdgesMap[DagNodeID(v)] { + // just have cost as zero because we only need for acyclic validation purposes + if do(int(edge.ToNodeID), 0) { + return true + } + } + return false +} + +func NewDag() Dag { + return Dag{ + NodeMap: make(map[DagNodeID]DagNode), + EdgesMap: make(map[DagNodeID][]DagEdge), + AccessOpsMap: make(map[acltypes.AccessOperation][]DagNodeID), + TxIndexMap: make(map[int]DagNodeID), + NextID: 0, + } +} + +func (dag *Dag) AddNode(txIndex int, accessOp acltypes.AccessOperation) DagNode { + dagNode := DagNode{ + NodeID: dag.NextID, + TxIndex: txIndex, + AccessOperation: accessOp, + } + dag.NodeMap[dag.NextID] = dagNode + dag.NextID++ + return dagNode +} + +func (dag *Dag) AddEdge(fromIndex DagNodeID, toIndex DagNodeID) *DagEdge { + // no-ops if the from or to node doesn't exist + if _, ok := dag.NodeMap[fromIndex]; !ok { + return nil + } + if _, ok := dag.NodeMap[toIndex]; !ok { + return nil + } + newEdge := DagEdge{fromIndex, toIndex} + dag.EdgesMap[fromIndex] = append(dag.EdgesMap[fromIndex], newEdge) + return &newEdge +} + +// This function is a helper used to build the dependency graph one access operation at a time. +// It will first add a node corresponding to the tx index and access operation (linking it to the previous most recent node for that tx if applicable) +// and then will build edges from any access operations on which the new node is dependent. +// +// This will be accomplished using the AccessOpsMap in dag which keeps track of which nodes access which resources. +// It will then create an edge between the relevant node upon which it is dependent, and this edge can later be used to build the completion signals +// that will allow the dependent goroutines to cordinate execution safely. +// +// It will also register the new node with AccessOpsMap so that future nodes that amy be dependent on this one can properly identify the dependency. +func (dag *Dag) AddNodeBuildDependency(txIndex int, accessOp acltypes.AccessOperation) { + dagNode := dag.AddNode(txIndex, accessOp) + // if in TxIndexMap, make an edge from the previous node index + if lastTxNodeID, ok := dag.TxIndexMap[txIndex]; ok { + // TODO: we actually don't necessarily need these edges, but keeping for now so we can first determine that cycles can't be missed if we remove these + // add an edge between access ops in a transaction + dag.AddEdge(lastTxNodeID, dagNode.NodeID) + } + // update tx index map + dag.TxIndexMap[txIndex] = dagNode.NodeID + + nodeDependencies := dag.GetNodeDependencies(dagNode) + // build edges for each of the dependencies + for _, nodeDependency := range nodeDependencies { + dag.AddEdge(nodeDependency.NodeID, dagNode.NodeID) + } + + // update access ops map with the latest node id using a specific access op + dag.AccessOpsMap[accessOp] = append(dag.AccessOpsMap[accessOp], dagNode.NodeID) +} + +// This helper will identify nodes that are dependencies for the current node, and can then be used for creating edges between then for future completion signals +func (dag *Dag) GetNodeDependencies(node DagNode) (nodeDependencies []DagNode) { + accessOp := node.AccessOperation + // if the blocking access ops are in access ops map, make an edge + switch accessOp.AccessType { + case acltypes.AccessType_READ: + // if we need to do a read, we need latest write as a dependency + // TODO: replace hardcoded access op dependencies with helper that generates (and also generates superseding resources too eg. Resource.ALL is blocking for Resource.KV) + writeAccessOp := acltypes.AccessOperation{ + AccessType: acltypes.AccessType_WRITE, + ResourceType: accessOp.GetResourceType(), + IdentifierTemplate: accessOp.GetIdentifierTemplate(), + } + if writeNodeIDs, ok := dag.AccessOpsMap[writeAccessOp]; ok { + for _, wn := range writeNodeIDs { + writeNode := dag.NodeMap[wn] + // if accessOp exists already (and from a previous transaction), we need to define a dependency on the previous message (and make a edge between the two) + // if from a previous transaction, we need to create an edge + if writeNode.TxIndex < node.TxIndex { + // this should be the COMMIT access op for the tx + lastTxNode := dag.NodeMap[dag.TxIndexMap[writeNode.TxIndex]] + nodeDependencies = append(nodeDependencies, lastTxNode) + } + } + } + case acltypes.AccessType_WRITE, acltypes.AccessType_UNKNOWN: + // if we need to do a write, we need read and write as dependencies + writeAccessOp := acltypes.AccessOperation{ + AccessType: acltypes.AccessType_WRITE, + ResourceType: accessOp.GetResourceType(), + IdentifierTemplate: accessOp.GetIdentifierTemplate(), + } + if writeNodeIDs, ok := dag.AccessOpsMap[writeAccessOp]; ok { + for _, wn := range writeNodeIDs { + // if accessOp exists already (and from a previous transaction), we need to define a dependency on the previous message (and make a edge between the two) + writeNode := dag.NodeMap[wn] + // if from a previous transaction, we need to create an edge + if writeNode.TxIndex < node.TxIndex { + // we need to get the last node from that tx + lastTxNode := dag.NodeMap[dag.TxIndexMap[writeNode.TxIndex]] + nodeDependencies = append(nodeDependencies, lastTxNode) + } + } + } + readAccessOp := acltypes.AccessOperation{ + AccessType: acltypes.AccessType_READ, + ResourceType: accessOp.GetResourceType(), + IdentifierTemplate: accessOp.GetIdentifierTemplate(), + } + if readNodeIDs, ok := dag.AccessOpsMap[readAccessOp]; ok { + for _, rn := range readNodeIDs { + readNode := dag.NodeMap[rn] + // if accessOp exists already (and from a previous transaction), we need to define a dependency on the previous message (and make a edge between the two) + // if from a previous transaction, we need to create an edge + if readNode.TxIndex < node.TxIndex { + nodeDependencies = append(nodeDependencies, readNode) + } + } + } + } + return nodeDependencies +} + +// returns completion signaling map and blocking signals map +func (dag *Dag) BuildCompletionSignalMaps() (completionSignalingMap map[int]map[acltypes.AccessOperation][]CompletionSignal, blockingSignalsMap map[int]map[acltypes.AccessOperation][]CompletionSignal) { + // go through every node + for _, node := range dag.NodeMap { + // for each node, assign its completion signaling, and also assign blocking signals for the destination nodes + if outgoingEdges, ok := dag.EdgesMap[node.NodeID]; ok { + for _, edge := range outgoingEdges { + maybeCompletionSignal := dag.GetCompletionSignal(edge) + if maybeCompletionSignal != nil { + completionSignal := *maybeCompletionSignal + // add it to the right blocking signal in the right txindex + toNode := dag.NodeMap[edge.ToNodeID] + blockingSignalsMap[toNode.TxIndex][completionSignal.BlockedAccessOperation] = append(blockingSignalsMap[toNode.TxIndex][completionSignal.BlockedAccessOperation], completionSignal) + // add it to the completion signal for the tx index + completionSignalingMap[node.TxIndex][completionSignal.CompletionAccessOperation] = append(completionSignalingMap[node.TxIndex][completionSignal.CompletionAccessOperation], completionSignal) + } + + } + } + } + return +} + +type CompletionSignal struct { + FromNodeID DagNodeID + ToNodeID DagNodeID + CompletionAccessOperation acltypes.AccessOperation // this is the access operation that must complete in order to send the signal + BlockedAccessOperation acltypes.AccessOperation // this is the access operation that is blocked by the completion access operation +} + +var ErrCycleInDAG = fmt.Errorf("cycle detected in DAG") diff --git a/app/graph_test.go b/app/graph_test.go new file mode 100644 index 0000000000..ebc6b24cb4 --- /dev/null +++ b/app/graph_test.go @@ -0,0 +1,110 @@ +package app_test + +import ( + "testing" + + acltypes "github.com/cosmos/cosmos-sdk/x/accesscontrol/types" + "github.com/sei-protocol/sei-chain/app" + "github.com/stretchr/testify/require" + "github.com/yourbasic/graph" +) + +func TestCreateGraph(t *testing.T) { + dag := app.NewDag() + /** + tx1: write to A, read B, commit 1 + tx2: read A, read B, commit 2 + tx3: read A, read B, commit 3 + tx4: write B, commit 4 + expected dag + 1wA -> 1rB -> 1c =>v 2rA -> 2rB ----=\---> 2c + 3rB -------------> 3rA -> 3c V + \-----------------------------------=> 4wB -> 4c + **/ + + commitAccessOp := acltypes.AccessOperation{ + AccessType: acltypes.AccessType_COMMIT, + ResourceType: acltypes.ResourceType_ANY, + IdentifierTemplate: "*", + } + writeAccessA := acltypes.AccessOperation{ + AccessType: acltypes.AccessType_WRITE, + ResourceType: acltypes.ResourceType_KV, + IdentifierTemplate: "ResourceA", + } + readAccessA := acltypes.AccessOperation{ + AccessType: acltypes.AccessType_READ, + ResourceType: acltypes.ResourceType_KV, + IdentifierTemplate: "ResourceA", + } + writeAccessB := acltypes.AccessOperation{ + AccessType: acltypes.AccessType_WRITE, + ResourceType: acltypes.ResourceType_KV, + IdentifierTemplate: "ResourceB", + } + readAccessB := acltypes.AccessOperation{ + AccessType: acltypes.AccessType_READ, + ResourceType: acltypes.ResourceType_KV, + IdentifierTemplate: "ResourceB", + } + + dag.AddNodeBuildDependency(1, writeAccessA) // node id 0 + dag.AddNodeBuildDependency(1, readAccessB) // node id 1 + dag.AddNodeBuildDependency(1, commitAccessOp) // node id 2 + dag.AddNodeBuildDependency(2, readAccessA) // node id 3 + dag.AddNodeBuildDependency(2, readAccessB) // node id 4 + dag.AddNodeBuildDependency(2, commitAccessOp) // node id 5 + dag.AddNodeBuildDependency(3, readAccessB) // node id 6 + dag.AddNodeBuildDependency(3, readAccessA) // node id 7 + dag.AddNodeBuildDependency(3, commitAccessOp) // node id 8 + dag.AddNodeBuildDependency(4, writeAccessB) // node id 9 + dag.AddNodeBuildDependency(4, commitAccessOp) // node id 10 + + require.Equal( + t, + []app.DagEdge{{0, 1}}, + dag.EdgesMap[0], + ) + require.Equal( + t, + []app.DagEdge{{1, 2}, {1, 9}}, + dag.EdgesMap[1], + ) + require.Equal( + t, + []app.DagEdge{{2, 3}, {2, 7}}, + dag.EdgesMap[2], + ) + require.Equal( + t, + []app.DagEdge{{3, 4}}, + dag.EdgesMap[3], + ) + require.Equal( + t, + []app.DagEdge{{4, 5}, {4, 9}}, + dag.EdgesMap[4], + ) + require.Equal(t, []app.DagEdge(nil), dag.EdgesMap[5]) + require.Equal( + t, + []app.DagEdge{{6, 7}, {6, 9}}, + dag.EdgesMap[6], + ) + require.Equal( + t, + []app.DagEdge{{7, 8}}, + dag.EdgesMap[7], + ) + require.Equal(t, []app.DagEdge(nil), dag.EdgesMap[8]) + require.Equal( + t, + []app.DagEdge{{9, 10}}, + dag.EdgesMap[9], + ) + require.Equal(t, []app.DagEdge(nil), dag.EdgesMap[10]) + + // assert dag is acyclic + acyclic := graph.Acyclic(dag) + require.True(t, acyclic) +} diff --git a/go.mod b/go.mod index 980c742f99..fff4ed7b33 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/tendermint/tendermint v0.37.0-dev github.com/tendermint/tm-db v0.6.8-0.20220519162814-e24b96538a12 + github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869 go.opentelemetry.io/otel v1.9.0 go.opentelemetry.io/otel/exporters/jaeger v1.9.0 go.opentelemetry.io/otel/sdk v1.9.0 @@ -129,7 +130,7 @@ require ( ) replace ( - github.com/cosmos/cosmos-sdk => github.com/sei-protocol/sei-cosmos v0.1.66 + github.com/cosmos/cosmos-sdk => github.com/sei-protocol/sei-cosmos v0.1.67 github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 github.com/keybase/go-keychain => github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 github.com/tendermint/tendermint => github.com/sei-protocol/sei-tendermint v0.1.13 diff --git a/go.sum b/go.sum index deff6c322a..200fd7cd3f 100644 --- a/go.sum +++ b/go.sum @@ -1095,8 +1095,8 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo= github.com/securego/gosec/v2 v2.11.0/go.mod h1:SX8bptShuG8reGC0XS09+a4H2BoWSJi+fscA+Pulbpo= github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY= -github.com/sei-protocol/sei-cosmos v0.1.66 h1:ow4z4wKm1iN1GoZAaz+kCB6nv+BKSLnhpFvyale785M= -github.com/sei-protocol/sei-cosmos v0.1.66/go.mod h1:3uCdb2FCio9uVQRPdZxZ1GqHqyb1moYH93xtTMa7DL8= +github.com/sei-protocol/sei-cosmos v0.1.67 h1:AdjpHcaryUaGI4X9nnK13YWxPOguDveH2sH+06gBsCw= +github.com/sei-protocol/sei-cosmos v0.1.67/go.mod h1:Oaj7toqHCkwEEb+sDIWxtfTkPZxOpMXBXDMvIIqUjpw= github.com/sei-protocol/sei-tendermint v0.1.13 h1:uaMXhi+zpaqcUDlshxjqmPmUI/zSZla2a2ZmOtDK5RM= github.com/sei-protocol/sei-tendermint v0.1.13/go.mod h1:Olwbjyagrpoxj5DAUhHxMTWDVEfQ3FYdpypaJ3+6Hs8= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= @@ -1248,6 +1248,8 @@ github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1: github.com/yagipy/maintidx v1.0.0/go.mod h1:0qNf/I/CCZXSMhsRsrEPDZ+DkekpKLXAJfsTACwgXLk= github.com/ybbus/jsonrpc v2.1.2+incompatible/go.mod h1:XJrh1eMSzdIYFbM08flv0wp5G35eRniyeGut1z+LSiE= github.com/yeya24/promlinter v0.2.0/go.mod h1:u54lkmBOZrpEbQQ6gox2zWKKLKu2SGe+2KOiextY+IA= +github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869 h1:7v7L5lsfw4w8iqBBXETukHo4IPltmD+mWoLRYUmeGN8= +github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869/go.mod h1:Rfzr+sqaDreiCaoQbFCu3sTXxeFq/9kXRuyOoSlGQHE= github.com/yudai/gojsondiff v1.0.0/go.mod h1:AY32+k2cwILAkW1fbgxQ5mUmMiZFgLIV+FBNExI05xg= github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDfVJdfcVVdX+jpBxNmX4rDAzaS45IcYoM= github.com/yudai/pp v2.0.1+incompatible/go.mod h1:PuxR/8QJ7cyCkFp/aUDS+JY727OFEZkTdatxwunjIkc=