diff --git a/app/app.go b/app/app.go
index 75c7f8fc64..75c58a24ce 100644
--- a/app/app.go
+++ b/app/app.go
@@ -1333,7 +1333,6 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequ
 			ProposerAddress: ctx.BlockHeader().ProposerAddress,
 		},
 	}
-	beginBlockResp := app.BeginBlock(ctx, beginBlockReq)
-	events = append(events, beginBlockResp.Events...)
+	events = append(events, app.BeginBlock(ctx, beginBlockReq).Events...)
diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go
index 058e6723c2..22b9be0d37 100644
--- a/loadtest/loadtest_client.go
+++ b/loadtest/loadtest_client.go
@@ -10,7 +10,10 @@ import (
 	"sync/atomic"
 	"time"
 
+	cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
+	"golang.org/x/sync/semaphore"
 	"golang.org/x/time/rate"
+	"google.golang.org/grpc/connectivity"
 
 	"github.com/cosmos/cosmos-sdk/types"
 	typestx "github.com/cosmos/cosmos-sdk/types/tx"
@@ -45,7 +48,11 @@ type LoadTestClient struct {
 
 func NewLoadTestClient(config Config) *LoadTestClient {
 	var dialOptions []grpc.DialOption
-	// NOTE: Will likely need to whitelist node from elb rate limits - add ip to producer ip set
+	dialOptions = append(dialOptions, grpc.WithDefaultCallOptions(
+		grpc.MaxCallRecvMsgSize(20*1024*1024),
+		grpc.MaxCallSendMsgSize(20*1024*1024)),
+	)
+	dialOptions = append(dialOptions, grpc.WithBlock())
 	if config.TLS {
 		dialOptions = append(dialOptions, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}))) //nolint:gosec // Use insecure skip verify.
 	} else {
@@ -60,6 +67,22 @@ func NewLoadTestClient(config Config) *LoadTestClient {
 		dialOptions...)
TxClients[i] = typestx.NewServiceClient(grpcConn) GrpcConns[i] = grpcConn + // spin up goroutine for monitoring and reconnect purposes + go func() { + for { + state := grpcConn.GetState() + if state == connectivity.TransientFailure || state == connectivity.Shutdown { + fmt.Println("GRPC Connection lost, attempting to reconnect...") + for { + if grpcConn.WaitForStateChange(context.Background(), state) { + break + } + time.Sleep(10 * time.Second) + } + } + time.Sleep(10 * time.Second) + } + }() } return &LoadTestClient{ @@ -93,12 +116,9 @@ func (c *LoadTestClient) Close() { } } -func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, wg *sync.WaitGroup, done <-chan struct{}, producedCount *int64) { +func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, keys []cryptotypes.PrivKey, wg *sync.WaitGroup, done <-chan struct{}, producedCount *int64) { defer wg.Done() config := c.LoadTestConfig - accountIdentifier := fmt.Sprint(producerId) - accountKeyPath := c.SignerClient.GetTestAccountKeyPath(uint64(producerId)) - key := c.SignerClient.GetKey(accountIdentifier, "test", accountKeyPath) for { select { @@ -106,6 +126,7 @@ func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, wg *syn fmt.Printf("Stopping producer %d\n", producerId) return default: + key := keys[atomic.LoadInt64(producedCount)%int64(len(keys))] msgs, _, _, gas, fee := c.generateMessage(config, key, config.MsgsPerTx) txBuilder := TestConfig.TxConfig.NewTxBuilder() _ = txBuilder.SetMsgs(msgs...) 
@@ -114,19 +138,28 @@ func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, wg *syn
 				types.NewCoin("usei", types.NewInt(fee)),
 			})
 			// Use random seqno to get around txs that might already be seen in mempool
 			c.SignerClient.SignTx(c.ChainID, &txBuilder, key, uint64(rand.Intn(math.MaxInt)))
 			txBytes, _ := TestConfig.TxConfig.TxEncoder()(txBuilder.GetTx())
