diff --git a/app/graph.go b/app/graph.go index a8e7dd69f5..8301de3094 100644 --- a/app/graph.go +++ b/app/graph.go @@ -4,10 +4,19 @@ import ( "fmt" acltypes "github.com/cosmos/cosmos-sdk/types/accesscontrol" + mapset "github.com/deckarep/golang-set" ) type DagNodeID int +// Alias for mapping resource identifier to dag node IDs +type ResourceIdentifierNodeIDMapping = map[string][]DagNodeID + +type ResourceAccess struct { + ResourceType acltypes.ResourceType + AccessType acltypes.AccessType +} + type DagNode struct { NodeID DagNodeID MessageIndex int @@ -21,11 +30,11 @@ type DagEdge struct { } type Dag struct { - NodeMap map[DagNodeID]DagNode - EdgesMap map[DagNodeID][]DagEdge // maps node Id (from node) and contains edge info - AccessOpsMap map[acltypes.AccessOperation][]DagNodeID // tracks latest node to use a specific access op - TxIndexMap map[int]DagNodeID // tracks latest node ID for a tx index - NextID DagNodeID + NodeMap map[DagNodeID]DagNode + EdgesMap map[DagNodeID][]DagEdge // maps node Id (from node) and contains edge info + ResourceAccessMap map[ResourceAccess]ResourceIdentifierNodeIDMapping // maps resource type and access type to identifiers + node IDs + TxIndexMap map[int]DagNodeID // tracks latest node ID for a tx index + NextID DagNodeID } // Alias for mapping MessageIndexId -> AccessOperations -> CompletionSignals @@ -74,11 +83,18 @@ func (dag Dag) Visit(v int, do func(w int, c int64) (skip bool)) (aborted bool) func NewDag() Dag { return Dag{ - NodeMap: make(map[DagNodeID]DagNode), - EdgesMap: make(map[DagNodeID][]DagEdge), - AccessOpsMap: make(map[acltypes.AccessOperation][]DagNodeID), - TxIndexMap: make(map[int]DagNodeID), - NextID: 0, + NodeMap: make(map[DagNodeID]DagNode), + EdgesMap: make(map[DagNodeID][]DagEdge), + ResourceAccessMap: make(map[ResourceAccess]ResourceIdentifierNodeIDMapping), + TxIndexMap: make(map[int]DagNodeID), + NextID: 0, + } +} + +func GetResourceAccess(accessOp acltypes.AccessOperation) ResourceAccess { + return ResourceAccess{ + accessOp.ResourceType, + accessOp.AccessType, } } @@ -124,73 +140,141 @@ func (dag *Dag) AddNodeBuildDependency(messageIndex int, txIndex int, accessOp a nodeDependencies := dag.GetNodeDependencies(dagNode) // build edges for each of the dependencies for _, nodeDependency := range nodeDependencies { - dag.AddEdge(nodeDependency.NodeID, dagNode.NodeID) + dag.AddEdge(nodeDependency, dagNode.NodeID) } // update access ops map with the latest node id using a specific access op - dag.AccessOpsMap[accessOp] = append(dag.AccessOpsMap[accessOp], dagNode.NodeID) + resourceAccess := GetResourceAccess(accessOp) + if _, exists := dag.ResourceAccessMap[resourceAccess]; !exists { + dag.ResourceAccessMap[resourceAccess] = make(ResourceIdentifierNodeIDMapping) + } + dag.ResourceAccessMap[resourceAccess][accessOp.IdentifierTemplate] = append(dag.ResourceAccessMap[resourceAccess][accessOp.IdentifierTemplate], dagNode.NodeID) } -// This helper will identify nodes that are dependencies for the current node, and can then be used for creating edges between then for future completion signals -func (dag *Dag) GetNodeDependencies(node DagNode) (nodeDependencies []DagNode) { - accessOp := node.AccessOperation - // if the blocking access ops are in access ops map, make an edge - switch accessOp.AccessType { - case acltypes.AccessType_READ: - // if we need to do a read, we need latest write as a dependency - // TODO: replace hardcoded access op dependencies with helper that generates (and also generates superseding resources too eg. Resource.ALL is blocking for Resource.KV) - writeAccessOp := acltypes.AccessOperation{ - AccessType: acltypes.AccessType_WRITE, - ResourceType: accessOp.GetResourceType(), - IdentifierTemplate: accessOp.GetIdentifierTemplate(), +func getAllNodeIDsFromIdentifierMapping(mapping ResourceIdentifierNodeIDMapping) (allNodeIDs []DagNodeID) { + for _, nodeIDs := range mapping { + allNodeIDs = append(allNodeIDs, nodeIDs...) + } + return +} + +func (dag *Dag) getDependencyWrites(node DagNode, dependentResource acltypes.ResourceType) mapset.Set { + nodeIDs := mapset.NewSet() + writeResourceAccess := ResourceAccess{ + dependentResource, + acltypes.AccessType_WRITE, + } + if identifierNodeMapping, ok := dag.ResourceAccessMap[writeResourceAccess]; ok { + var nodeIDsMaybeDependency []DagNodeID + if dependentResource != node.AccessOperation.ResourceType { + // we can add all node IDs as dependencies if applicable + nodeIDsMaybeDependency = getAllNodeIDsFromIdentifierMapping(identifierNodeMapping) + } else { + // TODO: otherwise we need to have partial filtering on identifiers + // for now, lets just perform exact matching on identifiers + nodeIDsMaybeDependency = identifierNodeMapping[node.AccessOperation.IdentifierTemplate] } - if writeNodeIDs, ok := dag.AccessOpsMap[writeAccessOp]; ok { - for _, wn := range writeNodeIDs { - writeNode := dag.NodeMap[wn] - // if accessOp exists already (and from a previous transaction), we need to define a dependency on the previous message (and make a edge between the two) - // if from a previous transaction, we need to create an edge - if writeNode.TxIndex < node.TxIndex { - // this should be the COMMIT access op for the tx - lastTxNode := dag.NodeMap[dag.TxIndexMap[writeNode.TxIndex]] - nodeDependencies = append(nodeDependencies, lastTxNode) - } + for _, wn := range nodeIDsMaybeDependency { + writeNode := dag.NodeMap[wn] + // if accessOp exists already (and from a previous transaction), we need to define a dependency on the previous message (and make a edge between the two) + // if from a previous transaction, we need to create an edge + if writeNode.TxIndex < node.TxIndex { + // this should be the COMMIT access op for the tx + lastTxNode := dag.NodeMap[dag.TxIndexMap[writeNode.TxIndex]] + nodeIDs.Add(lastTxNode.NodeID) } } - case acltypes.AccessType_WRITE, acltypes.AccessType_UNKNOWN: - // if we need to do a write, we need read and write as dependencies - writeAccessOp := acltypes.AccessOperation{ - AccessType: acltypes.AccessType_WRITE, - ResourceType: accessOp.GetResourceType(), - IdentifierTemplate: accessOp.GetIdentifierTemplate(), + } + return nodeIDs +} + +func (dag *Dag) getDependencyUnknowns(node DagNode, dependentResource acltypes.ResourceType) mapset.Set { + nodeIDs := mapset.NewSet() + unknownResourceAccess := ResourceAccess{ + dependentResource, + acltypes.AccessType_UNKNOWN, + } + if identifierNodeMapping, ok := dag.ResourceAccessMap[unknownResourceAccess]; ok { + var nodeIDsMaybeDependency []DagNodeID + if dependentResource != node.AccessOperation.ResourceType { + // we can add all node IDs as dependencies if applicable + nodeIDsMaybeDependency = getAllNodeIDsFromIdentifierMapping(identifierNodeMapping) + } else { + // TODO: otherwise we need to have partial filtering on identifiers + // for now, lets just perform exact matching on identifiers + nodeIDsMaybeDependency = identifierNodeMapping[node.AccessOperation.IdentifierTemplate] } - if writeNodeIDs, ok := dag.AccessOpsMap[writeAccessOp]; ok { - for _, wn := range writeNodeIDs { - // if accessOp exists already (and from a previous transaction), we need to define a dependency on the previous message (and make a edge between the two) - writeNode := dag.NodeMap[wn] - // if from a previous transaction, we need to create an edge - if writeNode.TxIndex < node.TxIndex { - // we need to get the last node from that tx - lastTxNode := dag.NodeMap[dag.TxIndexMap[writeNode.TxIndex]] - nodeDependencies = append(nodeDependencies, lastTxNode) - } + for _, un := range nodeIDsMaybeDependency { + uNode := dag.NodeMap[un] + // if accessOp exists already (and from a previous transaction), we need to define a dependency on the previous message (and make a edge between the two) + // if from a previous transaction, we need to create an edge + if uNode.TxIndex < node.TxIndex { + // this should be the COMMIT access op for the tx + lastTxNode := dag.NodeMap[dag.TxIndexMap[uNode.TxIndex]] + nodeIDs.Add(lastTxNode.NodeID) } } - readAccessOp := acltypes.AccessOperation{ - AccessType: acltypes.AccessType_READ, - ResourceType: accessOp.GetResourceType(), - IdentifierTemplate: accessOp.GetIdentifierTemplate(), + } + return nodeIDs +} + +func (dag *Dag) getDependencyReads(node DagNode, dependentResource acltypes.ResourceType) mapset.Set { + nodeIDs := mapset.NewSet() + readResourceAccess := ResourceAccess{ + dependentResource, + acltypes.AccessType_READ, + } + if identifierNodeMapping, ok := dag.ResourceAccessMap[readResourceAccess]; ok { + var nodeIDsMaybeDependency []DagNodeID + if dependentResource != node.AccessOperation.ResourceType { + // we can add all node IDs as dependencies if applicable + nodeIDsMaybeDependency = getAllNodeIDsFromIdentifierMapping(identifierNodeMapping) + } else { + // TODO: otherwise we need to have partial filtering on identifiers + // for now, lets just perform exact matching on identifiers + nodeIDsMaybeDependency = identifierNodeMapping[node.AccessOperation.IdentifierTemplate] } - if readNodeIDs, ok := dag.AccessOpsMap[readAccessOp]; ok { - for _, rn := range readNodeIDs { - readNode := dag.NodeMap[rn] - // if accessOp exists already (and from a previous transaction), we need to define a dependency on the previous message (and make a edge between the two) - // if from a previous transaction, we need to create an edge - if readNode.TxIndex < node.TxIndex { - nodeDependencies = append(nodeDependencies, readNode) - } + for _, rn := range nodeIDsMaybeDependency { + readNode := dag.NodeMap[rn] + // if from a previous transaction, we need to create an edge + if readNode.TxIndex < node.TxIndex { + nodeIDs.Add(readNode.NodeID) } } } + return nodeIDs +} + +// given a node, and a dependent Resource, generate a set of nodes that are dependencies +func (dag *Dag) getNodeDependenciesForResource(node DagNode, dependentResource acltypes.ResourceType) mapset.Set { + nodeIDs := mapset.NewSet() + switch node.AccessOperation.AccessType { + case acltypes.AccessType_READ: + // for a read, we are blocked on prior writes and unknown + nodeIDs = nodeIDs.Union(dag.getDependencyWrites(node, dependentResource)) + nodeIDs = nodeIDs.Union(dag.getDependencyUnknowns(node, dependentResource)) + case acltypes.AccessType_WRITE, acltypes.AccessType_UNKNOWN: + // for write / unknown, we're blocked on prior writes, reads, and unknowns + nodeIDs = nodeIDs.Union(dag.getDependencyWrites(node, dependentResource)) + nodeIDs = nodeIDs.Union(dag.getDependencyUnknowns(node, dependentResource)) + nodeIDs = nodeIDs.Union(dag.getDependencyReads(node, dependentResource)) + } + return nodeIDs +} + +// This helper will identify nodes that are dependencies for the current node, and can then be used for creating edges between then for future completion signals +func (dag *Dag) GetNodeDependencies(node DagNode) []DagNodeID { + accessOp := node.AccessOperation + // get all parent resource types, we'll need to create edges for any of these + parentResources := accessOp.ResourceType.GetResourceDependencies() + nodeIDSet := mapset.NewSet() + for _, resource := range parentResources { + nodeIDSet = nodeIDSet.Union(dag.getNodeDependenciesForResource(node, resource)) + } + nodeDependencies := make([]DagNodeID, nodeIDSet.Cardinality()) + for i, x := range nodeIDSet.ToSlice() { + nodeDependencies[i] = x.(DagNodeID) + } return nodeDependencies } diff --git a/app/graph_test.go b/app/graph_test.go index 5cf8e08733..4b03d6793e 100644 --- a/app/graph_test.go +++ b/app/graph_test.go @@ -149,3 +149,109 @@ func TestCreateGraph(t *testing.T) { slice, ) } + +func TestHierarchyDag(t *testing.T) { + dag := app.NewDag() + /** + tx1: write to A, commit 1 + tx2: read ALL, commit 2 + tx3: write B dexmem, commit 3 + tx4: read A, commit 4 + expected dag + 1wA -> 1c => 2rALL -> 2c + \ \=> 3wB c3 + \---=> 4rA c4 + **/ + + commit := acltypes.AccessOperation{ + AccessType: acltypes.AccessType_COMMIT, + ResourceType: acltypes.ResourceType_ANY, + IdentifierTemplate: "*", + } + writeA := acltypes.AccessOperation{ + AccessType: acltypes.AccessType_WRITE, + ResourceType: acltypes.ResourceType_KV, + IdentifierTemplate: "ResourceA", + } + readA := acltypes.AccessOperation{ + AccessType: acltypes.AccessType_READ, + ResourceType: acltypes.ResourceType_KV, + IdentifierTemplate: "ResourceA", + } + writeB := acltypes.AccessOperation{ + AccessType: acltypes.AccessType_WRITE, + ResourceType: acltypes.ResourceType_DexMem, + IdentifierTemplate: "ResourceB", + } + readAll := acltypes.AccessOperation{ + AccessType: acltypes.AccessType_READ, + ResourceType: acltypes.ResourceType_ANY, + IdentifierTemplate: "*", + } + + dag.AddNodeBuildDependency(0, 0, writeA) // node id 0 + dag.AddNodeBuildDependency(0, 0, commit) // node id 1 + dag.AddNodeBuildDependency(0, 1, readAll) // node id 2 + dag.AddNodeBuildDependency(0, 1, commit) // node id 3 + dag.AddNodeBuildDependency(0, 2, writeB) // node id 4 + dag.AddNodeBuildDependency(0, 2, commit) // node id 5 + dag.AddNodeBuildDependency(0, 3, readA) // node id 6 + dag.AddNodeBuildDependency(0, 3, commit) // node id 7 + + // assert dag is acyclic + acyclic := graph.Acyclic(dag) + require.True(t, acyclic) + + require.Equal(t, []app.DagEdge(nil), dag.EdgesMap[0]) + require.Equal( + t, + []app.DagEdge{{1, 2}, {1, 6}}, + dag.EdgesMap[1], + ) + require.Equal( + t, + []app.DagEdge{{2, 4}}, + dag.EdgesMap[2], + ) + require.Equal(t, []app.DagEdge(nil), dag.EdgesMap[3]) + require.Equal(t, []app.DagEdge(nil), dag.EdgesMap[4]) + require.Equal(t, []app.DagEdge(nil), dag.EdgesMap[5]) + require.Equal(t, []app.DagEdge(nil), dag.EdgesMap[6]) + + // test completion signals + completionSignalsMap, blockingSignalsMap := dag.BuildCompletionSignalMaps() + + channel0 := completionSignalsMap[0][0][commit][0].Channel + channel1 := completionSignalsMap[0][0][commit][1].Channel + channel2 := completionSignalsMap[1][0][readAll][0].Channel + + signal0 := app.CompletionSignal{1, 2, commit, readAll, channel0} + signal1 := app.CompletionSignal{1, 6, commit, readA, channel1} + signal2 := app.CompletionSignal{2, 4, readAll, writeB, channel2} + + require.Equal( + t, + []app.CompletionSignal{signal0, signal1}, + completionSignalsMap[0][0][commit], + ) + require.Equal( + t, + []app.CompletionSignal{signal0}, + blockingSignalsMap[1][0][readAll], + ) + require.Equal( + t, + []app.CompletionSignal{signal1}, + blockingSignalsMap[3][0][readA], + ) + require.Equal( + t, + []app.CompletionSignal{signal2}, + completionSignalsMap[1][0][readAll], + ) + require.Equal( + t, + []app.CompletionSignal{signal2}, + blockingSignalsMap[2][0][writeB], + ) +} diff --git a/go.mod b/go.mod index 7d4937d2a6..d25f13cdc5 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/cosmos/cosmos-sdk v0.45.4 github.com/cosmos/go-bip39 v1.0.0 github.com/cosmos/ibc-go/v3 v3.0.0 + github.com/deckarep/golang-set v1.8.0 github.com/gogo/protobuf v1.3.3 github.com/golang/protobuf v1.5.2 github.com/gorilla/mux v1.8.0 @@ -130,7 +131,7 @@ require ( ) replace ( - github.com/cosmos/cosmos-sdk => github.com/sei-protocol/sei-cosmos v0.1.69 + github.com/cosmos/cosmos-sdk => github.com/sei-protocol/sei-cosmos v0.1.73 github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 github.com/keybase/go-keychain => github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 github.com/tendermint/tendermint => github.com/sei-protocol/sei-tendermint v0.1.56 diff --git a/go.sum b/go.sum index 34b09eddae..5c2e2530e1 100644 --- a/go.sum +++ b/go.sum @@ -291,6 +291,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/deckarep/golang-set v0.0.0-20180603214616-504e848d77ea/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ= +github.com/deckarep/golang-set v1.8.0 h1:sk9/l/KqpunDwP7pSjUg0keiOOLEnOBHzykLrsPppp4= +github.com/deckarep/golang-set v1.8.0/go.mod h1:5nI87KwE7wgsBU1F4GKAw2Qod7p5kyS383rP6+o6qqo= github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218= github.com/denis-tingaikin/go-header v0.4.3/go.mod h1:0wOCWuN71D5qIgE2nz9KrKmuYBAC2Mra5RassOIQ2/c= github.com/denisenkom/go-mssqldb v0.12.0/go.mod h1:iiK0YP1ZeepvmBQk/QpLEhhTNJgfzrpArPY/aFvc9yU= @@ -1095,8 +1097,8 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo= github.com/securego/gosec/v2 v2.11.0/go.mod h1:SX8bptShuG8reGC0XS09+a4H2BoWSJi+fscA+Pulbpo= github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY= -github.com/sei-protocol/sei-cosmos v0.1.69 h1:jPFTf6vSg3DQ0FgKpQGw6ZEUuElKL2UJ3P8SBXbYnio= -github.com/sei-protocol/sei-cosmos v0.1.69/go.mod h1:Oaj7toqHCkwEEb+sDIWxtfTkPZxOpMXBXDMvIIqUjpw= +github.com/sei-protocol/sei-cosmos v0.1.73 h1:yFG32zV8BGCbsS/AQYUNc2ZFWWZ2lXv505Gkf6vauKE= +github.com/sei-protocol/sei-cosmos v0.1.73/go.mod h1:Oaj7toqHCkwEEb+sDIWxtfTkPZxOpMXBXDMvIIqUjpw= github.com/sei-protocol/sei-tendermint v0.1.56 h1:iRVhiIetj+GSwpBzaR9lqgvmgcOqmShfACS6x3tuBIw= github.com/sei-protocol/sei-tendermint v0.1.56/go.mod h1:Olwbjyagrpoxj5DAUhHxMTWDVEfQ3FYdpypaJ3+6Hs8= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=