Skip to content
46 changes: 42 additions & 4 deletions evmrpc/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,8 @@
}

func (a *FilterAPI) GetLogs(ctx context.Context, crit filters.FilterCriteria) (res []*ethtypes.Log, err error) {
defer recordMetricsWithError(fmt.Sprintf("%s_getLogs", a.namespace), a.connectionType, time.Now(), err)
startTime := time.Now()

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
defer recordMetricsWithError(fmt.Sprintf("%s_getLogs", a.namespace), a.connectionType, startTime, err)

latest, err := a.logFetcher.latestHeight(ctx)
if err != nil {
Expand All @@ -549,16 +550,43 @@

blockRange := end - begin + 1

// Record metrics for eth_getLogs
defer func() {
GetGlobalMetrics().RecordGetLogsRequest(blockRange, time.Since(startTime), startTime, err)
}()

// Use config value instead of hardcoded constant
if blockRange > a.filterConfig.maxBlock {
return nil, fmt.Errorf("block range too large (%d), maximum allowed is %d blocks", blockRange, a.filterConfig.maxBlock)
}

// Early rejection for pruned blocks - avoid wasting resources on blocks that don't exist
if earliest > 0 && begin < earliest {
return nil, fmt.Errorf("requested block range [%d, %d] includes pruned blocks, earliest available block is %d", begin, end, earliest)
}

// Only apply rate limiting for large queries (> RPSLimitThreshold blocks)
if blockRange > RPSLimitThreshold && !a.globalRPSLimiter.Allow() {
return nil, fmt.Errorf("log query rate limit exceeded for large queries, please try again later")
}

// Backpressure: early rejection based on system load
m := GetGlobalMetrics()

// Check 1: Too many pending tasks (queue backlog)
pending := m.TasksSubmitted.Load() - m.TasksCompleted.Load()
maxPending := int64(float64(m.QueueCapacity.Load()) * 0.8) // 80% threshold

Check notice

Code scanning / CodeQL

Floating point arithmetic Note

Floating point arithmetic operations are not associative and a possible source of non-determinism
if pending > maxPending {
return nil, fmt.Errorf("server too busy, rejecting new request (pending: %d, threshold: %d)", pending, maxPending)
}

// Check 2: I/O saturated (semaphore exhausted)
semInUse := m.DBSemaphoreAcquired.Load()
semCapacity := m.DBSemaphoreCapacity.Load()
if semCapacity > 0 && float64(semInUse)/float64(semCapacity) >= 0.8 {

Check notice

Code scanning / CodeQL

Floating point arithmetic Note

Floating point arithmetic operations are not associative and a possible source of non-determinism
return nil, fmt.Errorf("server I/O saturated, rejecting new request (semaphore: %d/%d in use)", semInUse, semCapacity)
}

logs, _, err := a.logFetcher.GetLogsByFilters(ctx, crit, 0)
if err != nil {
return nil, err
Expand Down Expand Up @@ -781,7 +809,7 @@
batch := blockBatch
wg.Add(1)

if err := runner.Submit(func() { processBatch(batch) }); err != nil {
if err := runner.SubmitWithMetrics(func() { processBatch(batch) }); err != nil {
wg.Done()
submitError = fmt.Errorf("system overloaded, please reduce request frequency: %w", err)
break
Expand All @@ -797,7 +825,7 @@
// Process remaining blocks
if len(blockBatch) > 0 {
wg.Add(1)
if err := runner.Submit(func() { processBatch(blockBatch) }); err != nil {
if err := runner.SubmitWithMetrics(func() { processBatch(blockBatch) }); err != nil {
wg.Done()
return nil, 0, fmt.Errorf("system overloaded, please reduce request frequency: %w", err)
}
Expand Down Expand Up @@ -1011,7 +1039,7 @@
}

wg.Add(1)
if err := runner.Submit(func(start, endHeight int64) func() {
if err := runner.SubmitWithMetrics(func(start, endHeight int64) func() {
return func() {
defer wg.Done()
f.processBatch(ctx, start, endHeight, crit, bloomIndexes, res, errChan)
Expand Down Expand Up @@ -1048,6 +1076,9 @@
defer func() {
metrics.IncrementRpcRequestCounter("num_blocks_fetched", "blocks", true)
}()

wpMetrics := GetGlobalMetrics()

for height := start; height <= end; height++ {
if height == 0 {
continue
Expand All @@ -1065,11 +1096,15 @@
}

// Block cache miss, acquire semaphore for I/O operations
semWaitStart := time.Now()

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
f.dbReadSemaphore <- struct{}{}
wpMetrics.RecordDBSemaphoreWait(time.Since(semWaitStart))
wpMetrics.RecordDBSemaphoreAcquire()

// Re-check cache after acquiring semaphore, in case another worker cached it.
if cachedEntry, found := f.globalBlockCache.Get(height); found {
<-f.dbReadSemaphore
wpMetrics.RecordDBSemaphoreRelease()
if cachedEntry.Block != nil {
if err := f.watermarks.EnsureBlockHeightAvailable(ctx, cachedEntry.Block.Block.Height); err != nil {
continue
Expand All @@ -1094,6 +1129,7 @@
// skip the bloom pre-filter instead of short-circuiting the block.
if blockBloom != (ethtypes.Bloom{}) && !MatchFilters(blockBloom, bloomIndexes) {
<-f.dbReadSemaphore
wpMetrics.RecordDBSemaphoreRelease()
continue // skip the block if bloom filter does not match
}
}
Expand All @@ -1106,6 +1142,7 @@
default:
}
<-f.dbReadSemaphore
wpMetrics.RecordDBSemaphoreRelease()
continue
}

Expand All @@ -1116,6 +1153,7 @@
fillMissingFields(entry, block, blockBloom)
}
<-f.dbReadSemaphore
wpMetrics.RecordDBSemaphoreRelease()
res <- block
}
}
Expand Down
10 changes: 9 additions & 1 deletion evmrpc/rpcstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ type HTTPServer struct {
}

const (
shutdownTimeout = 5 * time.Second
shutdownTimeout = 5 * time.Second
metricsPrinterInterval = 5 * time.Second
)

func NewHTTPServer(log log.Logger, timeouts rpc.HTTPTimeouts) *HTTPServer {
Expand Down Expand Up @@ -184,6 +185,10 @@ func (h *HTTPServer) Start() error {
"vhosts", strings.Join(h.HTTPConfig.Vhosts, ","),
)

// Start metrics printer
// Prometheus metrics are always exported; stdout printing requires EVM_DEBUG_METRICS=true
StartMetricsPrinter(metricsPrinterInterval)

// Log all handlers mounted on server.
paths := make([]string, len(h.handlerNames))
for path := range h.handlerNames {
Expand Down Expand Up @@ -256,6 +261,9 @@ func (h *HTTPServer) doStop() {
return // not running
}

// Stop metrics printer
StopMetricsPrinter()

// Shut down the server.
httpHandler := h.httpHandler.Load().(*rpcHandler)
wsHandler := h.wsHandler.Load().(*rpcHandler)
Expand Down
31 changes: 25 additions & 6 deletions evmrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"strings"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/client"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/sei-protocol/sei-chain/app/legacyabci"
evmrpcconfig "github.com/sei-protocol/sei-chain/evmrpc/config"
"github.com/sei-protocol/sei-chain/evmrpc/stats"
Expand Down Expand Up @@ -49,9 +50,24 @@ func NewEVMHTTPServer(
) (EVMServer, error) {
logger = logger.With("module", "evmrpc")

// Initialize global worker pool with configuration
// Initialize global worker pool with configuration (metrics are embedded in pool)
InitGlobalWorkerPool(config.WorkerPoolSize, config.WorkerQueueSize)

// Get pool for logging and DB semaphore setup
pool := GetGlobalWorkerPool()
workerCount := pool.WorkerCount()
queueSize := pool.QueueSize()

// Set DB semaphore capacity in metrics (aligned with worker count)
// Only set once to avoid races when multiple test servers start in parallel.
pool.Metrics.DBSemaphoreCapacity.CompareAndSwap(0, int32(workerCount)) //nolint:gosec // G115: safe, max is 64

debugEnabled := IsDebugMetricsEnabled()
logger.Info("Started EVM RPC metrics exporter (interval: 5s)", "workers", workerCount, "queue", queueSize, "db_semaphore", workerCount, "debug_stdout", debugEnabled)
if !debugEnabled {
logger.Info("To enable debug metrics output to stdout, set EVM_DEBUG_METRICS=true")
}

// Initialize RPC tracker
stats.InitRPCTracker(ctxProvider(LatestCtxHeight).Context(), logger, config.RPCStatsInterval)

Expand Down Expand Up @@ -86,7 +102,8 @@ func NewEVMHTTPServer(
seiTxAPI := NewSeiTransactionAPI(tmClient, k, ctxProvider, txConfigProvider, homeDir, ConnectionTypeHTTP, isPanicOrSyntheticTxFunc, watermarks, globalBlockCache, cacheCreationMutex)
seiDebugAPI := NewSeiDebugAPI(tmClient, k, beginBlockKeepers, ctxProvider, txConfigProvider, simulateConfig, app, antehandler, ConnectionTypeHTTP, config, globalBlockCache, cacheCreationMutex, watermarks)

dbReadSemaphore := make(chan struct{}, MaxDBReadConcurrency)
// DB semaphore aligned with worker count
dbReadSemaphore := make(chan struct{}, workerCount)
globalLogSlicePool := NewLogSlicePool()
apis := []rpc.API{
{
Expand Down Expand Up @@ -225,7 +242,8 @@ func NewEVMWebSocketServer(
) (EVMServer, error) {
logger = logger.With("module", "evmrpc")

// Initialize global worker pool with configuration
// Initialize global worker pool with configuration (metrics are embedded in pool)
// This is idempotent - if HTTP server already initialized it, this is a no-op
InitGlobalWorkerPool(config.WorkerPoolSize, config.WorkerQueueSize)

// Initialize WebSocket tracker.
Expand All @@ -246,7 +264,8 @@ func NewEVMWebSocketServer(
MaxConcurrentSimulationCalls: config.MaxConcurrentSimulationCalls,
}
watermarks := NewWatermarkManager(tmClient, ctxProvider, stateStore, k.ReceiptStore())
dbReadSemaphore := make(chan struct{}, MaxDBReadConcurrency)
// DB semaphore aligned with worker count
dbReadSemaphore := make(chan struct{}, GetGlobalWorkerPool().WorkerCount())
globalBlockCache := NewBlockCache(3000)
cacheCreationMutex := &sync.Mutex{}
globalLogSlicePool := NewLogSlicePool()
Expand Down
8 changes: 8 additions & 0 deletions evmrpc/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,17 @@ func (a *SubscriptionAPI) Logs(ctx context.Context, filter *filters.FilterCriter

rpcSub := notifier.CreateSubscription()

// Track subscription metrics
wpMetrics := GetGlobalMetrics()
wpMetrics.RecordSubscriptionStart()

if filter.BlockHash != nil {
go func() {
defer recoverAndLog()
defer wpMetrics.RecordSubscriptionEnd()
logs, _, err := a.logFetcher.GetLogsByFilters(ctx, *filter, 0)
if err != nil {
wpMetrics.RecordSubscriptionError()
_ = notifier.Notify(rpcSub.ID, err)
return
}
Expand All @@ -187,10 +193,12 @@ func (a *SubscriptionAPI) Logs(ctx context.Context, filter *filters.FilterCriter

go func() {
defer recoverAndLog()
defer wpMetrics.RecordSubscriptionEnd()
begin := int64(0)
for {
logs, lastToHeight, err := a.logFetcher.GetLogsByFilters(ctx, *filter, begin)
if err != nil {
wpMetrics.RecordSubscriptionError()
_ = notifier.Notify(rpcSub.ID, err)
return
}
Expand Down
64 changes: 61 additions & 3 deletions evmrpc/worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"fmt"
"runtime"
"sync"
"time"

evmrpcconfig "github.com/sei-protocol/sei-chain/evmrpc/config"
)
Expand All @@ -17,6 +18,9 @@
wg sync.WaitGroup
closed bool
mu sync.RWMutex

// Embedded metrics for backpressure and observability
Metrics *WorkerPoolMetrics
}

var (
Expand Down Expand Up @@ -60,11 +64,17 @@
queueSize = evmrpcconfig.DefaultWorkerQueueSize
}

return &WorkerPool{
wp := &WorkerPool{
workers: workers,
taskQueue: make(chan func(), queueSize),
done: make(chan struct{}),
Metrics: &WorkerPoolMetrics{
windowStart: time.Now(),

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
},
}
wp.Metrics.TotalWorkers.Store(int32(workers)) //nolint:gosec // G115: safe, max is 64
wp.Metrics.QueueCapacity.Store(int32(queueSize)) //nolint:gosec // G115: safe, max is 1000
return wp
}

// Start initializes and starts the worker goroutines
Expand All @@ -85,22 +95,56 @@
}
}()
// The worker will exit gracefully when the taskQueue is closed and drained.
for task := range wp.taskQueue {
for wrappedTask := range wp.taskQueue {
func() {
defer func() {
if r := recover(); r != nil {
// Log the panic but continue processing other tasks
fmt.Printf("Task recovered from panic: %v\n", r)
wp.Metrics.RecordTaskPanicked()
}
}()
task()
wrappedTask()
}()
}
}()
}
})
}

// SubmitWithMetrics submits a task with full metrics tracking
func (wp *WorkerPool) SubmitWithMetrics(task func()) error {
// Check if pool is closed first
wp.mu.RLock()
if wp.closed {
wp.mu.RUnlock()
return fmt.Errorf("worker pool is closing")
}
wp.mu.RUnlock()

queuedAt := time.Now()

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism

// Wrap the task with metrics
wrappedTask := func() {
startedAt := time.Now()

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
wp.Metrics.RecordTaskStarted(queuedAt)
defer wp.Metrics.RecordTaskCompleted(startedAt)
task()
}

select {
case wp.taskQueue <- wrappedTask:
wp.Metrics.RecordTaskSubmitted()
return nil
case <-wp.done:
return fmt.Errorf("worker pool is closing")
default:
// Queue is full - fail fast
wp.Metrics.RecordTaskRejected()
return fmt.Errorf("worker pool queue is full")
}
}

// Submit submits a task to the worker pool with fail-fast behavior
// Returns error if queue is full or pool is closing
func (wp *WorkerPool) Submit(task func()) error {
Expand Down Expand Up @@ -147,3 +191,17 @@
func (wp *WorkerPool) QueueSize() int {
	// The buffered task channel's capacity is the configured queue size.
	return cap(wp.taskQueue)
}

// QueueDepth reports how many submitted tasks are currently waiting in the queue.
func (wp *WorkerPool) QueueDepth() int {
	depth := len(wp.taskQueue)
	return depth
}

// QueueUtilization returns the percentage of queue capacity in use
func (wp *WorkerPool) QueueUtilization() float64 {
cap := cap(wp.taskQueue)
if cap == 0 {
return 0
}
return float64(len(wp.taskQueue)) / float64(cap) * 100

Check notice

Code scanning / CodeQL

Floating point arithmetic Note

Floating point arithmetic operations are not associative and a possible source of non-determinism

Check notice

Code scanning / CodeQL

Floating point arithmetic Note

Floating point arithmetic operations are not associative and a possible source of non-determinism
}
Loading
Loading