-			txQueue <- txBytes
-			atomic.AddInt64(producedCount, 1)
+			select {
+			case txQueue <- txBytes:
+				atomic.AddInt64(producedCount, 1)
+			case <-done:
+				// Exit if done signal is received while trying to send to txQueue
+				return
+			}
 		}
 	}
 }
 
-func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int) {
+func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int, wg *sync.WaitGroup) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 	rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit)
-	for {
+	maxConcurrent := rateLimit // Set the maximum number of concurrent SendTx calls
+	sem := semaphore.NewWeighted(int64(maxConcurrent))
+	for {
 		select {
 		case <-done:
 			fmt.Printf("Stopping consumers\n")
@@ -134,10 +167,25 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se
 		case tx, ok := <-txQueue:
 			if !ok {
 				fmt.Printf("Stopping consumers\n")
+				return
 			}
-			if rateLimiter.Allow() {
-				go SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount)
+
+			if err := sem.Acquire(ctx, 1); err != nil {
+				fmt.Printf("Failed to acquire semaphore: %v", err)
+				break
 			}
+			wg.Add(1)
+			go func(tx []byte) {
+				localCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+				defer cancel()
+				defer wg.Done()
+				defer sem.Release(1)
+
+				if err := rateLimiter.Wait(localCtx); err != nil {
+					return
+				}
+				SendTx(ctx, tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount)
+			}(tx)
 		}
 	}
 }
