From c52764a52d0e4d79304c2b52c9f902d38000826d Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Tue, 4 Oct 2022 09:52:32 -0700 Subject: [PATCH 1/2] [graph] Build completion signals along with edges --- app/app.go | 3 +- app/graph.go | 96 ++++++++++++++++++++++------------------------- app/graph_test.go | 4 +- 3 files changed, 47 insertions(+), 56 deletions(-) diff --git a/app/app.go b/app/app.go index 442569af4c..77933ffbd7 100644 --- a/app/app.go +++ b/app/app.go @@ -1112,8 +1112,7 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequ ctx.Logger().Error(fmt.Sprintf("Error while building DAG, processing synchronously: %s", err)) txResults = app.ProcessBlockSynchronous(ctx, txs) default: - completionSignalingMap, blockingSignalsMap := dependencyDag.BuildCompletionSignalMaps() - txResults = app.ProcessBlockConcurrent(ctx, txs, completionSignalingMap, blockingSignalsMap) + txResults = app.ProcessBlockConcurrent(ctx, txs, dependencyDag.CompletionSignalingMap, dependencyDag.BlockingSignalsMap) } endBlockResp := app.EndBlock(ctx, abci.RequestEndBlock{ diff --git a/app/graph.go b/app/graph.go index 3c9f8edbdd..6d98e0a24f 100644 --- a/app/graph.go +++ b/app/graph.go @@ -32,11 +32,13 @@ type DagEdge struct { } type Dag struct { - NodeMap map[DagNodeID]DagNode - EdgesMap map[DagNodeID][]DagEdge // maps node Id (from node) and contains edge info - ResourceAccessMap map[ResourceAccess]ResourceIdentifierNodeIDMapping // maps resource type and access type to identifiers + node IDs - TxIndexMap map[int]DagNodeID // tracks latest node ID for a tx index - NextID DagNodeID + NodeMap map[DagNodeID]DagNode + EdgesMap map[DagNodeID][]DagEdge // maps node Id (from node) and contains edge info + ResourceAccessMap map[ResourceAccess]ResourceIdentifierNodeIDMapping // maps resource type and access type to identifiers + node IDs + TxIndexMap map[int]DagNodeID // tracks latest node ID for a tx index + NextID DagNodeID + CompletionSignalingMap map[int]MessageCompletionSignalMapping // keys on tx index + BlockingSignalsMap map[int]MessageCompletionSignalMapping // keys on tx index } // Alias for mapping MessageIndexId -> AccessOperations -> CompletionSignals @@ -55,6 +57,7 @@ func (dag *Dag) GetCompletionSignal(edge DagEdge) *CompletionSignal { fromNode := dag.NodeMap[edge.FromNodeID] toNode := dag.NodeMap[edge.ToNodeID] if fromNode.TxIndex == toNode.TxIndex { + // TODO: we may be able to remove this now since we don't created edges within a tx now return nil } return &CompletionSignal{ @@ -85,11 +88,13 @@ func (dag Dag) Visit(v int, do func(w int, c int64) (skip bool)) (aborted bool) func NewDag() Dag { return Dag{ - NodeMap: make(map[DagNodeID]DagNode), - EdgesMap: make(map[DagNodeID][]DagEdge), - ResourceAccessMap: make(map[ResourceAccess]ResourceIdentifierNodeIDMapping), - TxIndexMap: make(map[int]DagNodeID), - NextID: 0, + NodeMap: make(map[DagNodeID]DagNode), + EdgesMap: make(map[DagNodeID][]DagEdge), + ResourceAccessMap: make(map[ResourceAccess]ResourceIdentifierNodeIDMapping), + TxIndexMap: make(map[int]DagNodeID), + NextID: 0, + CompletionSignalingMap: make(map[int]MessageCompletionSignalMapping), + BlockingSignalsMap: make(map[int]MessageCompletionSignalMapping), } } @@ -142,7 +147,15 @@ func (dag *Dag) AddNodeBuildDependency(messageIndex int, txIndex int, accessOp a nodeDependencies := dag.GetNodeDependencies(dagNode) // build edges for each of the dependencies for _, nodeDependency := range nodeDependencies { - dag.AddEdge(nodeDependency, dagNode.NodeID) + edge := dag.AddEdge(nodeDependency, dagNode.NodeID) + // also add completion signal corresponding to the edge + if edge != nil { + maybeCompletionSignal := dag.GetCompletionSignal(*edge) + if maybeCompletionSignal != nil { + completionSignal := *maybeCompletionSignal + dag.AddCompletionSignal(completionSignal) + } + } } // update access ops map with the latest node id using a specific access op @@ -280,49 +293,28 @@ func (dag *Dag) GetNodeDependencies(node DagNode) []DagNodeID { return nodeDependencies } -// returns completion signaling map and blocking signals map -func (dag *Dag) BuildCompletionSignalMaps() ( - completionSignalingMap map[int]MessageCompletionSignalMapping, - blockingSignalsMap map[int]MessageCompletionSignalMapping, -) { - defer metrics.MeasureBuildDagDuration(time.Now(), "BuildCompletionSignalMaps") - completionSignalingMap = make(map[int]MessageCompletionSignalMapping) - blockingSignalsMap = make(map[int]MessageCompletionSignalMapping) - // go through every node - for _, node := range dag.NodeMap { - // for each node, assign its completion signaling, and also assign blocking signals for the destination nodes - if outgoingEdges, ok := dag.EdgesMap[node.NodeID]; ok { - for _, edge := range outgoingEdges { - maybeCompletionSignal := dag.GetCompletionSignal(edge) - if maybeCompletionSignal != nil { - completionSignal := *maybeCompletionSignal - - toNode := dag.NodeMap[edge.ToNodeID] - if _, exists := blockingSignalsMap[toNode.TxIndex]; !exists { - blockingSignalsMap[toNode.TxIndex] = make(MessageCompletionSignalMapping) - } - if _, exists := blockingSignalsMap[toNode.TxIndex][toNode.MessageIndex]; !exists { - blockingSignalsMap[toNode.TxIndex][toNode.MessageIndex] = make(map[acltypes.AccessOperation][]CompletionSignal) - } - // add it to the right blocking signal in the right txindex - prevBlockSignalMapping := blockingSignalsMap[toNode.TxIndex][toNode.MessageIndex][completionSignal.BlockedAccessOperation] - blockingSignalsMap[toNode.TxIndex][toNode.MessageIndex][completionSignal.BlockedAccessOperation] = append(prevBlockSignalMapping, completionSignal) - - if _, exists := completionSignalingMap[node.TxIndex]; !exists { - completionSignalingMap[node.TxIndex] = make(MessageCompletionSignalMapping) - } - if _, exists := completionSignalingMap[node.TxIndex][node.MessageIndex]; !exists { - completionSignalingMap[node.TxIndex][node.MessageIndex] = make(map[acltypes.AccessOperation][]CompletionSignal) - } - // add it to the completion signal for the tx index - prevCompletionSignalMapping := completionSignalingMap[node.TxIndex][node.MessageIndex][completionSignal.CompletionAccessOperation] - completionSignalingMap[node.TxIndex][node.MessageIndex][completionSignal.CompletionAccessOperation] = append(prevCompletionSignalMapping, completionSignal) - } +func (dag *Dag) AddCompletionSignal(completionSignal CompletionSignal) { + toNode := dag.NodeMap[completionSignal.ToNodeID] + if _, exists := dag.BlockingSignalsMap[toNode.TxIndex]; !exists { + dag.BlockingSignalsMap[toNode.TxIndex] = make(MessageCompletionSignalMapping) + } + if _, exists := dag.BlockingSignalsMap[toNode.TxIndex][toNode.MessageIndex]; !exists { + dag.BlockingSignalsMap[toNode.TxIndex][toNode.MessageIndex] = make(map[acltypes.AccessOperation][]CompletionSignal) + } + // add it to the right blocking signal in the right txindex + prevBlockSignalMapping := dag.BlockingSignalsMap[toNode.TxIndex][toNode.MessageIndex][completionSignal.BlockedAccessOperation] + dag.BlockingSignalsMap[toNode.TxIndex][toNode.MessageIndex][completionSignal.BlockedAccessOperation] = append(prevBlockSignalMapping, completionSignal) - } - } + fromNode := dag.NodeMap[completionSignal.FromNodeID] + if _, exists := dag.CompletionSignalingMap[fromNode.TxIndex]; !exists { + dag.CompletionSignalingMap[fromNode.TxIndex] = make(MessageCompletionSignalMapping) + } + if _, exists := dag.CompletionSignalingMap[fromNode.TxIndex][fromNode.MessageIndex]; !exists { + dag.CompletionSignalingMap[fromNode.TxIndex][fromNode.MessageIndex] = make(map[acltypes.AccessOperation][]CompletionSignal) } - return completionSignalingMap, blockingSignalsMap + // add it to the completion signal for the tx index + prevCompletionSignalMapping := dag.CompletionSignalingMap[fromNode.TxIndex][fromNode.MessageIndex][completionSignal.CompletionAccessOperation] + dag.CompletionSignalingMap[fromNode.TxIndex][fromNode.MessageIndex][completionSignal.CompletionAccessOperation] = append(prevCompletionSignalMapping, completionSignal) } var ( diff --git a/app/graph_test.go b/app/graph_test.go index 4b03d6793e..dea4c6c682 100644 --- a/app/graph_test.go +++ b/app/graph_test.go @@ -94,7 +94,7 @@ func TestCreateGraph(t *testing.T) { require.True(t, acyclic) // test completion signals - completionSignalsMap, blockingSignalsMap := dag.BuildCompletionSignalMaps() + completionSignalsMap, blockingSignalsMap := dag.CompletionSignalingMap, dag.BlockingSignalsMap channel0 := completionSignalsMap[0][0][commitAccessOp][0].Channel channel1 := completionSignalsMap[0][0][commitAccessOp][1].Channel @@ -219,7 +219,7 @@ func TestHierarchyDag(t *testing.T) { require.Equal(t, []app.DagEdge(nil), dag.EdgesMap[6]) // test completion signals - completionSignalsMap, blockingSignalsMap := dag.BuildCompletionSignalMaps() + completionSignalsMap, blockingSignalsMap := dag.CompletionSignalingMap, dag.BlockingSignalsMap channel0 := completionSignalsMap[0][0][commit][0].Channel channel1 := completionSignalsMap[0][0][commit][1].Channel From b82d415553ff2926c46bd00fef613fd6cb7477d5 Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Tue, 4 Oct 2022 10:48:23 -0700 Subject: [PATCH 2/2] lint --- app/graph.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/app/graph.go b/app/graph.go index 6d98e0a24f..1b211bc61f 100644 --- a/app/graph.go +++ b/app/graph.go @@ -2,11 +2,9 @@ package app import ( "fmt" - "time" acltypes "github.com/cosmos/cosmos-sdk/types/accesscontrol" mapset "github.com/deckarep/golang-set" - "github.com/sei-protocol/sei-chain/utils/metrics" ) type DagNodeID int