Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 146 additions & 62 deletions app/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,19 @@ import (
"fmt"

acltypes "github.com/cosmos/cosmos-sdk/types/accesscontrol"
mapset "github.com/deckarep/golang-set"
)

type DagNodeID int

// Alias for mapping resource identifier to dag node IDs
type ResourceIdentifierNodeIDMapping = map[string][]DagNodeID

type ResourceAccess struct {
ResourceType acltypes.ResourceType
AccessType acltypes.AccessType
}

type DagNode struct {
NodeID DagNodeID
MessageIndex int
Expand All @@ -21,11 +30,11 @@ type DagEdge struct {
}

type Dag struct {
NodeMap map[DagNodeID]DagNode
EdgesMap map[DagNodeID][]DagEdge // maps node Id (from node) and contains edge info
AccessOpsMap map[acltypes.AccessOperation][]DagNodeID // tracks latest node to use a specific access op
TxIndexMap map[int]DagNodeID // tracks latest node ID for a tx index
NextID DagNodeID
NodeMap map[DagNodeID]DagNode
EdgesMap map[DagNodeID][]DagEdge // maps node Id (from node) and contains edge info
ResourceAccessMap map[ResourceAccess]ResourceIdentifierNodeIDMapping // maps resource type and access type to identifiers + node IDs
TxIndexMap map[int]DagNodeID // tracks latest node ID for a tx index
NextID DagNodeID
}

// Alias for mapping MessageIndexId -> AccessOperations -> CompletionSignals
Expand Down Expand Up @@ -74,11 +83,18 @@ func (dag Dag) Visit(v int, do func(w int, c int64) (skip bool)) (aborted bool)

func NewDag() Dag {
return Dag{
NodeMap: make(map[DagNodeID]DagNode),
EdgesMap: make(map[DagNodeID][]DagEdge),
AccessOpsMap: make(map[acltypes.AccessOperation][]DagNodeID),
TxIndexMap: make(map[int]DagNodeID),
NextID: 0,
NodeMap: make(map[DagNodeID]DagNode),
EdgesMap: make(map[DagNodeID][]DagEdge),
ResourceAccessMap: make(map[ResourceAccess]ResourceIdentifierNodeIDMapping),
TxIndexMap: make(map[int]DagNodeID),
NextID: 0,
}
}

func GetResourceAccess(accessOp acltypes.AccessOperation) ResourceAccess {
return ResourceAccess{
accessOp.ResourceType,
accessOp.AccessType,
}
}

Expand Down Expand Up @@ -124,73 +140,141 @@ func (dag *Dag) AddNodeBuildDependency(messageIndex int, txIndex int, accessOp a
nodeDependencies := dag.GetNodeDependencies(dagNode)
// build edges for each of the dependencies
for _, nodeDependency := range nodeDependencies {
dag.AddEdge(nodeDependency.NodeID, dagNode.NodeID)
dag.AddEdge(nodeDependency, dagNode.NodeID)
}

// update access ops map with the latest node id using a specific access op
dag.AccessOpsMap[accessOp] = append(dag.AccessOpsMap[accessOp], dagNode.NodeID)
resourceAccess := GetResourceAccess(accessOp)
if _, exists := dag.ResourceAccessMap[resourceAccess]; !exists {
dag.ResourceAccessMap[resourceAccess] = make(ResourceIdentifierNodeIDMapping)
}
dag.ResourceAccessMap[resourceAccess][accessOp.IdentifierTemplate] = append(dag.ResourceAccessMap[resourceAccess][accessOp.IdentifierTemplate], dagNode.NodeID)
}

// This helper will identify nodes that are dependencies for the current node, and can then be used for creating edges between then for future completion signals
func (dag *Dag) GetNodeDependencies(node DagNode) (nodeDependencies []DagNode) {
accessOp := node.AccessOperation
// if the blocking access ops are in access ops map, make an edge
switch accessOp.AccessType {
case acltypes.AccessType_READ:
// if we need to do a read, we need latest write as a dependency
// TODO: replace hardcoded access op dependencies with helper that generates (and also generates superseding resources too eg. Resource.ALL is blocking for Resource.KV)
writeAccessOp := acltypes.AccessOperation{
AccessType: acltypes.AccessType_WRITE,
ResourceType: accessOp.GetResourceType(),
IdentifierTemplate: accessOp.GetIdentifierTemplate(),
func getAllNodeIDsFromIdentifierMapping(mapping ResourceIdentifierNodeIDMapping) (allNodeIDs []DagNodeID) {
for _, nodeIDs := range mapping {
allNodeIDs = append(allNodeIDs, nodeIDs...)
}
return
}

func (dag *Dag) getDependencyWrites(node DagNode, dependentResource acltypes.ResourceType) mapset.Set {
nodeIDs := mapset.NewSet()
writeResourceAccess := ResourceAccess{
dependentResource,
acltypes.AccessType_WRITE,
}
if identifierNodeMapping, ok := dag.ResourceAccessMap[writeResourceAccess]; ok {
var nodeIDsMaybeDependency []DagNodeID
if dependentResource != node.AccessOperation.ResourceType {
// we can add all node IDs as dependencies if applicable
nodeIDsMaybeDependency = getAllNodeIDsFromIdentifierMapping(identifierNodeMapping)
} else {
// TODO: otherwise we need to have partial filtering on identifiers
// for now, lets just perform exact matching on identifiers
nodeIDsMaybeDependency = identifierNodeMapping[node.AccessOperation.IdentifierTemplate]
}
if writeNodeIDs, ok := dag.AccessOpsMap[writeAccessOp]; ok {
for _, wn := range writeNodeIDs {
writeNode := dag.NodeMap[wn]
// if accessOp exists already (and from a previous transaction), we need to define a dependency on the previous message (and make a edge between the two)
// if from a previous transaction, we need to create an edge
if writeNode.TxIndex < node.TxIndex {
// this should be the COMMIT access op for the tx
lastTxNode := dag.NodeMap[dag.TxIndexMap[writeNode.TxIndex]]
nodeDependencies = append(nodeDependencies, lastTxNode)
}
for _, wn := range nodeIDsMaybeDependency {
writeNode := dag.NodeMap[wn]
// if accessOp exists already (and from a previous transaction), we need to define a dependency on the previous message (and make a edge between the two)
// if from a previous transaction, we need to create an edge
if writeNode.TxIndex < node.TxIndex {
// this should be the COMMIT access op for the tx
lastTxNode := dag.NodeMap[dag.TxIndexMap[writeNode.TxIndex]]
nodeIDs.Add(lastTxNode.NodeID)
}
}
case acltypes.AccessType_WRITE, acltypes.AccessType_UNKNOWN:
// if we need to do a write, we need read and write as dependencies
writeAccessOp := acltypes.AccessOperation{
AccessType: acltypes.AccessType_WRITE,
ResourceType: accessOp.GetResourceType(),
IdentifierTemplate: accessOp.GetIdentifierTemplate(),
}
return nodeIDs
}

func (dag *Dag) getDependencyUnknowns(node DagNode, dependentResource acltypes.ResourceType) mapset.Set {
nodeIDs := mapset.NewSet()
unknownResourceAccess := ResourceAccess{
dependentResource,
acltypes.AccessType_UNKNOWN,
}
if identifierNodeMapping, ok := dag.ResourceAccessMap[unknownResourceAccess]; ok {
var nodeIDsMaybeDependency []DagNodeID
if dependentResource != node.AccessOperation.ResourceType {
// we can add all node IDs as dependencies if applicable
nodeIDsMaybeDependency = getAllNodeIDsFromIdentifierMapping(identifierNodeMapping)
} else {
// TODO: otherwise we need to have partial filtering on identifiers
// for now, lets just perform exact matching on identifiers
nodeIDsMaybeDependency = identifierNodeMapping[node.AccessOperation.IdentifierTemplate]
}
if writeNodeIDs, ok := dag.AccessOpsMap[writeAccessOp]; ok {
for _, wn := range writeNodeIDs {
// if accessOp exists already (and from a previous transaction), we need to define a dependency on the previous message (and make a edge between the two)
writeNode := dag.NodeMap[wn]
// if from a previous transaction, we need to create an edge
if writeNode.TxIndex < node.TxIndex {
// we need to get the last node from that tx
lastTxNode := dag.NodeMap[dag.TxIndexMap[writeNode.TxIndex]]
nodeDependencies = append(nodeDependencies, lastTxNode)
}
for _, un := range nodeIDsMaybeDependency {
uNode := dag.NodeMap[un]
// if accessOp exists already (and from a previous transaction), we need to define a dependency on the previous message (and make a edge between the two)
// if from a previous transaction, we need to create an edge
if uNode.TxIndex < node.TxIndex {
// this should be the COMMIT access op for the tx
lastTxNode := dag.NodeMap[dag.TxIndexMap[uNode.TxIndex]]
nodeIDs.Add(lastTxNode.NodeID)
}
}
readAccessOp := acltypes.AccessOperation{
AccessType: acltypes.AccessType_READ,
ResourceType: accessOp.GetResourceType(),
IdentifierTemplate: accessOp.GetIdentifierTemplate(),
}
return nodeIDs
}

func (dag *Dag) getDependencyReads(node DagNode, dependentResource acltypes.ResourceType) mapset.Set {
nodeIDs := mapset.NewSet()
readResourceAccess := ResourceAccess{
dependentResource,
acltypes.AccessType_READ,
}
if identifierNodeMapping, ok := dag.ResourceAccessMap[readResourceAccess]; ok {
var nodeIDsMaybeDependency []DagNodeID
if dependentResource != node.AccessOperation.ResourceType {
// we can add all node IDs as dependencies if applicable
nodeIDsMaybeDependency = getAllNodeIDsFromIdentifierMapping(identifierNodeMapping)
} else {
// TODO: otherwise we need to have partial filtering on identifiers
// for now, lets just perform exact matching on identifiers
nodeIDsMaybeDependency = identifierNodeMapping[node.AccessOperation.IdentifierTemplate]
}
if readNodeIDs, ok := dag.AccessOpsMap[readAccessOp]; ok {
for _, rn := range readNodeIDs {
readNode := dag.NodeMap[rn]
// if accessOp exists already (and from a previous transaction), we need to define a dependency on the previous message (and make a edge between the two)
// if from a previous transaction, we need to create an edge
if readNode.TxIndex < node.TxIndex {
nodeDependencies = append(nodeDependencies, readNode)
}
for _, rn := range nodeIDsMaybeDependency {
readNode := dag.NodeMap[rn]
// if from a previous transaction, we need to create an edge
if readNode.TxIndex < node.TxIndex {
nodeIDs.Add(readNode.NodeID)
}
}
}
return nodeIDs
}

// given a node, and a dependent Resource, generate a set of nodes that are dependencies
func (dag *Dag) getNodeDependenciesForResource(node DagNode, dependentResource acltypes.ResourceType) mapset.Set {
nodeIDs := mapset.NewSet()
switch node.AccessOperation.AccessType {
case acltypes.AccessType_READ:
// for a read, we are blocked on prior writes and unknown
nodeIDs = nodeIDs.Union(dag.getDependencyWrites(node, dependentResource))
nodeIDs = nodeIDs.Union(dag.getDependencyUnknowns(node, dependentResource))
case acltypes.AccessType_WRITE, acltypes.AccessType_UNKNOWN:
// for write / unknown, we're blocked on prior writes, reads, and unknowns
nodeIDs = nodeIDs.Union(dag.getDependencyWrites(node, dependentResource))
nodeIDs = nodeIDs.Union(dag.getDependencyUnknowns(node, dependentResource))
nodeIDs = nodeIDs.Union(dag.getDependencyReads(node, dependentResource))
}
return nodeIDs
}

// This helper will identify nodes that are dependencies for the current node, and can then be used for creating edges between then for future completion signals
func (dag *Dag) GetNodeDependencies(node DagNode) []DagNodeID {
accessOp := node.AccessOperation
// get all parent resource types, we'll need to create edges for any of these
parentResources := accessOp.ResourceType.GetResourceDependencies()
nodeIDSet := mapset.NewSet()
for _, resource := range parentResources {
nodeIDSet = nodeIDSet.Union(dag.getNodeDependenciesForResource(node, resource))
}
nodeDependencies := make([]DagNodeID, nodeIDSet.Cardinality())
for i, x := range nodeIDSet.ToSlice() {
nodeDependencies[i] = x.(DagNodeID)
}
return nodeDependencies
}

Expand Down
106 changes: 106 additions & 0 deletions app/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,109 @@ func TestCreateGraph(t *testing.T) {
slice,
)
}

func TestHierarchyDag(t *testing.T) {
dag := app.NewDag()
/**
tx1: write to A, commit 1
tx2: read ALL, commit 2
tx3: write B dexmem, commit 3
tx4: read A, commit 4
expected dag
1wA -> 1c => 2rALL -> 2c
\ \=> 3wB c3
\---=> 4rA c4
**/

commit := acltypes.AccessOperation{
AccessType: acltypes.AccessType_COMMIT,
ResourceType: acltypes.ResourceType_ANY,
IdentifierTemplate: "*",
}
writeA := acltypes.AccessOperation{
AccessType: acltypes.AccessType_WRITE,
ResourceType: acltypes.ResourceType_KV,
IdentifierTemplate: "ResourceA",
}
readA := acltypes.AccessOperation{
AccessType: acltypes.AccessType_READ,
ResourceType: acltypes.ResourceType_KV,
IdentifierTemplate: "ResourceA",
}
writeB := acltypes.AccessOperation{
AccessType: acltypes.AccessType_WRITE,
ResourceType: acltypes.ResourceType_DexMem,
IdentifierTemplate: "ResourceB",
}
readAll := acltypes.AccessOperation{
AccessType: acltypes.AccessType_READ,
ResourceType: acltypes.ResourceType_ANY,
IdentifierTemplate: "*",
}

dag.AddNodeBuildDependency(0, 0, writeA) // node id 0
dag.AddNodeBuildDependency(0, 0, commit) // node id 1
dag.AddNodeBuildDependency(0, 1, readAll) // node id 2
dag.AddNodeBuildDependency(0, 1, commit) // node id 3
dag.AddNodeBuildDependency(0, 2, writeB) // node id 4
dag.AddNodeBuildDependency(0, 2, commit) // node id 5
dag.AddNodeBuildDependency(0, 3, readA) // node id 6
dag.AddNodeBuildDependency(0, 3, commit) // node id 7

// assert dag is acyclic
acyclic := graph.Acyclic(dag)
require.True(t, acyclic)

require.Equal(t, []app.DagEdge(nil), dag.EdgesMap[0])
require.Equal(
t,
[]app.DagEdge{{1, 2}, {1, 6}},
dag.EdgesMap[1],
)
require.Equal(
t,
[]app.DagEdge{{2, 4}},
dag.EdgesMap[2],
)
require.Equal(t, []app.DagEdge(nil), dag.EdgesMap[3])
require.Equal(t, []app.DagEdge(nil), dag.EdgesMap[4])
require.Equal(t, []app.DagEdge(nil), dag.EdgesMap[5])
require.Equal(t, []app.DagEdge(nil), dag.EdgesMap[6])

// test completion signals
completionSignalsMap, blockingSignalsMap := dag.BuildCompletionSignalMaps()

channel0 := completionSignalsMap[0][0][commit][0].Channel
channel1 := completionSignalsMap[0][0][commit][1].Channel
channel2 := completionSignalsMap[1][0][readAll][0].Channel

signal0 := app.CompletionSignal{1, 2, commit, readAll, channel0}
signal1 := app.CompletionSignal{1, 6, commit, readA, channel1}
signal2 := app.CompletionSignal{2, 4, readAll, writeB, channel2}

require.Equal(
t,
[]app.CompletionSignal{signal0, signal1},
completionSignalsMap[0][0][commit],
)
require.Equal(
t,
[]app.CompletionSignal{signal0},
blockingSignalsMap[1][0][readAll],
)
require.Equal(
t,
[]app.CompletionSignal{signal1},
blockingSignalsMap[3][0][readA],
)
require.Equal(
t,
[]app.CompletionSignal{signal2},
completionSignalsMap[1][0][readAll],
)
require.Equal(
t,
[]app.CompletionSignal{signal2},
blockingSignalsMap[2][0][writeB],
)
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/cosmos/cosmos-sdk v0.45.4
github.com/cosmos/go-bip39 v1.0.0
github.com/cosmos/ibc-go/v3 v3.0.0
github.com/deckarep/golang-set v1.8.0
github.com/gogo/protobuf v1.3.3
github.com/golang/protobuf v1.5.2
github.com/gorilla/mux v1.8.0
Expand Down Expand Up @@ -130,7 +131,7 @@ require (
)

replace (
github.com/cosmos/cosmos-sdk => github.com/sei-protocol/sei-cosmos v0.1.69
github.com/cosmos/cosmos-sdk => github.com/sei-protocol/sei-cosmos v0.1.73
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
github.com/keybase/go-keychain => github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4
github.com/tendermint/tendermint => github.com/sei-protocol/sei-tendermint v0.1.56
Expand Down
Loading