diff --git a/loadtest/main.go b/loadtest/main.go
index
0b6baed84a..99ab610b26 100644
--- a/loadtest/main.go
+++ b/loadtest/main.go
@@ -78,7 +78,7 @@ func run(config Config) {
 // starts loadtest workers. If config.Constant is true, then we don't gather loadtest results and let producer/consumer
 // workers continue running. If config.Constant is false, then we will gather load test results in a file
 func startLoadtestWorkers(config Config) {
-	fmt.Printf("Starting loadtest workers")
+	fmt.Printf("Starting loadtest workers\n")
 	client := NewLoadTestClient(config)
 	client.SetValidators()
 
@@ -87,14 +87,14 @@ func startLoadtestWorkers(config Config) {
 	txQueue := make(chan []byte, 10000)
 	done := make(chan struct{})
-	numProducers := 5
+	numProducers := 1000
 	var wg sync.WaitGroup
 
 	// Catch OS signals for graceful shutdown
 	signals := make(chan os.Signal, 1)
 	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
 
-	ticker := time.NewTicker(5 * time.Second)
+	ticker := time.NewTicker(1 * time.Second)
 	start := time.Now()
 	var producedCount int64 = 0
 	var sentCount int64 = 0
@@ -103,13 +103,22 @@ func startLoadtestWorkers(config Config) {
 	var blockTimes []string
 	var startHeight = getLastHeight(config.BlockchainEndpoint)
 	fmt.Printf("Starting loadtest producers\n")
+	// preload all accounts; fail fast if none exist, otherwise every producer
+	// goroutine panics on an empty key slice (modulo by zero)
+	keys := client.SignerClient.GetTestAccountsKeys(int(config.TargetTps))
+	if len(keys) == 0 {
+		panic("no test account keys found under ~/test_accounts; generate accounts before running the loadtest")
+	}
 	for i := 0; i < numProducers; i++ {
 		wg.Add(1)
-		go client.BuildTxs(txQueue, i, &wg, done, &producedCount)
+		go client.BuildTxs(txQueue, i, keys, &wg, done, &producedCount)
+	}
+	// Give producers some time to populate queue
+	if config.TargetTps > 1000 {
+		time.Sleep(5 * time.Second)
 	}
 
 	fmt.Printf("Starting loadtest consumers\n")
-	go client.SendTxs(txQueue, done, &sentCount, int(config.TargetTps))
+	go client.SendTxs(txQueue, done, &sentCount, int(config.TargetTps), &wg)
 
 	// Statistics reporting goroutine
 	go func() {
 		for {
@@ -143,6 +153,7 @@ func startLoadtestWorkers(config Config) {
 	<-signals
 	fmt.Println("SIGINT received, shutting down...")
 	close(done)
+	wg.Wait()
 	close(txQueue)
} diff --git a/loadtest/sign.go b/loadtest/sign.go index 8e9680ab4f..ca839c1ed0 100644 --- a/loadtest/sign.go +++ b/loadtest/sign.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "io" + "math" "os" "path/filepath" "sync" @@ -62,9 +63,33 @@ func NewSignerClient(nodeURI string) *SignerClient { } } -func (sc *SignerClient) GetTestAccountKeyPath(accountID uint64) string { +func (sc *SignerClient) GetTestAccountsKeys(maxAccounts int) []cryptotypes.PrivKey { userHomeDir, _ := os.UserHomeDir() - return filepath.Join(userHomeDir, "test_accounts", fmt.Sprintf("ta%d.json", accountID)) + files, _ := os.ReadDir(filepath.Join(userHomeDir, "test_accounts")) + + var testAccountsKeys = make([]cryptotypes.PrivKey, int(math.Min(float64(len(files)), float64(maxAccounts)))) + var wg sync.WaitGroup + keysChan := make(chan cryptotypes.PrivKey, maxAccounts) + fmt.Printf("Loading accounts\n") + for i, file := range files { + if i >= maxAccounts { + break + } + wg.Add(1) + go func(i int, fileName string) { + defer wg.Done() + key := sc.GetKey(fmt.Sprint(i), "test", filepath.Join(userHomeDir, "test_accounts", fileName)) + keysChan <- key + }(i, file.Name()) + } + wg.Wait() + close(keysChan) + // Collect keys from the channel + for key := range keysChan { + testAccountsKeys = append(testAccountsKeys, key) + } + + return testAccountsKeys } func (sc *SignerClient) GetAdminAccountKeyPath() string { diff --git a/loadtest/tx.go b/loadtest/tx.go index 8c7361b78f..76a90390f4 100644 --- a/loadtest/tx.go +++ b/loadtest/tx.go @@ -2,64 +2,33 @@ package main import ( "context" - "fmt" "sync/atomic" - "time" - sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" typestx "github.com/cosmos/cosmos-sdk/types/tx" ) func SendTx( + ctx context.Context, txBytes []byte, mode typestx.BroadcastMode, failureExpected bool, loadtestClient LoadTestClient, sentCount *int64, ) { - grpcRes, err := loadtestClient.GetTxClient().BroadcastTx( - context.Background(), + + grpcRes, _ := 
loadtestClient.GetTxClient().BroadcastTx(
+		ctx,
 		&typestx.BroadcastTxRequest{
 			Mode:    mode,
 			TxBytes: txBytes,
 		},
 	)
-	if err != nil {
-		if failureExpected {
-			fmt.Printf("Error: %s\n", err)
-		} else {
-			panic(err)
-		}
-
-		if grpcRes == nil || grpcRes.TxResponse == nil {
-			return
-		}
-		if grpcRes.TxResponse.Code == 0 {
-			atomic.AddInt64(sentCount, 1)
-		}
-	}
-	for grpcRes.TxResponse.Code == sdkerrors.ErrMempoolIsFull.ABCICode() {
-		// retry after a second until either succeed or fail for some other reason
-		fmt.Printf("Mempool full\n")
-		time.Sleep(1 * time.Second)
-		grpcRes, err = loadtestClient.GetTxClient().BroadcastTx(
-			context.Background(),
-			&typestx.BroadcastTxRequest{
-				Mode:    mode,
-				TxBytes: txBytes,
-			},
-		)
-		if err != nil {
-			if failureExpected {
-			} else {
-				panic(err)
-			}
-		}
-	}
-	if grpcRes.TxResponse.Code != 0 {
-		fmt.Printf("Error: %d, %s\n", grpcRes.TxResponse.Code, grpcRes.TxResponse.RawLog)
-	} else {
+	if failureExpected {
+		return
+	}
+	if grpcRes != nil && grpcRes.TxResponse != nil && grpcRes.TxResponse.Code == 0 {
 		atomic.AddInt64(sentCount, 1)
+		return
 	}
 }