Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 123 additions & 23 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"time"

storetypes "github.com/cosmos/cosmos-sdk/store/types"
Expand All @@ -30,6 +31,7 @@ import (
servertypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/simapp"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkacltypes "github.com/cosmos/cosmos-sdk/types/accesscontrol"
"github.com/cosmos/cosmos-sdk/types/module"
"github.com/cosmos/cosmos-sdk/version"
"github.com/cosmos/cosmos-sdk/x/auth"
Expand All @@ -38,6 +40,7 @@ import (
aclmodule "github.com/cosmos/cosmos-sdk/x/accesscontrol"
aclkeeper "github.com/cosmos/cosmos-sdk/x/accesscontrol/keeper"
acltypes "github.com/cosmos/cosmos-sdk/x/accesscontrol/types"

authrest "github.com/cosmos/cosmos-sdk/x/auth/client/rest"
authkeeper "github.com/cosmos/cosmos-sdk/x/auth/keeper"
authsims "github.com/cosmos/cosmos-sdk/x/auth/simulation"
Expand Down Expand Up @@ -881,11 +884,11 @@ func (app *App) BuildDependencyDag(ctx sdk.Context, txs [][]byte) (*Dag, error)
return nil, err
}
msgs := tx.GetMsgs()
for _, msg := range msgs {
for messageIndex, msg := range msgs {
msgDependencies := app.AccessControlKeeper.GetResourceDependencyMapping(ctx, acltypes.GenerateMessageKey(msg))
for _, accessOp := range msgDependencies.AccessOps {
for _, accessOp := range msgDependencies.GetAccessOps() {
// make a new node in the dependency dag
dependencyDag.AddNodeBuildDependency(txIndex, accessOp)
dependencyDag.AddNodeBuildDependency(messageIndex, txIndex, accessOp)
}
}

Expand Down Expand Up @@ -926,6 +929,115 @@ func (app *App) FinalizeBlocker(ctx sdk.Context, req *abci.RequestFinalizeBlock)
return &resp, nil
}

func (app *App) DeliverTxWithResult(ctx sdk.Context, tx []byte) *abci.ExecTxResult {
deliverTxResp := app.DeliverTx(ctx, abci.RequestDeliverTx{
Tx: tx,
})
return &abci.ExecTxResult{
Code: deliverTxResp.Code,
Data: deliverTxResp.Data,
Log: deliverTxResp.Log,
Info: deliverTxResp.Info,
GasWanted: deliverTxResp.GasWanted,
GasUsed: deliverTxResp.GasUsed,
Events: deliverTxResp.Events,
Codespace: deliverTxResp.Codespace,
}
}

func (app *App) ProcessBlockSynchronous(ctx sdk.Context, txs [][]byte) []*abci.ExecTxResult {
txResults := []*abci.ExecTxResult{}
for _, tx := range txs {
txResults = append(txResults, app.DeliverTxWithResult(ctx, tx))
}
return txResults
}

// Returns a mapping of the accessOperation to the channels
func getChannelsFromSignalMapping(signalMapping MessageCompletionSignalMapping) sdkacltypes.MessageAccessOpsChannelMapping {
channelsMapping := make(sdkacltypes.MessageAccessOpsChannelMapping)
for messageIndex, accessOperationsToSignal := range signalMapping {
for accessOperation, completionSignals := range accessOperationsToSignal {
var channels []chan interface{}
for _, completionSignal := range completionSignals {
channels = append(channels, completionSignal.Channel)
}
channelsMapping[messageIndex][accessOperation] = channels
}
}
return channelsMapping
}

type ChannelResult struct {
txIndex int
result *abci.ExecTxResult
}

func (app *App) ProcessTxConcurrent(
ctx sdk.Context,
txIndex int,
txBytes []byte,
wg *sync.WaitGroup,
resultChan chan<- ChannelResult,
txCompletionSignalingMap MessageCompletionSignalMapping,
txBlockingSignalsMap MessageCompletionSignalMapping,
) {
defer wg.Done()
// Store the Channels in the Context Object for each transaction
ctx.WithTxBlockingChannels(getChannelsFromSignalMapping(txBlockingSignalsMap))
ctx.WithTxCompletionChannels(getChannelsFromSignalMapping(txCompletionSignalingMap))

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add TODOs to block on blocking channels and writing to completion channels to properly block and release resources

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Deliver the transaction and store the result in the channel
resultChan <- ChannelResult{txIndex, app.DeliverTxWithResult(ctx, txBytes)}
}

func (app *App) ProcessBlockConcurrent(
ctx sdk.Context,
txs [][]byte,
completionSignalingMap map[int]MessageCompletionSignalMapping,
blockingSignalsMap map[int]MessageCompletionSignalMapping,
) []*abci.ExecTxResult {
var waitGroup sync.WaitGroup
resultChan := make(chan ChannelResult)
txResults := []*abci.ExecTxResult{}

// If there's no transactions then return empty results
if len(txs) == 0 {
return txResults
}

// For each transaction, start goroutine and deliver TX
for txIndex, txBytes := range txs {
waitGroup.Add(1)
go app.ProcessTxConcurrent(
ctx,
txIndex,
txBytes,
&waitGroup,
resultChan,
completionSignalingMap[txIndex],
blockingSignalsMap[txIndex],
)
}

// Waits for all the transactions to complete
waitGroup.Wait()

// Gather Results and store it based on txIndex
// Concurrent results may be in different order than the original txIndex
txResultsMap := map[int]*abci.ExecTxResult{}
for result := range resultChan {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isnt necessarily going to be in the same order as txs that were provided, right? is that safe, or should we also include the tx index with the results so we can sort into the tx index order prior to returning Tx results?

Copy link
Contributor Author

@BrandonWeng BrandonWeng Sep 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this will be in the order that it's completed in - let me double check if the ordering matters

txResultsMap[result.txIndex] = result.result
}

// Gather Results and store in array based on txIndex to preserve ordering
for txIndex := range txs {
txResults = append(txResults, txResultsMap[txIndex])
}

return txResults
}

func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequest, lastCommit abci.CommitInfo) ([]abci.Event, []*abci.ExecTxResult, abci.ResponseEndBlock, error) {
goCtx := app.decorateContextWithDexMemState(ctx.Context())
ctx = ctx.WithContext(goCtx)
Expand Down Expand Up @@ -967,30 +1079,18 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequ
// }
// app.batchVerifier.VerifyTxs(ctx, typedTxs)

dag, err := app.BuildDependencyDag(ctx, txs)
dependencyDag, err := app.BuildDependencyDag(ctx, txs)
txResults := []*abci.ExecTxResult{}

// TODO:: add metrics for async vs sync
if err != nil {
// something went wrong in dag, process txs sequentially
for _, tx := range txs {
// ctx = ctx.WithContext(context.WithValue(ctx.Context(), ante.ContextKeyTxIndexKey, i))
deliverTxResp := app.DeliverTx(ctx, abci.RequestDeliverTx{
Tx: tx,
})
txResults = append(txResults, &abci.ExecTxResult{
Code: deliverTxResp.Code,
Data: deliverTxResp.Data,
Log: deliverTxResp.Log,
Info: deliverTxResp.Info,
GasWanted: deliverTxResp.GasWanted,
GasUsed: deliverTxResp.GasUsed,
Events: deliverTxResp.Events,
Codespace: deliverTxResp.Codespace,
})
ctx.Logger().Error(fmt.Sprintf("Error while building DAG, processing synchronously: %s", err))
if err == ErrCycleInDAG {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be good to add metrics over how many times we process synchronously vs concurrently

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a todo here, looks like we don't have the telemetry lib from master. It would be easier to do once we merge this into master

txResults = app.ProcessBlockSynchronous(ctx, txs)
}
} else {
// no error, lets process txs concurrently
_, _ = dag.BuildCompletionSignalMaps()
// TODO: create channel map here
completionSignalingMap, blockingSignalsMap := dependencyDag.BuildCompletionSignalMaps()
txResults = app.ProcessBlockConcurrent(ctx, txs, completionSignalingMap, blockingSignalsMap)
}

endBlockResp := app.EndBlock(ctx, abci.RequestEndBlock{
Expand Down
43 changes: 29 additions & 14 deletions app/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package app
import (
"fmt"

acltypes "github.com/cosmos/cosmos-sdk/x/accesscontrol/types"
acltypes "github.com/cosmos/cosmos-sdk/types/accesscontrol"
)

type DagNodeID int

type DagNode struct {
NodeID DagNodeID
MessageIndex int
TxIndex int
AccessOperation acltypes.AccessOperation
}
Expand All @@ -27,6 +28,17 @@ type Dag struct {
NextID DagNodeID
}

// Alias for mapping MessageIndexId -> AccessOperations -> CompletionSignals
type MessageCompletionSignalMapping = map[int]map[acltypes.AccessOperation][]CompletionSignal

type CompletionSignal struct {
FromNodeID DagNodeID
ToNodeID DagNodeID
CompletionAccessOperation acltypes.AccessOperation // this is the access operation that must complete in order to send the signal
BlockedAccessOperation acltypes.AccessOperation // this is the access operation that is blocked by the completion access operation
Channel chan interface{}
}

func (dag *Dag) GetCompletionSignal(edge DagEdge) *CompletionSignal {
// only if tx indexes are different
fromNode := dag.NodeMap[edge.FromNodeID]
Expand All @@ -39,6 +51,8 @@ func (dag *Dag) GetCompletionSignal(edge DagEdge) *CompletionSignal {
ToNodeID: toNode.NodeID,
CompletionAccessOperation: fromNode.AccessOperation,
BlockedAccessOperation: toNode.AccessOperation,
// channel used for signalling
Channel: make(chan interface{}),
}
}

Expand Down Expand Up @@ -68,9 +82,10 @@ func NewDag() Dag {
}
}

func (dag *Dag) AddNode(txIndex int, accessOp acltypes.AccessOperation) DagNode {
func (dag *Dag) AddNode(messageIndex int, txIndex int, accessOp acltypes.AccessOperation) DagNode {
dagNode := DagNode{
NodeID: dag.NextID,
MessageIndex: messageIndex,
TxIndex: txIndex,
AccessOperation: accessOp,
}
Expand Down Expand Up @@ -101,8 +116,8 @@ func (dag *Dag) AddEdge(fromIndex DagNodeID, toIndex DagNodeID) *DagEdge {
// that will allow the dependent goroutines to cordinate execution safely.
//
// It will also register the new node with AccessOpsMap so that future nodes that amy be dependent on this one can properly identify the dependency.
func (dag *Dag) AddNodeBuildDependency(txIndex int, accessOp acltypes.AccessOperation) {
dagNode := dag.AddNode(txIndex, accessOp)
func (dag *Dag) AddNodeBuildDependency(messageIndex int, txIndex int, accessOp acltypes.AccessOperation) {
dagNode := dag.AddNode(messageIndex, txIndex, accessOp)
// if in TxIndexMap, make an edge from the previous node index
if lastTxNodeID, ok := dag.TxIndexMap[txIndex]; ok {
// TODO: we actually don't necessarily need these edges, but keeping for now so we can first determine that cycles can't be missed if we remove these
Expand Down Expand Up @@ -186,7 +201,10 @@ func (dag *Dag) GetNodeDependencies(node DagNode) (nodeDependencies []DagNode) {
}

// returns completion signaling map and blocking signals map
func (dag *Dag) BuildCompletionSignalMaps() (completionSignalingMap map[int]map[acltypes.AccessOperation][]CompletionSignal, blockingSignalsMap map[int]map[acltypes.AccessOperation][]CompletionSignal) {
func (dag *Dag) BuildCompletionSignalMaps() (
completionSignalingMap map[int]MessageCompletionSignalMapping,
blockingSignalsMap map[int]MessageCompletionSignalMapping,
) {
// go through every node
for _, node := range dag.NodeMap {
// for each node, assign its completion signaling, and also assign blocking signals for the destination nodes
Expand All @@ -195,11 +213,15 @@ func (dag *Dag) BuildCompletionSignalMaps() (completionSignalingMap map[int]map[
maybeCompletionSignal := dag.GetCompletionSignal(edge)
if maybeCompletionSignal != nil {
completionSignal := *maybeCompletionSignal

// add it to the right blocking signal in the right txindex
toNode := dag.NodeMap[edge.ToNodeID]
blockingSignalsMap[toNode.TxIndex][completionSignal.BlockedAccessOperation] = append(blockingSignalsMap[toNode.TxIndex][completionSignal.BlockedAccessOperation], completionSignal)
prevBlockSignalMapping := blockingSignalsMap[toNode.TxIndex][toNode.MessageIndex][completionSignal.BlockedAccessOperation]
blockingSignalsMap[toNode.TxIndex][toNode.MessageIndex][completionSignal.BlockedAccessOperation] = append(prevBlockSignalMapping, completionSignal)

// add it to the completion signal for the tx index
completionSignalingMap[node.TxIndex][completionSignal.CompletionAccessOperation] = append(completionSignalingMap[node.TxIndex][completionSignal.CompletionAccessOperation], completionSignal)
prevCompletionSignalMapping := completionSignalingMap[node.TxIndex][node.MessageIndex][completionSignal.CompletionAccessOperation]
completionSignalingMap[node.TxIndex][node.MessageIndex][completionSignal.CompletionAccessOperation] = append(prevCompletionSignalMapping, completionSignal)
}

}
Expand All @@ -208,11 +230,4 @@ func (dag *Dag) BuildCompletionSignalMaps() (completionSignalingMap map[int]map[
return
}

type CompletionSignal struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving this up with all the other types

FromNodeID DagNodeID
ToNodeID DagNodeID
CompletionAccessOperation acltypes.AccessOperation // this is the access operation that must complete in order to send the signal
BlockedAccessOperation acltypes.AccessOperation // this is the access operation that is blocked by the completion access operation
}

var ErrCycleInDAG = fmt.Errorf("cycle detected in DAG")
24 changes: 12 additions & 12 deletions app/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package app_test
import (
"testing"

acltypes "github.com/cosmos/cosmos-sdk/x/accesscontrol/types"
acltypes "github.com/cosmos/cosmos-sdk/types/accesscontrol"
"github.com/sei-protocol/sei-chain/app"
"github.com/stretchr/testify/require"
"github.com/yourbasic/graph"
Expand Down Expand Up @@ -48,17 +48,17 @@ func TestCreateGraph(t *testing.T) {
IdentifierTemplate: "ResourceB",
}

dag.AddNodeBuildDependency(1, writeAccessA) // node id 0
dag.AddNodeBuildDependency(1, readAccessB) // node id 1
dag.AddNodeBuildDependency(1, commitAccessOp) // node id 2
dag.AddNodeBuildDependency(2, readAccessA) // node id 3
dag.AddNodeBuildDependency(2, readAccessB) // node id 4
dag.AddNodeBuildDependency(2, commitAccessOp) // node id 5
dag.AddNodeBuildDependency(3, readAccessB) // node id 6
dag.AddNodeBuildDependency(3, readAccessA) // node id 7
dag.AddNodeBuildDependency(3, commitAccessOp) // node id 8
dag.AddNodeBuildDependency(4, writeAccessB) // node id 9
dag.AddNodeBuildDependency(4, commitAccessOp) // node id 10
dag.AddNodeBuildDependency(1, 1, writeAccessA) // node id 0
dag.AddNodeBuildDependency(1, 1, readAccessB) // node id 1
dag.AddNodeBuildDependency(1, 1, commitAccessOp) // node id 2
dag.AddNodeBuildDependency(1, 2, readAccessA) // node id 3
dag.AddNodeBuildDependency(1, 2, readAccessB) // node id 4
dag.AddNodeBuildDependency(1, 2, commitAccessOp) // node id 5
dag.AddNodeBuildDependency(1, 3, readAccessB) // node id 6
dag.AddNodeBuildDependency(1, 3, readAccessA) // node id 7
dag.AddNodeBuildDependency(1, 3, commitAccessOp) // node id 8
dag.AddNodeBuildDependency(1, 4, writeAccessB) // node id 9
dag.AddNodeBuildDependency(1, 4, commitAccessOp) // node id 10

require.Equal(
t,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ require (
)

replace (
github.com/cosmos/cosmos-sdk => github.com/sei-protocol/sei-cosmos v0.1.64
github.com/cosmos/cosmos-sdk => github.com/sei-protocol/sei-cosmos v0.1.69
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
github.com/keybase/go-keychain => github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4
github.com/tendermint/tendermint => github.com/sei-protocol/sei-tendermint v0.1.54
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1095,8 +1095,8 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg
github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo=
github.com/securego/gosec/v2 v2.11.0/go.mod h1:SX8bptShuG8reGC0XS09+a4H2BoWSJi+fscA+Pulbpo=
github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY=
github.com/sei-protocol/sei-cosmos v0.1.64 h1:smRXceJpavcGxG+KUoI5ztBdSvPKbebzTf06Bx0oAAM=
github.com/sei-protocol/sei-cosmos v0.1.64/go.mod h1:8C+uqD5ZyLjoZpQIKtQPpRynlD3fWgsrwFIt5NlKJIE=
github.com/sei-protocol/sei-cosmos v0.1.69 h1:jPFTf6vSg3DQ0FgKpQGw6ZEUuElKL2UJ3P8SBXbYnio=
github.com/sei-protocol/sei-cosmos v0.1.69/go.mod h1:Oaj7toqHCkwEEb+sDIWxtfTkPZxOpMXBXDMvIIqUjpw=
github.com/sei-protocol/sei-tendermint v0.1.54 h1:T8snsZ5schPUSMa2STd61a0Mt/yoz1H5MTn3HRTbzj8=
github.com/sei-protocol/sei-tendermint v0.1.54/go.mod h1:Olwbjyagrpoxj5DAUhHxMTWDVEfQ3FYdpypaJ3+6Hs8=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
Expand Down
6 changes: 3 additions & 3 deletions x/oracle/simulation/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func WeightedOperations(
}

// SimulateMsgAggregateExchangeRatePrevote generates a MsgAggregateExchangeRatePrevote with random values.
//nolint: funlen
// nolint: funlen
func SimulateMsgAggregateExchangeRatePrevote(ak types.AccountKeeper, bk types.BankKeeper, k keeper.Keeper) simtypes.Operation {
return func(
r *rand.Rand, app *baseapp.BaseApp, ctx sdk.Context, accs []simtypes.Account, chainID string,
Expand Down Expand Up @@ -146,7 +146,7 @@ func SimulateMsgAggregateExchangeRatePrevote(ak types.AccountKeeper, bk types.Ba
}

// SimulateMsgAggregateExchangeRateVote generates a MsgAggregateExchangeRateVote with random values.
//nolint: funlen
// nolint: funlen
func SimulateMsgAggregateExchangeRateVote(ak types.AccountKeeper, bk types.BankKeeper, k keeper.Keeper) simtypes.Operation {
return func(
r *rand.Rand, app *baseapp.BaseApp, ctx sdk.Context, accs []simtypes.Account, chainID string,
Expand Down Expand Up @@ -213,7 +213,7 @@ func SimulateMsgAggregateExchangeRateVote(ak types.AccountKeeper, bk types.BankK
}

// SimulateMsgDelegateFeedConsent generates a MsgDelegateFeedConsent with random values.
//nolint: funlen
// nolint: funlen
func SimulateMsgDelegateFeedConsent(ak types.AccountKeeper, bk types.BankKeeper, k keeper.Keeper) simtypes.Operation {
return func(
r *rand.Rand, app *baseapp.BaseApp, ctx sdk.Context, accs []simtypes.Account, chainID string,
Expand Down