Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1112,8 +1112,7 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequ
ctx.Logger().Error(fmt.Sprintf("Error while building DAG, processing synchronously: %s", err))
txResults = app.ProcessBlockSynchronous(ctx, txs)
default:
completionSignalingMap, blockingSignalsMap := dependencyDag.BuildCompletionSignalMaps()
txResults = app.ProcessBlockConcurrent(ctx, txs, completionSignalingMap, blockingSignalsMap)
txResults = app.ProcessBlockConcurrent(ctx, txs, dependencyDag.CompletionSignalingMap, dependencyDag.BlockingSignalsMap)
}

endBlockResp := app.EndBlock(ctx, abci.RequestEndBlock{
Expand Down
98 changes: 44 additions & 54 deletions app/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package app

import (
"fmt"
"time"

acltypes "github.com/cosmos/cosmos-sdk/types/accesscontrol"
mapset "github.com/deckarep/golang-set"
"github.com/sei-protocol/sei-chain/utils/metrics"
)

type DagNodeID int
Expand All @@ -32,11 +30,13 @@ type DagEdge struct {
}

type Dag struct {
NodeMap map[DagNodeID]DagNode
EdgesMap map[DagNodeID][]DagEdge // maps node Id (from node) and contains edge info
ResourceAccessMap map[ResourceAccess]ResourceIdentifierNodeIDMapping // maps resource type and access type to identifiers + node IDs
TxIndexMap map[int]DagNodeID // tracks latest node ID for a tx index
NextID DagNodeID
NodeMap map[DagNodeID]DagNode
EdgesMap map[DagNodeID][]DagEdge // maps node Id (from node) and contains edge info
ResourceAccessMap map[ResourceAccess]ResourceIdentifierNodeIDMapping // maps resource type and access type to identifiers + node IDs
TxIndexMap map[int]DagNodeID // tracks latest node ID for a tx index
NextID DagNodeID
CompletionSignalingMap map[int]MessageCompletionSignalMapping // keys on tx index
BlockingSignalsMap map[int]MessageCompletionSignalMapping // keys on tx index
}

// Alias for mapping MessageIndexId -> AccessOperations -> CompletionSignals
Expand All @@ -55,6 +55,7 @@ func (dag *Dag) GetCompletionSignal(edge DagEdge) *CompletionSignal {
fromNode := dag.NodeMap[edge.FromNodeID]
toNode := dag.NodeMap[edge.ToNodeID]
if fromNode.TxIndex == toNode.TxIndex {
// TODO: we may be able to remove this now since we don't created edges within a tx now
return nil
}
return &CompletionSignal{
Expand Down Expand Up @@ -85,11 +86,13 @@ func (dag Dag) Visit(v int, do func(w int, c int64) (skip bool)) (aborted bool)

func NewDag() Dag {
return Dag{
NodeMap: make(map[DagNodeID]DagNode),
EdgesMap: make(map[DagNodeID][]DagEdge),
ResourceAccessMap: make(map[ResourceAccess]ResourceIdentifierNodeIDMapping),
TxIndexMap: make(map[int]DagNodeID),
NextID: 0,
NodeMap: make(map[DagNodeID]DagNode),
EdgesMap: make(map[DagNodeID][]DagEdge),
ResourceAccessMap: make(map[ResourceAccess]ResourceIdentifierNodeIDMapping),
TxIndexMap: make(map[int]DagNodeID),
NextID: 0,
CompletionSignalingMap: make(map[int]MessageCompletionSignalMapping),
BlockingSignalsMap: make(map[int]MessageCompletionSignalMapping),
}
}

Expand Down Expand Up @@ -142,7 +145,15 @@ func (dag *Dag) AddNodeBuildDependency(messageIndex int, txIndex int, accessOp a
nodeDependencies := dag.GetNodeDependencies(dagNode)
// build edges for each of the dependencies
for _, nodeDependency := range nodeDependencies {
dag.AddEdge(nodeDependency, dagNode.NodeID)
edge := dag.AddEdge(nodeDependency, dagNode.NodeID)
// also add completion signal corresponding to the edge
if edge != nil {
maybeCompletionSignal := dag.GetCompletionSignal(*edge)
if maybeCompletionSignal != nil {
completionSignal := *maybeCompletionSignal
dag.AddCompletionSignal(completionSignal)
}
}
}

// update access ops map with the latest node id using a specific access op
Expand Down Expand Up @@ -280,49 +291,28 @@ func (dag *Dag) GetNodeDependencies(node DagNode) []DagNodeID {
return nodeDependencies
}

// returns completion signaling map and blocking signals map
func (dag *Dag) BuildCompletionSignalMaps() (
completionSignalingMap map[int]MessageCompletionSignalMapping,
blockingSignalsMap map[int]MessageCompletionSignalMapping,
) {
defer metrics.MeasureBuildDagDuration(time.Now(), "BuildCompletionSignalMaps")
completionSignalingMap = make(map[int]MessageCompletionSignalMapping)
blockingSignalsMap = make(map[int]MessageCompletionSignalMapping)
// go through every node
for _, node := range dag.NodeMap {
// for each node, assign its completion signaling, and also assign blocking signals for the destination nodes
if outgoingEdges, ok := dag.EdgesMap[node.NodeID]; ok {
for _, edge := range outgoingEdges {
maybeCompletionSignal := dag.GetCompletionSignal(edge)
if maybeCompletionSignal != nil {
completionSignal := *maybeCompletionSignal

toNode := dag.NodeMap[edge.ToNodeID]
if _, exists := blockingSignalsMap[toNode.TxIndex]; !exists {
blockingSignalsMap[toNode.TxIndex] = make(MessageCompletionSignalMapping)
}
if _, exists := blockingSignalsMap[toNode.TxIndex][toNode.MessageIndex]; !exists {
blockingSignalsMap[toNode.TxIndex][toNode.MessageIndex] = make(map[acltypes.AccessOperation][]CompletionSignal)
}
// add it to the right blocking signal in the right txindex
prevBlockSignalMapping := blockingSignalsMap[toNode.TxIndex][toNode.MessageIndex][completionSignal.BlockedAccessOperation]
blockingSignalsMap[toNode.TxIndex][toNode.MessageIndex][completionSignal.BlockedAccessOperation] = append(prevBlockSignalMapping, completionSignal)

if _, exists := completionSignalingMap[node.TxIndex]; !exists {
completionSignalingMap[node.TxIndex] = make(MessageCompletionSignalMapping)
}
if _, exists := completionSignalingMap[node.TxIndex][node.MessageIndex]; !exists {
completionSignalingMap[node.TxIndex][node.MessageIndex] = make(map[acltypes.AccessOperation][]CompletionSignal)
}
// add it to the completion signal for the tx index
prevCompletionSignalMapping := completionSignalingMap[node.TxIndex][node.MessageIndex][completionSignal.CompletionAccessOperation]
completionSignalingMap[node.TxIndex][node.MessageIndex][completionSignal.CompletionAccessOperation] = append(prevCompletionSignalMapping, completionSignal)
}
func (dag *Dag) AddCompletionSignal(completionSignal CompletionSignal) {
toNode := dag.NodeMap[completionSignal.ToNodeID]
if _, exists := dag.BlockingSignalsMap[toNode.TxIndex]; !exists {
dag.BlockingSignalsMap[toNode.TxIndex] = make(MessageCompletionSignalMapping)
}
if _, exists := dag.BlockingSignalsMap[toNode.TxIndex][toNode.MessageIndex]; !exists {
dag.BlockingSignalsMap[toNode.TxIndex][toNode.MessageIndex] = make(map[acltypes.AccessOperation][]CompletionSignal)
}
// add it to the right blocking signal in the right txindex
prevBlockSignalMapping := dag.BlockingSignalsMap[toNode.TxIndex][toNode.MessageIndex][completionSignal.BlockedAccessOperation]
dag.BlockingSignalsMap[toNode.TxIndex][toNode.MessageIndex][completionSignal.BlockedAccessOperation] = append(prevBlockSignalMapping, completionSignal)

}
}
fromNode := dag.NodeMap[completionSignal.FromNodeID]
if _, exists := dag.CompletionSignalingMap[fromNode.TxIndex]; !exists {
dag.CompletionSignalingMap[fromNode.TxIndex] = make(MessageCompletionSignalMapping)
}
if _, exists := dag.CompletionSignalingMap[fromNode.TxIndex][fromNode.MessageIndex]; !exists {
dag.CompletionSignalingMap[fromNode.TxIndex][fromNode.MessageIndex] = make(map[acltypes.AccessOperation][]CompletionSignal)
}
return completionSignalingMap, blockingSignalsMap
// add it to the completion signal for the tx index
prevCompletionSignalMapping := dag.CompletionSignalingMap[fromNode.TxIndex][fromNode.MessageIndex][completionSignal.CompletionAccessOperation]
dag.CompletionSignalingMap[fromNode.TxIndex][fromNode.MessageIndex][completionSignal.CompletionAccessOperation] = append(prevCompletionSignalMapping, completionSignal)
}

var (
Expand Down
4 changes: 2 additions & 2 deletions app/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestCreateGraph(t *testing.T) {
require.True(t, acyclic)

// test completion signals
completionSignalsMap, blockingSignalsMap := dag.BuildCompletionSignalMaps()
completionSignalsMap, blockingSignalsMap := dag.CompletionSignalingMap, dag.BlockingSignalsMap

channel0 := completionSignalsMap[0][0][commitAccessOp][0].Channel
channel1 := completionSignalsMap[0][0][commitAccessOp][1].Channel
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestHierarchyDag(t *testing.T) {
require.Equal(t, []app.DagEdge(nil), dag.EdgesMap[6])

// test completion signals
completionSignalsMap, blockingSignalsMap := dag.BuildCompletionSignalMaps()
completionSignalsMap, blockingSignalsMap := dag.CompletionSignalingMap, dag.BlockingSignalsMap

channel0 := completionSignalsMap[0][0][commit][0].Channel
channel1 := completionSignalsMap[0][0][commit][1].Channel
Expand Down