From 0eda351a3e4781ddbc544a999f2572c32ab2d732 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Wed, 10 Jan 2024 14:15:22 -0800 Subject: [PATCH 01/46] debug --- loadtest/loadtest_client.go | 50 ++++++++++++++++++++++++------------- loadtest/main.go | 4 ++- 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 058e6723c2..cee48ef746 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -93,7 +93,7 @@ func (c *LoadTestClient) Close() { } } -func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, wg *sync.WaitGroup, done <-chan struct{}, producedCount *int64) { +func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, wg *sync.WaitGroup, done <-chan struct{}, producedCount *int64, rateLimiter *rate.Limiter) { defer wg.Done() config := c.LoadTestConfig accountIdentifier := fmt.Sprint(producerId) @@ -106,38 +106,52 @@ func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, wg *syn fmt.Printf("Stopping producer %d\n", producerId) return default: - msgs, _, _, gas, fee := c.generateMessage(config, key, config.MsgsPerTx) - txBuilder := TestConfig.TxConfig.NewTxBuilder() - _ = txBuilder.SetMsgs(msgs...) - txBuilder.SetGasLimit(gas) - txBuilder.SetFeeAmount([]types.Coin{ - types.NewCoin("usei", types.NewInt(fee)), - }) - // Use random seqno to get around txs that might already be seen in mempool - - c.SignerClient.SignTx(c.ChainID, &txBuilder, key, uint64(rand.Intn(math.MaxInt))) - txBytes, _ := TestConfig.TxConfig.TxEncoder()(txBuilder.GetTx()) - txQueue <- txBytes - atomic.AddInt64(producedCount, 1) + if rateLimiter.Allow() { + msgs, _, _, gas, fee := c.generateMessage(config, key, config.MsgsPerTx) + txBuilder := TestConfig.TxConfig.NewTxBuilder() + _ = txBuilder.SetMsgs(msgs...) + txBuilder.SetGasLimit(gas) + txBuilder.SetFeeAmount([]types.Coin{ + types.NewCoin("usei", types.NewInt(fee)), + }) + // Use random seqno to get around txs that might already be seen in mempool + + c.SignerClient.SignTx(c.ChainID, &txBuilder, key, uint64(rand.Intn(math.MaxInt))) + txBytes, _ := TestConfig.TxConfig.TxEncoder()(txBuilder.GetTx()) + txQueue <- txBytes + atomic.AddInt64(producedCount, 1) + } } } } func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int) { - rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit) + //rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit) + wg := sync.WaitGroup{} + for { select { case <-done: fmt.Printf("Stopping consumers\n") + wg.Wait() return case tx, ok := <-txQueue: if !ok { fmt.Printf("Stopping consumers\n") + wg.Wait() + return } - if rateLimiter.Allow() { - go SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) - } + wg.Add(1) + go func(tx []byte) { + defer wg.Done() + // Wait blocks until the limiter allows another event. + //if err := rateLimiter.Wait(context.Background()); err == nil { + SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) + //} else { + // fmt.Printf("Error waiting for rate limiter: %v\n", err) + //} + }(tx) } } } diff --git a/loadtest/main.go b/loadtest/main.go index 0b6baed84a..68260b600a 100644 --- a/loadtest/main.go +++ b/loadtest/main.go @@ -4,6 +4,7 @@ import ( "encoding/json" "flag" "fmt" + "golang.org/x/time/rate" "io" "math/rand" "net/http" @@ -103,9 +104,10 @@ func startLoadtestWorkers(config Config) { var blockTimes []string var startHeight = getLastHeight(config.BlockchainEndpoint) fmt.Printf("Starting loadtest producers\n") + rateLimiter := rate.NewLimiter(rate.Limit(config.TargetTps), int(config.TargetTps)) for i := 0; i < numProducers; i++ { wg.Add(1) - go client.BuildTxs(txQueue, i, &wg, done, &producedCount) + go client.BuildTxs(txQueue, i, &wg, done, &producedCount, rateLimiter) } fmt.Printf("Starting loadtest consumers\n") From 4182ec62c6e8b244e228584ac08896e0bfa2d53a Mon Sep 17 00:00:00 2001 From: Philip Su Date: Wed, 10 Jan 2024 14:48:09 -0800 Subject: [PATCH 02/46] debug --- loadtest/loadtest_client.go | 88 ++++++++++++++++++++++++++----------- loadtest/main.go | 6 +-- loadtest/tx.go | 4 +- 3 files changed, 66 insertions(+), 32 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index cee48ef746..18982f5291 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "golang.org/x/sync/semaphore" "math" "math/rand" "strings" @@ -93,7 +94,7 @@ func (c *LoadTestClient) Close() { } } -func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, wg *sync.WaitGroup, done <-chan struct{}, producedCount *int64, rateLimiter *rate.Limiter) { +func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, wg *sync.WaitGroup, done <-chan struct{}, producedCount *int64) { defer wg.Done() config := c.LoadTestConfig accountIdentifier := fmt.Sprint(producerId) @@ -106,31 +107,60 @@ func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, wg *syn fmt.Printf("Stopping producer %d\n", producerId) return default: - if rateLimiter.Allow() { - msgs, _, _, gas, fee := c.generateMessage(config, key, config.MsgsPerTx) - txBuilder := TestConfig.TxConfig.NewTxBuilder() - _ = txBuilder.SetMsgs(msgs...) - txBuilder.SetGasLimit(gas) - txBuilder.SetFeeAmount([]types.Coin{ - types.NewCoin("usei", types.NewInt(fee)), - }) - // Use random seqno to get around txs that might already be seen in mempool - - c.SignerClient.SignTx(c.ChainID, &txBuilder, key, uint64(rand.Intn(math.MaxInt))) - txBytes, _ := TestConfig.TxConfig.TxEncoder()(txBuilder.GetTx()) - txQueue <- txBytes - atomic.AddInt64(producedCount, 1) - } + msgs, _, _, gas, fee := c.generateMessage(config, key, config.MsgsPerTx) + txBuilder := TestConfig.TxConfig.NewTxBuilder() + _ = txBuilder.SetMsgs(msgs...) + txBuilder.SetGasLimit(gas) + txBuilder.SetFeeAmount([]types.Coin{ + types.NewCoin("usei", types.NewInt(fee)), + }) + // Use random seqno to get around txs that might already be seen in mempool + + c.SignerClient.SignTx(c.ChainID, &txBuilder, key, uint64(rand.Intn(math.MaxInt))) + txBytes, _ := TestConfig.TxConfig.TxEncoder()(txBuilder.GetTx()) + txQueue <- txBytes + atomic.AddInt64(producedCount, 1) } } } -func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int) { - //rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit) - wg := sync.WaitGroup{} +//func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int) { +// rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit) +// wg := sync.WaitGroup{} +// +// for { +// +// select { +// case <-done: +// fmt.Printf("Stopping consumers\n") +// wg.Wait() +// return +// case tx, ok := <-txQueue: +// if !ok { +// fmt.Printf("Stopping consumers\n") +// wg.Wait() +// return +// } +// wg.Add(1) +// go func(tx []byte) { +// defer wg.Done() +// // Wait blocks until the limiter allows another event. +// if err := rateLimiter.Wait(context.Background()); err == nil { +// SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) +// } else { +// fmt.Printf("Error waiting for rate limiter: %v\n", err) +// } +// }(tx) +// } +// } +//} + +func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int, wg *sync.WaitGroup) { + rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit) + maxConcurrent := rateLimit // Set the maximum number of concurrent SendTx calls + sem := semaphore.NewWeighted(int64(maxConcurrent)) for { - select { case <-done: fmt.Printf("Stopping consumers\n") @@ -142,15 +172,23 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se wg.Wait() return } + + if err := sem.Acquire(context.Background(), 1); err != nil { + fmt.Printf("Failed to acquire semaphore: %v", err) + break + } + wg.Add(1) go func(tx []byte) { defer wg.Done() - // Wait blocks until the limiter allows another event. - //if err := rateLimiter.Wait(context.Background()); err == nil { + defer sem.Release(1) + + if err := rateLimiter.Wait(context.Background()); err != nil { + fmt.Printf("Error waiting for rate limiter: %v\n", err) + return + } + SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) - //} else { - // fmt.Printf("Error waiting for rate limiter: %v\n", err) - //} }(tx) } } diff --git a/loadtest/main.go b/loadtest/main.go index 68260b600a..e1c349a035 100644 --- a/loadtest/main.go +++ b/loadtest/main.go @@ -4,7 +4,6 @@ import ( "encoding/json" "flag" "fmt" - "golang.org/x/time/rate" "io" "math/rand" "net/http" @@ -104,14 +103,13 @@ func startLoadtestWorkers(config Config) { var blockTimes []string var startHeight = getLastHeight(config.BlockchainEndpoint) fmt.Printf("Starting loadtest producers\n") - rateLimiter := rate.NewLimiter(rate.Limit(config.TargetTps), int(config.TargetTps)) for i := 0; i < numProducers; i++ { wg.Add(1) - go client.BuildTxs(txQueue, i, &wg, done, &producedCount, rateLimiter) + go client.BuildTxs(txQueue, i, &wg, done, &producedCount) } fmt.Printf("Starting loadtest consumers\n") - go client.SendTxs(txQueue, done, &sentCount, int(config.TargetTps)) + go client.SendTxs(txQueue, done, &sentCount, int(config.TargetTps), &wg) // Statistics reporting goroutine go func() { for { diff --git a/loadtest/tx.go b/loadtest/tx.go index 8c7361b78f..d3732e1262 100644 --- a/loadtest/tx.go +++ b/loadtest/tx.go @@ -57,9 +57,7 @@ func SendTx( } } } - if grpcRes.TxResponse.Code != 0 { - fmt.Printf("Error: %d, %s\n", grpcRes.TxResponse.Code, grpcRes.TxResponse.RawLog) - } else { + if grpcRes.TxResponse.Code == 0 { atomic.AddInt64(sentCount, 1) } } From 6cb5e381faf86c4e863534d83b10ba3fb01d0e41 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Wed, 10 Jan 2024 15:07:25 -0800 Subject: [PATCH 03/46] debug --- loadtest/loadtest_client.go | 40 +++---------------------------------- 1 file changed, 3 insertions(+), 37 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 18982f5291..1b4f8cf122 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -124,37 +124,6 @@ func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, wg *syn } } -//func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int) { -// rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit) -// wg := sync.WaitGroup{} -// -// for { -// -// select { -// case <-done: -// fmt.Printf("Stopping consumers\n") -// wg.Wait() -// return -// case tx, ok := <-txQueue: -// if !ok { -// fmt.Printf("Stopping consumers\n") -// wg.Wait() -// return -// } -// wg.Add(1) -// go func(tx []byte) { -// defer wg.Done() -// // Wait blocks until the limiter allows another event. -// if err := rateLimiter.Wait(context.Background()); err == nil { -// SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) -// } else { -// fmt.Printf("Error waiting for rate limiter: %v\n", err) -// } -// }(tx) -// } -// } -//} - func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int, wg *sync.WaitGroup) { rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit) maxConcurrent := rateLimit // Set the maximum number of concurrent SendTx calls @@ -174,7 +143,7 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se } if err := sem.Acquire(context.Background(), 1); err != nil { - fmt.Printf("Failed to acquire semaphore: %v", err) + fmt.Printf("Failed to acquire semaphore: %s", err) break } @@ -183,12 +152,9 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se defer wg.Done() defer sem.Release(1) - if err := rateLimiter.Wait(context.Background()); err != nil { - fmt.Printf("Error waiting for rate limiter: %v\n", err) - return + if err := rateLimiter.Wait(context.Background()); err == nil { + SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) } - - SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) }(tx) } } From 4b6333b81365849fde1f02ad31938925aec46cad Mon Sep 17 00:00:00 2001 From: Philip Su Date: Wed, 10 Jan 2024 15:15:22 -0800 Subject: [PATCH 04/46] set default broadcast to sync --- loadtest/loadtest_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 1b4f8cf122..f390fd2578 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -153,7 +153,7 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se defer sem.Release(1) if err := rateLimiter.Wait(context.Background()); err == nil { - SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) + SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_SYNC, false, *c, sentCount) } }(tx) } From 767442f71e5fe5fa058637d64a0f54b99afbc621 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Wed, 10 Jan 2024 16:24:21 -0800 Subject: [PATCH 05/46] debug --- loadtest/loadtest_client.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index f390fd2578..8c3639dacf 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -129,6 +129,7 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se maxConcurrent := rateLimit // Set the maximum number of concurrent SendTx calls sem := semaphore.NewWeighted(int64(maxConcurrent)) + i := 0 for { select { case <-done: @@ -148,14 +149,22 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se } wg.Add(1) - go func(tx []byte) { - defer wg.Done() - defer sem.Release(1) - - if err := rateLimiter.Wait(context.Background()); err == nil { - SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_SYNC, false, *c, sentCount) - } - }(tx) + i += 1 + if i == maxConcurrent { + SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) + i = 0 + + } else { + go func(tx []byte) { + defer wg.Done() + defer sem.Release(1) + + if err := rateLimiter.Wait(context.Background()); err == nil { + SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_SYNC, false, *c, sentCount) + } + }(tx) + } + } } } From 636c510c55613ea11dedd272c6a9b32772334f4e Mon Sep 17 00:00:00 2001 From: Philip Su Date: Wed, 10 Jan 2024 16:53:06 -0800 Subject: [PATCH 06/46] debug --- loadtest/loadtest_client.go | 16 ++++++++-------- loadtest/main.go | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 8c3639dacf..2e6be52f32 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -155,14 +155,14 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se i = 0 } else { - go func(tx []byte) { - defer wg.Done() - defer sem.Release(1) - - if err := rateLimiter.Wait(context.Background()); err == nil { - SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_SYNC, false, *c, sentCount) - } - }(tx) + //go func(tx []byte) { + // defer wg.Done() + // defer sem.Release(1) + + if err := rateLimiter.Wait(context.Background()); err == nil { + SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_ASYNC, false, *c, sentCount) + } + //}(tx) } } diff --git a/loadtest/main.go b/loadtest/main.go index e1c349a035..f348fa903e 100644 --- a/loadtest/main.go +++ b/loadtest/main.go @@ -94,7 +94,7 @@ func startLoadtestWorkers(config Config) { signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(1 * time.Second) start := time.Now() var producedCount int64 = 0 var sentCount int64 = 0 From 0e247b83d1d50a289db9d3200e737a6e5120e8fd Mon Sep 17 00:00:00 2001 From: Philip Su Date: Wed, 10 Jan 2024 16:54:35 -0800 Subject: [PATCH 07/46] debug --- loadtest/loadtest_client.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 2e6be52f32..25f035ea07 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -143,14 +143,14 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se return } - if err := sem.Acquire(context.Background(), 1); err != nil { - fmt.Printf("Failed to acquire semaphore: %s", err) - break - } + //if err := sem.Acquire(context.Background(), 1); err != nil { + // fmt.Printf("Failed to acquire semaphore: %s", err) + // break + //} wg.Add(1) i += 1 - if i == maxConcurrent { + if i >= maxConcurrent { SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) i = 0 From e3325d06d703917a1e816e81ee9ae5931f15b5fb Mon Sep 17 00:00:00 2001 From: Philip Su Date: Wed, 10 Jan 2024 16:55:04 -0800 Subject: [PATCH 08/46] debug --- loadtest/loadtest_client.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 25f035ea07..8330f43744 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -3,7 +3,6 @@ package main import ( "context" "fmt" - "golang.org/x/sync/semaphore" "math" "math/rand" "strings" @@ -127,7 +126,7 @@ func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, wg *syn func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int, wg *sync.WaitGroup) { rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit) maxConcurrent := rateLimit // Set the maximum number of concurrent SendTx calls - sem := semaphore.NewWeighted(int64(maxConcurrent)) + //sem := semaphore.NewWeighted(int64(maxConcurrent)) i := 0 for { From 60fbed3f01acbcbd20a4497f4bf47fd00162f6d4 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Wed, 10 Jan 2024 16:56:53 -0800 Subject: [PATCH 09/46] debug --- loadtest/loadtest_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 8330f43744..ccdd8db8a8 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -159,7 +159,7 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se // defer sem.Release(1) if err := rateLimiter.Wait(context.Background()); err == nil { - SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_ASYNC, false, *c, sentCount) + SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_SYNC, false, *c, sentCount) } //}(tx) } From f8ca02368c9b7405db9880273d88bdda69b22178 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Wed, 10 Jan 2024 17:07:01 -0800 Subject: [PATCH 10/46] debug --- loadtest/loadtest_client.go | 42 ++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index ccdd8db8a8..eeba963075 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -128,8 +128,15 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se maxConcurrent := rateLimit // Set the maximum number of concurrent SendTx calls //sem := semaphore.NewWeighted(int64(maxConcurrent)) - i := 0 + //i := 0 + lastHeight := getLastHeight(c.LoadTestConfig.BlockchainEndpoint) for { + newHeight := getLastHeight(c.LoadTestConfig.BlockchainEndpoint) + for newHeight == lastHeight { + time.Sleep(10 * time.Millisecond) + newHeight = getLastHeight(c.LoadTestConfig.BlockchainEndpoint) + } + select { case <-done: fmt.Printf("Stopping consumers\n") @@ -148,21 +155,26 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se //} wg.Add(1) - i += 1 - if i >= maxConcurrent { - SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) - i = 0 - - } else { - //go func(tx []byte) { - // defer wg.Done() - // defer sem.Release(1) - - if err := rateLimiter.Wait(context.Background()); err == nil { - SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_SYNC, false, *c, sentCount) - } - //}(tx) + for i := 0; i < maxConcurrent; i++ { + + SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_SYNC, false, *c, sentCount) } + SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) + lastHeight = newHeight + //if i >= maxConcurrent { + // SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) + // i = 0 + // + //} else { + //go func(tx []byte) { + // defer wg.Done() + // defer sem.Release(1) + + //if err := rateLimiter.Wait(context.Background()); err == nil { + // SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_SYNC, false, *c, sentCount) + //} + //}(tx) + //} } } From 1111bf9f916132057b07360d4844c3490b201cec Mon Sep 17 00:00:00 2001 From: Philip Su Date: Wed, 10 Jan 2024 17:07:26 -0800 Subject: [PATCH 11/46] debug --- loadtest/loadtest_client.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index eeba963075..acb17527d1 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -10,8 +10,6 @@ import ( "sync/atomic" "time" - "golang.org/x/time/rate" - "github.com/cosmos/cosmos-sdk/types" typestx "github.com/cosmos/cosmos-sdk/types/tx" stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types" @@ -124,7 +122,7 @@ func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, wg *syn } func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int, wg *sync.WaitGroup) { - rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit) + //rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit) maxConcurrent := rateLimit // Set the maximum number of concurrent SendTx calls //sem := semaphore.NewWeighted(int64(maxConcurrent)) From b729c83c1008db8b8636e295f9472fb30e8b8d4b Mon Sep 17 00:00:00 2001 From: Philip Su Date: Wed, 10 Jan 2024 17:07:57 -0800 Subject: [PATCH 12/46] debug --- loadtest/loadtest_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index acb17527d1..d18dda7be5 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -155,7 +155,7 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se wg.Add(1) for i := 0; i < maxConcurrent; i++ { - SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_SYNC, false, *c, sentCount) + go SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_SYNC, false, *c, sentCount) } SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) lastHeight = newHeight From 11be8371c6a829a308a16414cb30d1fb0e687463 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Wed, 10 Jan 2024 17:08:44 -0800 Subject: [PATCH 13/46] debug --- loadtest/loadtest_client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index d18dda7be5..edf3b39250 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -157,7 +157,6 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se go SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_SYNC, false, *c, sentCount) } - SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) lastHeight = newHeight //if i >= maxConcurrent { // SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) From c2f496bb282fec920cee4781f018287f323bf52c Mon Sep 17 00:00:00 2001 From: Philip Su Date: Wed, 10 Jan 2024 17:11:21 -0800 Subject: [PATCH 14/46] debug --- loadtest/loadtest_client.go | 1 + loadtest/tx.go | 1 + 2 files changed, 2 insertions(+) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index edf3b39250..20c985514f 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -155,6 +155,7 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se wg.Add(1) for i := 0; i < maxConcurrent; i++ { + fmt.Printf("%s\n", i) go SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_SYNC, false, *c, sentCount) } lastHeight = newHeight diff --git a/loadtest/tx.go b/loadtest/tx.go index d3732e1262..60bc5d1215 100644 --- a/loadtest/tx.go +++ b/loadtest/tx.go @@ -17,6 +17,7 @@ func SendTx( loadtestClient LoadTestClient, sentCount *int64, ) { + fmt.Printf("PSUDEBUG - sending txs") grpcRes, err := loadtestClient.GetTxClient().BroadcastTx( context.Background(), &typestx.BroadcastTxRequest{ From ef457258052ecf00fff87aae681c5770ed0a4954 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Wed, 10 Jan 2024 17:13:19 -0800 Subject: [PATCH 15/46] debug --- loadtest/loadtest_client.go | 1 - loadtest/tx.go | 44 ++++++++++++++++++------------------- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 20c985514f..edf3b39250 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -155,7 +155,6 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se wg.Add(1) for i := 0; i < maxConcurrent; i++ { - fmt.Printf("%s\n", i) go SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_SYNC, false, *c, sentCount) } lastHeight = newHeight diff --git a/loadtest/tx.go b/loadtest/tx.go index 60bc5d1215..1691182a83 100644 --- a/loadtest/tx.go +++ b/loadtest/tx.go @@ -3,11 +3,8 @@ package main import ( "context" "fmt" - "sync/atomic" - "time" - - sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" typestx "github.com/cosmos/cosmos-sdk/types/tx" + "sync/atomic" ) func SendTx( @@ -17,7 +14,6 @@ func SendTx( loadtestClient LoadTestClient, sentCount *int64, ) { - fmt.Printf("PSUDEBUG - sending txs") grpcRes, err := loadtestClient.GetTxClient().BroadcastTx( context.Background(), &typestx.BroadcastTxRequest{ @@ -40,25 +36,27 @@ func SendTx( } } - for grpcRes.TxResponse.Code == sdkerrors.ErrMempoolIsFull.ABCICode() { - // retry after a second until either succeed or fail for some other reason - fmt.Printf("Mempool full\n") - time.Sleep(1 * time.Second) - grpcRes, err = loadtestClient.GetTxClient().BroadcastTx( - context.Background(), - &typestx.BroadcastTxRequest{ - Mode: mode, - TxBytes: txBytes, - }, - ) - if err != nil { - if failureExpected { - } else { - panic(err) - } - } - } + //for grpcRes.TxResponse.Code == sdkerrors.ErrMempoolIsFull.ABCICode() { + // // retry after a second until either succeed or fail for some other reason + // fmt.Printf("Mempool full\n") + // time.Sleep(1 * time.Second) + // grpcRes, err = loadtestClient.GetTxClient().BroadcastTx( + // context.Background(), + // &typestx.BroadcastTxRequest{ + // Mode: mode, + // TxBytes: txBytes, + // }, + // ) + // if err != nil { + // if failureExpected { + // } else { + // panic(err) + // } + // } + //} if grpcRes.TxResponse.Code == 0 { atomic.AddInt64(sentCount, 1) + } else { + fmt.Printf("PSUDEBUTG - failed: %s\n", grpcRes.TxResponse) } } From 23cd2af726574f02e3611612afcfe8195ac89289 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Wed, 10 Jan 2024 17:14:19 -0800 Subject: [PATCH 16/46] debug --- loadtest/loadtest_client.go | 64 ++++++++++++++++++------------------- 1 file changed, 31 insertions(+), 33 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index edf3b39250..ddb0b1c638 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -134,44 +134,42 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se time.Sleep(10 * time.Millisecond) newHeight = getLastHeight(c.LoadTestConfig.BlockchainEndpoint) } - - select { - case <-done: - fmt.Printf("Stopping consumers\n") - wg.Wait() - return - case tx, ok := <-txQueue: - if !ok { + for i := 0; i < maxConcurrent; i++ { + select { + case <-done: fmt.Printf("Stopping consumers\n") wg.Wait() return - } - - //if err := sem.Acquire(context.Background(), 1); err != nil { - // fmt.Printf("Failed to acquire semaphore: %s", err) - // break - //} - - wg.Add(1) - for i := 0; i < maxConcurrent; i++ { - + case tx, ok := <-txQueue: + if !ok { + fmt.Printf("Stopping consumers\n") + wg.Wait() + return + } + + //if err := sem.Acquire(context.Background(), 1); err != nil { + // fmt.Printf("Failed to acquire semaphore: %s", err) + // break + //} + + wg.Add(1) go SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_SYNC, false, *c, sentCount) + lastHeight = newHeight + //if i >= maxConcurrent { + // SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) + // i = 0 + // + //} else { + //go func(tx []byte) { + // defer wg.Done() + // defer sem.Release(1) + + //if err := rateLimiter.Wait(context.Background()); err == nil { + // SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_SYNC, false, *c, sentCount) + //} + //}(tx) + //} } - lastHeight = newHeight - //if i >= maxConcurrent { - // SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) - // i = 0 - // - //} else { - //go func(tx []byte) { - // defer wg.Done() - // defer sem.Release(1) - - //if err := rateLimiter.Wait(context.Background()); err == nil { - // SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_SYNC, false, *c, sentCount) - //} - //}(tx) - //} } } From e686b597b33874dd076707278c65a8781acd3c23 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Thu, 11 Jan 2024 14:17:04 -0800 Subject: [PATCH 17/46] debug --- loadtest/loadtest_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index ddb0b1c638..56aaeac97f 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -154,7 +154,6 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se wg.Add(1) go SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_SYNC, false, *c, sentCount) - lastHeight = newHeight //if i >= maxConcurrent { // SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) // i = 0 @@ -172,6 +171,7 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se } } + lastHeight = newHeight } } From cec4d62eff5ee8666f4c1cd555556e1f8b46c055 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Thu, 11 Jan 2024 14:27:51 -0800 Subject: [PATCH 18/46] debug --- loadtest/loadtest_client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 56aaeac97f..20eebfabfc 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -171,6 +171,7 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se } } + time.Sleep(1 * time.Second) lastHeight = newHeight } } From cf769a948bd1a8812f0aea046b4f7449aacf25a4 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Thu, 11 Jan 2024 14:54:23 -0800 Subject: [PATCH 19/46] debug --- app/app.go | 18 +++++++++++++++++- loadtest/loadtest_client.go | 1 - 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/app/app.go b/app/app.go index 75c7f8fc64..b65b1f85c6 100644 --- a/app/app.go +++ b/app/app.go @@ -1308,6 +1308,10 @@ func (app *App) BuildDependenciesAndRunTxs(ctx sdk.Context, txs [][]byte) ([]*ab } func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequest, lastCommit abci.CommitInfo) ([]abci.Event, []*abci.ExecTxResult, abci.ResponseEndBlock, error) { + startTime := time.Now() + defer func() { + fmt.Printf("PSUDEBUG - Process block time ms: %d\n", time.Now().Sub(startTime).Milliseconds()) + }() goCtx := app.decorateContextWithDexMemState(ctx.Context()) ctx = ctx.WithContext(goCtx) @@ -1333,27 +1337,39 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequ ProposerAddress: ctx.BlockHeader().ProposerAddress, }, } - + beginBlockStart := time.Now() beginBlockResp := app.BeginBlock(ctx, beginBlockReq) + fmt.Printf("PSUDEBUG - Begin block time ms: %d\n", time.Now().Sub(beginBlockStart).Milliseconds()) + events = append(events, beginBlockResp.Events...) txResults := make([]*abci.ExecTxResult, len(txs)) + partitionPrioritizedTxsStart := time.Now() prioritizedTxs, otherTxs, prioritizedIndices, otherIndices := app.PartitionPrioritizedTxs(ctx, txs) + fmt.Printf("PSUDEBUG - Partition Prioritized Txs ms: %d\n", time.Now().Sub(partitionPrioritizedTxsStart).Milliseconds()) // run the prioritized txs + buildDependenciesAndRunTxsStart := time.Now() prioritizedResults, ctx := app.BuildDependenciesAndRunTxs(ctx, prioritizedTxs) + fmt.Printf("PSUDEBUG - BuildDependenciesAndRunTxs1 ms: %d\n", time.Now().Sub(buildDependenciesAndRunTxsStart).Milliseconds()) for relativePrioritizedIndex, originalIndex := range prioritizedIndices { txResults[originalIndex] = prioritizedResults[relativePrioritizedIndex] } // Finalize all Bank Module Transfers here so that events are included for prioritiezd txs + writeDeferredBalanceStart := time.Now() deferredWriteEvents := app.BankKeeper.WriteDeferredBalances(ctx) + fmt.Printf("PSUDEBUG - writeDeferredBalanceStart ms: %d\n", time.Now().Sub(writeDeferredBalanceStart).Milliseconds()) + events = append(events, deferredWriteEvents...) midBlockEvents := app.MidBlock(ctx, req.GetHeight()) events = append(events, midBlockEvents...) + buildDependenciesAndRunTxsStart2 := time.Now() otherResults, ctx := app.BuildDependenciesAndRunTxs(ctx, otherTxs) + fmt.Printf("PSUDEBUG - buildDependenciesAndRunTxsStart2 ms: %d\n", time.Now().Sub(buildDependenciesAndRunTxsStart2).Milliseconds()) + for relativeOtherIndex, originalIndex := range otherIndices { txResults[originalIndex] = otherResults[relativeOtherIndex] } diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 20eebfabfc..56aaeac97f 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -171,7 +171,6 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se } } - time.Sleep(1 * time.Second) lastHeight = newHeight } } From 1c3cda02ddb243328660f19931840a764ee014a9 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Thu, 11 Jan 2024 15:05:04 -0800 Subject: [PATCH 20/46] debug --- loadtest/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loadtest/main.go b/loadtest/main.go index f348fa903e..899194f72c 100644 --- a/loadtest/main.go +++ b/loadtest/main.go @@ -87,7 +87,7 @@ func startLoadtestWorkers(config Config) { txQueue := make(chan []byte, 10000) done := make(chan struct{}) - numProducers := 5 + numProducers := 1000 var wg sync.WaitGroup // Catch OS signals for graceful shutdown From 4b65d12ce71b6a7c73050ff83110c0a4a8b54c91 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Thu, 11 Jan 2024 17:45:36 -0800 Subject: [PATCH 21/46] grpc --- loadtest/loadtest_client.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 56aaeac97f..dc1b848f8f 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -43,6 +43,11 @@ type LoadTestClient struct { func NewLoadTestClient(config Config) *LoadTestClient { var dialOptions []grpc.DialOption + dialOptions = append(dialOptions, grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(20*1024*1024), + grpc.MaxCallSendMsgSize(20*1024*1024)), + ) + dialOptions = append(dialOptions, grpc.WithBlock()) // NOTE: Will likely need to whitelist node from elb rate limits - add ip to producer ip set if config.TLS { dialOptions = append(dialOptions, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}))) //nolint:gosec // Use insecure skip verify. From e381e09500b54e681c69992d10696c26c50df6b9 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Thu, 11 Jan 2024 19:56:51 -0800 Subject: [PATCH 22/46] debug --- app/app.go | 14 -------------- loadtest/loadtest_client.go | 7 +++---- loadtest/main.go | 6 ++++-- loadtest/sign.go | 9 +++++++-- 4 files changed, 14 insertions(+), 22 deletions(-) diff --git a/app/app.go b/app/app.go index b65b1f85c6..ef9e80ac15 100644 --- a/app/app.go +++ b/app/app.go @@ -1308,10 +1308,6 @@ func (app *App) BuildDependenciesAndRunTxs(ctx sdk.Context, txs [][]byte) ([]*ab } func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequest, lastCommit abci.CommitInfo) ([]abci.Event, []*abci.ExecTxResult, abci.ResponseEndBlock, error) { - startTime := time.Now() - defer func() { - fmt.Printf("PSUDEBUG - Process block time ms: %d\n", time.Now().Sub(startTime).Milliseconds()) - }() goCtx := app.decorateContextWithDexMemState(ctx.Context()) ctx = ctx.WithContext(goCtx) @@ -1337,38 +1333,28 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequ ProposerAddress: ctx.BlockHeader().ProposerAddress, }, } - beginBlockStart := time.Now() beginBlockResp := app.BeginBlock(ctx, beginBlockReq) - fmt.Printf("PSUDEBUG - Begin block time ms: %d\n", time.Now().Sub(beginBlockStart).Milliseconds()) events = append(events, beginBlockResp.Events...) txResults := make([]*abci.ExecTxResult, len(txs)) - partitionPrioritizedTxsStart := time.Now() prioritizedTxs, otherTxs, prioritizedIndices, otherIndices := app.PartitionPrioritizedTxs(ctx, txs) - fmt.Printf("PSUDEBUG - Partition Prioritized Txs ms: %d\n", time.Now().Sub(partitionPrioritizedTxsStart).Milliseconds()) // run the prioritized txs - buildDependenciesAndRunTxsStart := time.Now() prioritizedResults, ctx := app.BuildDependenciesAndRunTxs(ctx, prioritizedTxs) - fmt.Printf("PSUDEBUG - BuildDependenciesAndRunTxs1 ms: %d\n", time.Now().Sub(buildDependenciesAndRunTxsStart).Milliseconds()) for relativePrioritizedIndex, originalIndex := range prioritizedIndices { txResults[originalIndex] = prioritizedResults[relativePrioritizedIndex] } // Finalize all Bank Module Transfers here so that events are included for prioritiezd txs - writeDeferredBalanceStart := time.Now() deferredWriteEvents := app.BankKeeper.WriteDeferredBalances(ctx) - fmt.Printf("PSUDEBUG - writeDeferredBalanceStart ms: %d\n", time.Now().Sub(writeDeferredBalanceStart).Milliseconds()) events = append(events, deferredWriteEvents...) midBlockEvents := app.MidBlock(ctx, req.GetHeight()) events = append(events, midBlockEvents...) - buildDependenciesAndRunTxsStart2 := time.Now() otherResults, ctx := app.BuildDependenciesAndRunTxs(ctx, otherTxs) - fmt.Printf("PSUDEBUG - buildDependenciesAndRunTxsStart2 ms: %d\n", time.Now().Sub(buildDependenciesAndRunTxsStart2).Milliseconds()) for relativeOtherIndex, originalIndex := range otherIndices { txResults[originalIndex] = otherResults[relativeOtherIndex] diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index dc1b848f8f..75ca42b35b 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" "math" "math/rand" "strings" @@ -96,12 +97,9 @@ func (c *LoadTestClient) Close() { } } -func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, wg *sync.WaitGroup, done <-chan struct{}, producedCount *int64) { +func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, keys []cryptotypes.PrivKey, wg *sync.WaitGroup, done <-chan struct{}, producedCount *int64) { defer wg.Done() config := c.LoadTestConfig - accountIdentifier := fmt.Sprint(producerId) - accountKeyPath := c.SignerClient.GetTestAccountKeyPath(uint64(producerId)) - key := c.SignerClient.GetKey(accountIdentifier, "test", accountKeyPath) for { select { @@ -109,6 +107,7 @@ func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, wg *syn fmt.Printf("Stopping producer %d\n", producerId) return default: + key := keys[atomic.LoadInt64(producedCount)%int64(len(keys))] msgs, _, _, gas, fee := c.generateMessage(config, key, config.MsgsPerTx) txBuilder := TestConfig.TxConfig.NewTxBuilder() _ = txBuilder.SetMsgs(msgs...) diff --git a/loadtest/main.go b/loadtest/main.go index 899194f72c..5c2760ed11 100644 --- a/loadtest/main.go +++ b/loadtest/main.go @@ -87,7 +87,7 @@ func startLoadtestWorkers(config Config) { txQueue := make(chan []byte, 10000) done := make(chan struct{}) - numProducers := 1000 + numProducers := 5 var wg sync.WaitGroup // Catch OS signals for graceful shutdown @@ -103,9 +103,11 @@ func startLoadtestWorkers(config Config) { var blockTimes []string var startHeight = getLastHeight(config.BlockchainEndpoint) fmt.Printf("Starting loadtest producers\n") + // preload all accounts + keys := client.SignerClient.GetAllTestAccountsKeys() for i := 0; i < numProducers; i++ { wg.Add(1) - go client.BuildTxs(txQueue, i, &wg, done, &producedCount) + go client.BuildTxs(txQueue, i, keys, &wg, done, &producedCount) } fmt.Printf("Starting loadtest consumers\n") diff --git a/loadtest/sign.go b/loadtest/sign.go index 8e9680ab4f..038cb36bc8 100644 --- a/loadtest/sign.go +++ b/loadtest/sign.go @@ -62,9 +62,14 @@ func NewSignerClient(nodeURI string) *SignerClient { } } -func (sc *SignerClient) GetTestAccountKeyPath(accountID uint64) string { +func (sc *SignerClient) GetAllTestAccountsKeys() []cryptotypes.PrivKey { userHomeDir, _ := os.UserHomeDir() - return filepath.Join(userHomeDir, "test_accounts", fmt.Sprintf("ta%d.json", accountID)) + files, _ := os.ReadDir(filepath.Join(userHomeDir, "test_accounts")) + var testAccountsKeys []cryptotypes.PrivKey + for i, file := range files { + testAccountsKeys = append(testAccountsKeys, sc.GetKey(fmt.Sprint(i), "test", file.Name())) + } + return testAccountsKeys } func (sc *SignerClient) GetAdminAccountKeyPath() string { From 534823767ac4f0c1cfe5698d6b746bdae622f69c Mon Sep 17 00:00:00 2001 From: Philip Su Date: Thu, 11 Jan 2024 19:57:27 -0800 Subject: [PATCH 23/46] debug --- loadtest/loadtest_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 75ca42b35b..4cd0245c69 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -157,7 +157,7 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se //} wg.Add(1) - go SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_SYNC, false, *c, sentCount) + go SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) //if i >= maxConcurrent { // SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) // i = 0 From a2c57f53ac679b46c9ea331c8eb18000c4333610 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Thu, 11 Jan 2024 20:00:11 -0800 Subject: [PATCH 24/46] debug --- loadtest/main.go | 4 ++-- loadtest/sign.go | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/loadtest/main.go b/loadtest/main.go index 5c2760ed11..8e9f81b55a 100644 --- a/loadtest/main.go +++ b/loadtest/main.go @@ -78,7 +78,7 @@ func run(config Config) { // starts loadtest workers. If config.Constant is true, then we don't gather loadtest results and let producer/consumer // workers continue running. If config.Constant is false, then we will gather load test results in a file func startLoadtestWorkers(config Config) { - fmt.Printf("Starting loadtest workers") + fmt.Printf("Starting loadtest workers\n") client := NewLoadTestClient(config) client.SetValidators() @@ -104,7 +104,7 @@ func startLoadtestWorkers(config Config) { var startHeight = getLastHeight(config.BlockchainEndpoint) fmt.Printf("Starting loadtest producers\n") // preload all accounts - keys := client.SignerClient.GetAllTestAccountsKeys() + keys := client.SignerClient.GetTestAccountsKeys(int(config.TargetTps)) for i := 0; i < numProducers; i++ { wg.Add(1) go client.BuildTxs(txQueue, i, keys, &wg, done, &producedCount) diff --git a/loadtest/sign.go b/loadtest/sign.go index 038cb36bc8..025b9f3459 100644 --- a/loadtest/sign.go +++ b/loadtest/sign.go @@ -62,12 +62,15 @@ func NewSignerClient(nodeURI string) *SignerClient { } } -func (sc *SignerClient) GetAllTestAccountsKeys() []cryptotypes.PrivKey { +func (sc *SignerClient) GetTestAccountsKeys(maxAccounts int) []cryptotypes.PrivKey { userHomeDir, _ := os.UserHomeDir() files, _ := os.ReadDir(filepath.Join(userHomeDir, "test_accounts")) var testAccountsKeys []cryptotypes.PrivKey for i, file := range files { testAccountsKeys = append(testAccountsKeys, sc.GetKey(fmt.Sprint(i), "test", file.Name())) + if i >= maxAccounts { + break + } } return testAccountsKeys } From 533ffdef32572628149aa190bfbd41ed934c3979 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Thu, 11 Jan 2024 20:02:20 -0800 Subject: [PATCH 25/46] debug --- loadtest/sign.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/loadtest/sign.go b/loadtest/sign.go index 025b9f3459..e9b36607b0 100644 --- a/loadtest/sign.go +++ b/loadtest/sign.go @@ -66,7 +66,17 @@ func (sc *SignerClient) GetTestAccountsKeys(maxAccounts int) []cryptotypes.PrivK userHomeDir, _ := os.UserHomeDir() files, _ := os.ReadDir(filepath.Join(userHomeDir, "test_accounts")) var testAccountsKeys []cryptotypes.PrivKey + fmt.Printf("Loading accounts\n") for i, file := range files { + if i == len(files)/4 { + fmt.Printf("Loading accounts 1/4 done") + } + if i == len(files)/2 { + fmt.Printf("Loading accounts 1/2 done") + } + if i == 3*len(files)/4 { + fmt.Printf("Loading accounts 3/4 done") + } testAccountsKeys = append(testAccountsKeys, sc.GetKey(fmt.Sprint(i), "test", file.Name())) if i >= maxAccounts { break From eeec82fa0c41c8d1ff64d3c7c04503cbf344350b Mon Sep 17 00:00:00 2001 From: Philip Su Date: Thu, 11 Jan 2024 20:19:41 -0800 Subject: [PATCH 26/46] debug --- loadtest/loadtest_client.go | 1 - loadtest/sign.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 4cd0245c69..3002deb1d9 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -49,7 +49,6 @@ func NewLoadTestClient(config Config) *LoadTestClient { grpc.MaxCallSendMsgSize(20*1024*1024)), ) dialOptions = append(dialOptions, grpc.WithBlock()) - // NOTE: Will likely need to whitelist node from elb rate limits - add ip to producer ip set if config.TLS { dialOptions = append(dialOptions, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}))) //nolint:gosec // Use insecure skip verify. } else { diff --git a/loadtest/sign.go b/loadtest/sign.go index e9b36607b0..d78ce03d3c 100644 --- a/loadtest/sign.go +++ b/loadtest/sign.go @@ -77,7 +77,7 @@ func (sc *SignerClient) GetTestAccountsKeys(maxAccounts int) []cryptotypes.PrivK if i == 3*len(files)/4 { fmt.Printf("Loading accounts 3/4 done") } - testAccountsKeys = append(testAccountsKeys, sc.GetKey(fmt.Sprint(i), "test", file.Name())) + testAccountsKeys = append(testAccountsKeys, sc.GetKey(fmt.Sprint(i), "test", filepath.Join(userHomeDir, file.Name()))) if i >= maxAccounts { break } From 950e7ed4968750b53f5684c66312478b89559017 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Thu, 11 Jan 2024 20:20:59 -0800 Subject: [PATCH 27/46] debug --- loadtest/sign.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loadtest/sign.go b/loadtest/sign.go index d78ce03d3c..1cbebbeaa7 100644 --- a/loadtest/sign.go +++ b/loadtest/sign.go @@ -77,7 +77,7 @@ func (sc *SignerClient) GetTestAccountsKeys(maxAccounts int) []cryptotypes.PrivK if i == 3*len(files)/4 { fmt.Printf("Loading accounts 3/4 done") } - testAccountsKeys = append(testAccountsKeys, sc.GetKey(fmt.Sprint(i), "test", filepath.Join(userHomeDir, file.Name()))) + testAccountsKeys = append(testAccountsKeys, sc.GetKey(fmt.Sprint(i), "test", filepath.Join(userHomeDir, "test_accounts", file.Name()))) if i >= maxAccounts { break } From 168a15c7b49f005cafce333ed64fb49c54191301 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Thu, 11 Jan 2024 20:25:06 -0800 Subject: [PATCH 28/46] debug --- loadtest/loadtest_client.go | 113 +++++++++++++++++++++++------------- loadtest/sign.go | 9 --- 2 files changed, 72 insertions(+), 50 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 3002deb1d9..e36a93143d 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -4,6 +4,8 @@ import ( "context" "fmt" cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" + "golang.org/x/sync/semaphore" + "golang.org/x/time/rate" "math" "math/rand" "strings" @@ -124,57 +126,86 @@ func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, keys [] } } -func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int, wg *sync.WaitGroup) { - //rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit) +//func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int, wg *sync.WaitGroup) { +// rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit) +// maxConcurrent := rateLimit // Set the maximum number of concurrent SendTx calls +// sem := semaphore.NewWeighted(int64(maxConcurrent)) +// +// lastHeight := getLastHeight(c.LoadTestConfig.BlockchainEndpoint) +// for { +// newHeight := getLastHeight(c.LoadTestConfig.BlockchainEndpoint) +// for newHeight == lastHeight { +// time.Sleep(10 * time.Millisecond) +// newHeight = getLastHeight(c.LoadTestConfig.BlockchainEndpoint) +// } +// for i := 0; i < maxConcurrent; i++ { +// select { +// case <-done: +// fmt.Printf("Stopping consumers\n") +// wg.Wait() +// return +// case tx, ok := <-txQueue: +// if !ok { +// fmt.Printf("Stopping consumers\n") +// wg.Wait() +// return +// } +// +// if err := sem.Acquire(context.Background(), 1); err != nil { +// fmt.Printf("Failed to acquire semaphore: %s", err) +// break +// } +// +// wg.Add(1) +// go func(tx []byte) { +// defer wg.Done() +// defer sem.Release(1) +// SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) +// }(tx) +// } +// +// } +// lastHeight = newHeight +// } +//} + +func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int) { + rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit) + wg := sync.WaitGroup{} maxConcurrent := rateLimit // Set the maximum number of concurrent SendTx calls - //sem := semaphore.NewWeighted(int64(maxConcurrent)) + sem := semaphore.NewWeighted(int64(maxConcurrent)) - //i := 0 - lastHeight := getLastHeight(c.LoadTestConfig.BlockchainEndpoint) for { - newHeight := getLastHeight(c.LoadTestConfig.BlockchainEndpoint) - for newHeight == lastHeight { - time.Sleep(10 * time.Millisecond) - newHeight = getLastHeight(c.LoadTestConfig.BlockchainEndpoint) - } - for i := 0; i < maxConcurrent; i++ { - select { - case <-done: + select { + case <-done: + fmt.Printf("Stopping consumers\n") + wg.Wait() + return + case tx, ok := <-txQueue: + if !ok { fmt.Printf("Stopping consumers\n") wg.Wait() return - case tx, ok := <-txQueue: - if !ok { - fmt.Printf("Stopping consumers\n") - wg.Wait() - return - } + } - //if err := sem.Acquire(context.Background(), 1); err != nil { - // fmt.Printf("Failed to acquire semaphore: %s", err) - // break - //} - - wg.Add(1) - go SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) - //if i >= maxConcurrent { - // SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) - // i = 0 - // - //} else { - //go func(tx []byte) { - // defer wg.Done() - // defer sem.Release(1) - - //if err := rateLimiter.Wait(context.Background()); err == nil { - // SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_SYNC, false, *c, sentCount) - //} - //}(tx) - //} + if err := sem.Acquire(context.Background(), 1); err != nil { + fmt.Printf("Failed to acquire semaphore: %v", err) + break } + wg.Add(1) + go func(tx []byte) { + defer wg.Done() + defer sem.Release(1) + + if err := rateLimiter.Wait(context.Background()); err != nil { + fmt.Printf("Error waiting for rate limiter: %v\n", err) + return + } + + SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) + }(tx) } - lastHeight = newHeight } } diff --git a/loadtest/sign.go b/loadtest/sign.go index 1cbebbeaa7..aa2e331de9 100644 --- a/loadtest/sign.go +++ b/loadtest/sign.go @@ -68,15 +68,6 @@ func (sc *SignerClient) GetTestAccountsKeys(maxAccounts int) []cryptotypes.PrivK var testAccountsKeys []cryptotypes.PrivKey fmt.Printf("Loading accounts\n") for i, file := range files { - if i == len(files)/4 { - fmt.Printf("Loading accounts 1/4 done") - } - if i == len(files)/2 { - fmt.Printf("Loading accounts 1/2 done") - } - if i == 3*len(files)/4 { - fmt.Printf("Loading accounts 3/4 done") - } testAccountsKeys = append(testAccountsKeys, sc.GetKey(fmt.Sprint(i), "test", filepath.Join(userHomeDir, "test_accounts", file.Name()))) if i >= maxAccounts { break From 9ecc1d70808221a95daaf2d39aafc05d1b22d7fc Mon Sep 17 00:00:00 2001 From: Philip Su Date: Thu, 11 Jan 2024 20:26:37 -0800 Subject: [PATCH 29/46] debug --- loadtest/loadtest_client.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index e36a93143d..9b6ed1fc27 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -169,9 +169,8 @@ func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, keys [] // } //} -func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int) { +func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int, wg *sync.WaitGroup) { rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit) - wg := sync.WaitGroup{} maxConcurrent := rateLimit // Set the maximum number of concurrent SendTx calls sem := semaphore.NewWeighted(int64(maxConcurrent)) From bca86e8835cf0bb1f314de1ee15e12acbba483a9 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Thu, 11 Jan 2024 20:40:29 -0800 Subject: [PATCH 30/46] debug --- loadtest/loadtest_client.go | 44 ------------------------------------- loadtest/tx.go | 20 ----------------- 2 files changed, 64 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 9b6ed1fc27..11bc77d113 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -126,49 +126,6 @@ func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, keys [] } } -//func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int, wg *sync.WaitGroup) { -// rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit) -// maxConcurrent := rateLimit // Set the maximum number of concurrent SendTx calls -// sem := semaphore.NewWeighted(int64(maxConcurrent)) -// -// lastHeight := getLastHeight(c.LoadTestConfig.BlockchainEndpoint) -// for { -// newHeight := getLastHeight(c.LoadTestConfig.BlockchainEndpoint) -// for newHeight == lastHeight { -// time.Sleep(10 * time.Millisecond) -// newHeight = getLastHeight(c.LoadTestConfig.BlockchainEndpoint) -// } -// for i := 0; i < maxConcurrent; i++ { -// select { -// case <-done: -// fmt.Printf("Stopping consumers\n") -// wg.Wait() -// return -// case tx, ok := <-txQueue: -// if !ok { -// fmt.Printf("Stopping consumers\n") -// wg.Wait() -// return -// } -// -// if err := sem.Acquire(context.Background(), 1); err != nil { -// fmt.Printf("Failed to acquire semaphore: %s", err) -// break -// } -// -// wg.Add(1) -// go func(tx []byte) { -// defer wg.Done() -// defer sem.Release(1) -// SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) -// }(tx) -// } -// -// } -// lastHeight = newHeight -// } -//} - func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int, wg *sync.WaitGroup) { rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit) maxConcurrent := rateLimit // Set the maximum number of concurrent SendTx calls @@ -201,7 +158,6 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se fmt.Printf("Error waiting for rate limiter: %v\n", err) return } - SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) }(tx) } diff --git a/loadtest/tx.go b/loadtest/tx.go index 1691182a83..a9d0ce6b39 100644 --- a/loadtest/tx.go +++ b/loadtest/tx.go @@ -36,27 +36,7 @@ func SendTx( } } - //for grpcRes.TxResponse.Code == sdkerrors.ErrMempoolIsFull.ABCICode() { - // // retry after a second until either succeed or fail for some other reason - // fmt.Printf("Mempool full\n") - // time.Sleep(1 * time.Second) - // grpcRes, err = loadtestClient.GetTxClient().BroadcastTx( - // context.Background(), - // &typestx.BroadcastTxRequest{ - // Mode: mode, - // TxBytes: txBytes, - // }, - // ) - // if err != nil { - // if failureExpected { - // } else { - // panic(err) - // } - // } - //} if grpcRes.TxResponse.Code == 0 { atomic.AddInt64(sentCount, 1) - } else { - fmt.Printf("PSUDEBUTG - failed: %s\n", grpcRes.TxResponse) } } From bdcd39ba52bdfa915438e76d81165ffcd30a2a55 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Fri, 12 Jan 2024 06:12:49 -0800 Subject: [PATCH 31/46] debug --- loadtest/tx.go | 52 +++++++++++++++++++++++++------------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/loadtest/tx.go b/loadtest/tx.go index a9d0ce6b39..d11155fa7a 100644 --- a/loadtest/tx.go +++ b/loadtest/tx.go @@ -1,8 +1,6 @@ package main import ( - "context" - "fmt" typestx "github.com/cosmos/cosmos-sdk/types/tx" "sync/atomic" ) @@ -14,29 +12,31 @@ func SendTx( loadtestClient LoadTestClient, sentCount *int64, ) { - grpcRes, err := loadtestClient.GetTxClient().BroadcastTx( - context.Background(), - &typestx.BroadcastTxRequest{ - Mode: mode, - TxBytes: txBytes, - }, - ) - if err != nil { - if failureExpected { - fmt.Printf("Error: %s\n", err) - } else { - panic(err) - } + atomic.AddInt64(sentCount, 1) - if grpcRes == nil || grpcRes.TxResponse == nil { - return - } - if grpcRes.TxResponse.Code == 0 { - atomic.AddInt64(sentCount, 1) - } - } - - if grpcRes.TxResponse.Code == 0 { - atomic.AddInt64(sentCount, 1) - } + //grpcRes, err := loadtestClient.GetTxClient().BroadcastTx( + // context.Background(), + // &typestx.BroadcastTxRequest{ + // Mode: mode, + // TxBytes: txBytes, + // }, + //) + //if err != nil { + // if failureExpected { + // fmt.Printf("Error: %s\n", err) + // } else { + // panic(err) + // } + // + // if grpcRes == nil || grpcRes.TxResponse == nil { + // return + // } + // if grpcRes.TxResponse.Code == 0 { + // atomic.AddInt64(sentCount, 1) + // } + //} + // + //if grpcRes.TxResponse.Code == 0 { + // atomic.AddInt64(sentCount, 1) + //} } From 16297dd2102cf50efa5f7b10d619644a27d7e2b4 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Fri, 12 Jan 2024 06:27:30 -0800 Subject: [PATCH 32/46] debug --- loadtest/main.go | 7 ++++++- loadtest/sign.go | 20 ++++++++++++++++++-- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/loadtest/main.go b/loadtest/main.go index 8e9f81b55a..ef051bdb7b 100644 --- a/loadtest/main.go +++ b/loadtest/main.go @@ -87,7 +87,7 @@ func startLoadtestWorkers(config Config) { txQueue := make(chan []byte, 10000) done := make(chan struct{}) - numProducers := 5 + numProducers := 1000 var wg sync.WaitGroup // Catch OS signals for graceful shutdown @@ -109,6 +109,10 @@ func startLoadtestWorkers(config Config) { wg.Add(1) go client.BuildTxs(txQueue, i, keys, &wg, done, &producedCount) } + // Give producers some time to populate queue + if config.TargetTps > 1000 { + time.Sleep(5 * time.Second) + } fmt.Printf("Starting loadtest consumers\n") go client.SendTxs(txQueue, done, &sentCount, int(config.TargetTps), &wg) @@ -128,6 +132,7 @@ func startLoadtestWorkers(config Config) { blockHeights = append(blockHeights, i) blockTimes = append(blockTimes, blockTime) } + fmt.Printf("TxQueue len: %d", len(txQueue)) printStats(start, &producedCount, &sentCount, &prevSentCount, blockHeights, blockTimes) startHeight = currHeight diff --git a/loadtest/sign.go b/loadtest/sign.go index aa2e331de9..252d428ff8 100644 --- a/loadtest/sign.go +++ b/loadtest/sign.go @@ -66,14 +66,30 @@ func (sc *SignerClient) GetTestAccountsKeys(maxAccounts int) []cryptotypes.PrivK userHomeDir, _ := os.UserHomeDir() files, _ := os.ReadDir(filepath.Join(userHomeDir, "test_accounts")) var testAccountsKeys []cryptotypes.PrivKey + var wg sync.WaitGroup + keysChan := make(chan cryptotypes.PrivKey, maxAccounts) fmt.Printf("Loading accounts\n") for i, file := range files { - testAccountsKeys = append(testAccountsKeys, sc.GetKey(fmt.Sprint(i), "test", filepath.Join(userHomeDir, "test_accounts", file.Name()))) if i >= maxAccounts { break } + wg.Add(1) + go func(i int, fileName string) { + defer wg.Done() + key := sc.GetKey(fmt.Sprint(i), "test", filepath.Join(userHomeDir, "test_accounts", fileName)) + keysChan <- key + }(i, file.Name()) } - return testAccountsKeys + wg.Wait() + close(keysChan) + // Collect keys from the channel + idx := 0 + for key := range keysChan { + testAccountsKeys[idx] = key + idx++ + } + + return testAccountsKeys[:idx] } func (sc *SignerClient) GetAdminAccountKeyPath() string { From b33e0946239e41cc3ef2ce1dc38edd5c0fcdb6a1 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Fri, 12 Jan 2024 06:38:38 -0800 Subject: [PATCH 33/46] debug --- loadtest/loadtest_client.go | 8 ++++---- loadtest/main.go | 1 - loadtest/sign.go | 6 ++---- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 11bc77d113..27ff296785 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -127,6 +127,8 @@ func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, keys [] } func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int, wg *sync.WaitGroup) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit) maxConcurrent := rateLimit // Set the maximum number of concurrent SendTx calls sem := semaphore.NewWeighted(int64(maxConcurrent)) @@ -135,16 +137,14 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se select { case <-done: fmt.Printf("Stopping consumers\n") - wg.Wait() return case tx, ok := <-txQueue: if !ok { fmt.Printf("Stopping consumers\n") - wg.Wait() return } - if err := sem.Acquire(context.Background(), 1); err != nil { + if err := sem.Acquire(ctx, 1); err != nil { fmt.Printf("Failed to acquire semaphore: %v", err) break } @@ -154,7 +154,7 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se defer wg.Done() defer sem.Release(1) - if err := rateLimiter.Wait(context.Background()); err != nil { + if err := rateLimiter.Wait(ctx); err != nil { fmt.Printf("Error waiting for rate limiter: %v\n", err) return } diff --git a/loadtest/main.go b/loadtest/main.go index ef051bdb7b..5f5f47555a 100644 --- a/loadtest/main.go +++ b/loadtest/main.go @@ -132,7 +132,6 @@ func startLoadtestWorkers(config Config) { blockHeights = append(blockHeights, i) blockTimes = append(blockTimes, blockTime) } - fmt.Printf("TxQueue len: %d", len(txQueue)) printStats(start, &producedCount, &sentCount, &prevSentCount, blockHeights, blockTimes) startHeight = currHeight diff --git a/loadtest/sign.go b/loadtest/sign.go index 252d428ff8..0fd39879ad 100644 --- a/loadtest/sign.go +++ b/loadtest/sign.go @@ -83,13 +83,11 @@ func (sc *SignerClient) GetTestAccountsKeys(maxAccounts int) []cryptotypes.PrivK wg.Wait() close(keysChan) // Collect keys from the channel - idx := 0 for key := range keysChan { - testAccountsKeys[idx] = key - idx++ + testAccountsKeys = append(testAccountsKeys, key) } - return testAccountsKeys[:idx] + return testAccountsKeys } func (sc *SignerClient) GetAdminAccountKeyPath() string { From 31e0afa4fb4a0bf70b8fcada8bc26701c7b46c35 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Fri, 12 Jan 2024 06:41:26 -0800 Subject: [PATCH 34/46] debug --- loadtest/main.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/loadtest/main.go b/loadtest/main.go index 5f5f47555a..6e2168d8b6 100644 --- a/loadtest/main.go +++ b/loadtest/main.go @@ -149,7 +149,10 @@ func startLoadtestWorkers(config Config) { <-signals fmt.Println("SIGINT received, shutting down...") close(done) + fmt.Println("closed done") + wg.Wait() + fmt.Println("wg wait complete") close(txQueue) } From ffbdeca75d01e9e0958ad287f6f98ee2066415c1 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Fri, 12 Jan 2024 06:42:41 -0800 Subject: [PATCH 35/46] debug --- loadtest/loadtest_client.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 27ff296785..9bfe9a42ff 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -148,9 +148,8 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se fmt.Printf("Failed to acquire semaphore: %v", err) break } - - wg.Add(1) go func(tx []byte) { + wg.Add(1) defer wg.Done() defer sem.Release(1) From 120058f51e789bfd65c69c15aa1caf3d80f6f344 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Fri, 12 Jan 2024 06:46:03 -0800 Subject: [PATCH 36/46] debug --- loadtest/loadtest_client.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 9bfe9a42ff..2a7994fcfc 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -148,12 +148,14 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se fmt.Printf("Failed to acquire semaphore: %v", err) break } + wg.Add(1) go func(tx []byte) { - wg.Add(1) + localCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() defer wg.Done() defer sem.Release(1) - if err := rateLimiter.Wait(ctx); err != nil { + if err := rateLimiter.Wait(localCtx); err != nil { fmt.Printf("Error waiting for rate limiter: %v\n", err) return } From 89e95777b21196ee00bf7894ab9c45c17c527162 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Fri, 12 Jan 2024 06:48:49 -0800 Subject: [PATCH 37/46] debug --- loadtest/loadtest_client.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 2a7994fcfc..263bb81f89 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -117,11 +117,16 @@ func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, keys [] types.NewCoin("usei", types.NewInt(fee)), }) // Use random seqno to get around txs that might already be seen in mempool - c.SignerClient.SignTx(c.ChainID, &txBuilder, key, uint64(rand.Intn(math.MaxInt))) txBytes, _ := TestConfig.TxConfig.TxEncoder()(txBuilder.GetTx()) - txQueue <- txBytes - atomic.AddInt64(producedCount, 1) + select { + case txQueue <- txBytes: + atomic.AddInt64(producedCount, 1) + case <-done: + // Exit if done signal is received while trying to send to txQueue + fmt.Printf("Stopping producer %d due to done signal while sending to txQueue\n", producerId) + return + } } } } From b12eda2d3b36dbd4bd4b925f2acebcf7b48f3b46 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Fri, 12 Jan 2024 06:49:44 -0800 Subject: [PATCH 38/46] debug --- loadtest/loadtest_client.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 263bb81f89..0640b65197 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -124,7 +124,6 @@ func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, keys [] atomic.AddInt64(producedCount, 1) case <-done: // Exit if done signal is received while trying to send to txQueue - fmt.Printf("Stopping producer %d due to done signal while sending to txQueue\n", producerId) return } } @@ -132,7 +131,7 @@ func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, keys [] } func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int, wg *sync.WaitGroup) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit) maxConcurrent := rateLimit // Set the maximum number of concurrent SendTx calls From 90cddab1bd18ff775820b45b7dffbb619a552254 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Fri, 12 Jan 2024 06:50:24 -0800 Subject: [PATCH 39/46] debug --- loadtest/loadtest_client.go | 1 - loadtest/main.go | 2 -- loadtest/tx.go | 52 ++++++++++++++++++------------------- 3 files changed, 26 insertions(+), 29 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 0640b65197..1ec0baf3f9 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -160,7 +160,6 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se defer sem.Release(1) if err := rateLimiter.Wait(localCtx); err != nil { - fmt.Printf("Error waiting for rate limiter: %v\n", err) return } SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) diff --git a/loadtest/main.go b/loadtest/main.go index 6e2168d8b6..99ab610b26 100644 --- a/loadtest/main.go +++ b/loadtest/main.go @@ -149,10 +149,8 @@ func startLoadtestWorkers(config Config) { <-signals fmt.Println("SIGINT received, shutting down...") close(done) - fmt.Println("closed done") wg.Wait() - fmt.Println("wg wait complete") close(txQueue) } diff --git a/loadtest/tx.go b/loadtest/tx.go index d11155fa7a..7dbf20c3ce 100644 --- a/loadtest/tx.go +++ b/loadtest/tx.go @@ -1,6 +1,7 @@ package main import ( + "fmt" typestx "github.com/cosmos/cosmos-sdk/types/tx" "sync/atomic" ) @@ -12,31 +13,30 @@ func SendTx( loadtestClient LoadTestClient, sentCount *int64, ) { - atomic.AddInt64(sentCount, 1) - //grpcRes, err := loadtestClient.GetTxClient().BroadcastTx( - // context.Background(), - // &typestx.BroadcastTxRequest{ - // Mode: mode, - // TxBytes: txBytes, - // }, - //) - //if err != nil { - // if failureExpected { - // fmt.Printf("Error: %s\n", err) - // } else { - // panic(err) - // } - // - // if grpcRes == nil || grpcRes.TxResponse == nil { - // return - // } - // if grpcRes.TxResponse.Code == 0 { - // atomic.AddInt64(sentCount, 1) - // } - //} - // - //if grpcRes.TxResponse.Code == 0 { - // atomic.AddInt64(sentCount, 1) - //} + grpcRes, err := loadtestClient.GetTxClient().BroadcastTx( + context.Background(), + &typestx.BroadcastTxRequest{ + Mode: mode, + TxBytes: txBytes, + }, + ) + if err != nil { + if failureExpected { + fmt.Printf("Error: %s\n", err) + } else { + panic(err) + } + + if grpcRes == nil || grpcRes.TxResponse == nil { + return + } + if grpcRes.TxResponse.Code == 0 { + atomic.AddInt64(sentCount, 1) + } + } + + if grpcRes.TxResponse.Code == 0 { + atomic.AddInt64(sentCount, 1) + } } From 0aa29a3491e494c442cc82a71ea5fdbbd8650643 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Fri, 12 Jan 2024 07:14:27 -0800 Subject: [PATCH 40/46] debug --- app/app.go | 3 --- loadtest/tx.go | 1 + 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/app/app.go b/app/app.go index ef9e80ac15..75c58a24ce 100644 --- a/app/app.go +++ b/app/app.go @@ -1334,7 +1334,6 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequ }, } beginBlockResp := app.BeginBlock(ctx, beginBlockReq) - events = append(events, beginBlockResp.Events...) txResults := make([]*abci.ExecTxResult, len(txs)) @@ -1348,14 +1347,12 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequ // Finalize all Bank Module Transfers here so that events are included for prioritiezd txs deferredWriteEvents := app.BankKeeper.WriteDeferredBalances(ctx) - events = append(events, deferredWriteEvents...) midBlockEvents := app.MidBlock(ctx, req.GetHeight()) events = append(events, midBlockEvents...) otherResults, ctx := app.BuildDependenciesAndRunTxs(ctx, otherTxs) - for relativeOtherIndex, originalIndex := range otherIndices { txResults[originalIndex] = otherResults[relativeOtherIndex] } diff --git a/loadtest/tx.go b/loadtest/tx.go index 7dbf20c3ce..e4fa815eea 100644 --- a/loadtest/tx.go +++ b/loadtest/tx.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" typestx "github.com/cosmos/cosmos-sdk/types/tx" "sync/atomic" From bcd47d1210616b78ab5eac8cc9ecfb2c9e72e055 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Fri, 12 Jan 2024 10:20:16 -0800 Subject: [PATCH 41/46] debug --- loadtest/loadtest_client.go | 2 +- loadtest/tx.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 1ec0baf3f9..12fbe208c5 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -162,7 +162,7 @@ func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, se if err := rateLimiter.Wait(localCtx); err != nil { return } - SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) + SendTx(ctx, tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount) }(tx) } } diff --git a/loadtest/tx.go b/loadtest/tx.go index e4fa815eea..3aee469061 100644 --- a/loadtest/tx.go +++ b/loadtest/tx.go @@ -8,6 +8,7 @@ import ( ) func SendTx( + ctx context.Context, txBytes []byte, mode typestx.BroadcastMode, failureExpected bool, @@ -16,7 +17,7 @@ func SendTx( ) { grpcRes, err := loadtestClient.GetTxClient().BroadcastTx( - context.Background(), + ctx, &typestx.BroadcastTxRequest{ Mode: mode, TxBytes: txBytes, From b8bed1345d5eda8e0786573666d0e8245048e7f8 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Fri, 12 Jan 2024 10:24:14 -0800 Subject: [PATCH 42/46] debug --- loadtest/tx.go | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/loadtest/tx.go b/loadtest/tx.go index 3aee469061..83d6a83430 100644 --- a/loadtest/tx.go +++ b/loadtest/tx.go @@ -2,7 +2,6 @@ package main import ( "context" - "fmt" typestx "github.com/cosmos/cosmos-sdk/types/tx" "sync/atomic" ) @@ -16,27 +15,13 @@ func SendTx( sentCount *int64, ) { - grpcRes, err := loadtestClient.GetTxClient().BroadcastTx( + grpcRes, _ := loadtestClient.GetTxClient().BroadcastTx( ctx, &typestx.BroadcastTxRequest{ Mode: mode, TxBytes: txBytes, }, ) - if err != nil { - if failureExpected { - fmt.Printf("Error: %s\n", err) - } else { - panic(err) - } - - if grpcRes == nil || grpcRes.TxResponse == nil { - return - } - if grpcRes.TxResponse.Code == 0 { - atomic.AddInt64(sentCount, 1) - } - } if grpcRes.TxResponse.Code == 0 { atomic.AddInt64(sentCount, 1) From 77716293c80d0300e945d8b42e858b016aea28bd Mon Sep 17 00:00:00 2001 From: Philip Su Date: Fri, 12 Jan 2024 10:29:10 -0800 Subject: [PATCH 43/46] debug --- loadtest/tx.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/loadtest/tx.go b/loadtest/tx.go index 83d6a83430..5a3dabd8cb 100644 --- a/loadtest/tx.go +++ b/loadtest/tx.go @@ -23,7 +23,11 @@ func SendTx( }, ) - if grpcRes.TxResponse.Code == 0 { + if failureExpected { atomic.AddInt64(sentCount, 1) + return + } else if grpcRes != nil && grpcRes.TxResponse.Code == 0 { + atomic.AddInt64(sentCount, 1) + return } } From 44bb368b07e6f852c4c8adae48d0fb86ef0d4d4d Mon Sep 17 00:00:00 2001 From: Philip Su Date: Fri, 12 Jan 2024 11:14:16 -0800 Subject: [PATCH 44/46] debug --- loadtest/loadtest_client.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 12fbe208c5..87a511b2a9 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -6,6 +6,7 @@ import ( cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" "golang.org/x/sync/semaphore" "golang.org/x/time/rate" + "google.golang.org/grpc/connectivity" "math" "math/rand" "strings" @@ -65,6 +66,22 @@ func NewLoadTestClient(config Config) *LoadTestClient { dialOptions...) TxClients[i] = typestx.NewServiceClient(grpcConn) GrpcConns[i] = grpcConn + // spin up goroutine for monitoring and reconnect purposes + go func() { + for { + state := grpcConn.GetState() + if state == connectivity.TransientFailure || state == connectivity.Shutdown { + fmt.Println("GRPC Connection lost, attempting to reconnect...") + for { + if grpcConn.WaitForStateChange(context.Background(), state) { + break + } + time.Sleep(10 * time.Second) + } + } + time.Sleep(10 * time.Second) + } + }() } return &LoadTestClient{ From 3b228f0f4761a4a515eb09c8a86c78e9961013c2 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Mon, 15 Jan 2024 18:19:45 -0800 Subject: [PATCH 45/46] lint --- loadtest/loadtest_client.go | 9 +++++---- loadtest/sign.go | 4 +++- loadtest/tx.go | 3 ++- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/loadtest/loadtest_client.go b/loadtest/loadtest_client.go index 87a511b2a9..22b9be0d37 100644 --- a/loadtest/loadtest_client.go +++ b/loadtest/loadtest_client.go @@ -3,10 +3,6 @@ package main import ( "context" "fmt" - cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" - "golang.org/x/sync/semaphore" - "golang.org/x/time/rate" - "google.golang.org/grpc/connectivity" "math" "math/rand" "strings" @@ -14,6 +10,11 @@ import ( "sync/atomic" "time" + cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" + "golang.org/x/sync/semaphore" + "golang.org/x/time/rate" + "google.golang.org/grpc/connectivity" + "github.com/cosmos/cosmos-sdk/types" typestx "github.com/cosmos/cosmos-sdk/types/tx" stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types" diff --git a/loadtest/sign.go b/loadtest/sign.go index 0fd39879ad..31025b7085 100644 --- a/loadtest/sign.go +++ b/loadtest/sign.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "io" + "math" "os" "path/filepath" "sync" @@ -65,7 +66,8 @@ func NewSignerClient(nodeURI string) *SignerClient { func (sc *SignerClient) GetTestAccountsKeys(maxAccounts int) []cryptotypes.PrivKey { userHomeDir, _ := os.UserHomeDir() files, _ := os.ReadDir(filepath.Join(userHomeDir, "test_accounts")) - var testAccountsKeys []cryptotypes.PrivKey + + var testAccountsKeys = make([]cryptotypes.PrivKey, math.Min(float64(len(files)), float64(maxAccounts))) var wg sync.WaitGroup keysChan := make(chan cryptotypes.PrivKey, maxAccounts) fmt.Printf("Loading accounts\n") diff --git a/loadtest/tx.go b/loadtest/tx.go index 5a3dabd8cb..76a90390f4 100644 --- a/loadtest/tx.go +++ b/loadtest/tx.go @@ -2,8 +2,9 @@ package main import ( "context" - typestx "github.com/cosmos/cosmos-sdk/types/tx" "sync/atomic" + + typestx "github.com/cosmos/cosmos-sdk/types/tx" ) func SendTx( From 0ea591c5b651718c5f6f2cb8b8e43ff99f3d3acc Mon Sep 17 00:00:00 2001 From: Philip Su Date: Mon, 15 Jan 2024 18:42:43 -0800 Subject: [PATCH 46/46] int --- loadtest/sign.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loadtest/sign.go b/loadtest/sign.go index 31025b7085..ca839c1ed0 100644 --- a/loadtest/sign.go +++ b/loadtest/sign.go @@ -67,7 +67,7 @@ func (sc *SignerClient) GetTestAccountsKeys(maxAccounts int) []cryptotypes.PrivK userHomeDir, _ := os.UserHomeDir() files, _ := os.ReadDir(filepath.Join(userHomeDir, "test_accounts")) - var testAccountsKeys = make([]cryptotypes.PrivKey, math.Min(float64(len(files)), float64(maxAccounts))) + var testAccountsKeys = make([]cryptotypes.PrivKey, int(math.Min(float64(len(files)), float64(maxAccounts)))) var wg sync.WaitGroup keysChan := make(chan cryptotypes.PrivKey, maxAccounts) fmt.Printf("Loading accounts\n")