Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 47 additions & 5 deletions evmrpc/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
const TxSearchPerPage = 10

const (
// DB Concurrency Read Limit
// MaxDBReadConcurrency is the default DB read concurrency limit.
// In practice, this is aligned with WorkerPoolSize at runtime.
// This constant is only used as a fallback for metrics initialization.
MaxDBReadConcurrency = 16

// Default request limits (used as fallback values)
Expand Down Expand Up @@ -530,7 +532,9 @@
}

func (a *FilterAPI) GetLogs(ctx context.Context, crit filters.FilterCriteria) (res []*ethtypes.Log, err error) {
defer recordMetrics(fmt.Sprintf("%s_getLogs", a.namespace), a.connectionType, time.Now())
startTime := time.Now()

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
defer recordMetrics(fmt.Sprintf("%s_getLogs", a.namespace), a.connectionType, startTime)

// Calculate block range
latest := a.logFetcher.ctxProvider(LatestCtxHeight).BlockHeight()
begin, end := latest, latest
Expand All @@ -546,16 +550,44 @@

blockRange := end - begin + 1

// Record metrics for eth_getLogs
defer func() {
GetGlobalMetrics().RecordGetLogsRequest(blockRange, time.Since(startTime), startTime, err)
}()

// Use config value instead of hardcoded constant
if blockRange > a.filterConfig.maxBlock {
return nil, fmt.Errorf("block range too large (%d), maximum allowed is %d blocks", blockRange, a.filterConfig.maxBlock)
}

// Early rejection for pruned blocks - avoid wasting resources on blocks that don't exist
earliest := a.logFetcher.earliestVersion()
if earliest > 0 && begin < earliest {
return nil, fmt.Errorf("requested block range [%d, %d] includes pruned blocks, earliest available block is %d", begin, end, earliest)
}

// Only apply rate limiting for large queries (> RPSLimitThreshold blocks)
if blockRange > RPSLimitThreshold && !a.globalRPSLimiter.Allow() {
return nil, fmt.Errorf("log query rate limit exceeded for large queries, please try again later")
}

// Backpressure: early rejection based on system load
m := GetGlobalMetrics()

// Check 1: Too many pending tasks (queue backlog)
pending := m.TasksSubmitted.Load() - m.TasksCompleted.Load()
maxPending := int64(float64(m.QueueCapacity) * 0.8) // 80% threshold

Check notice

Code scanning / CodeQL

Floating point arithmetic Note

Floating point arithmetic operations are not associative and a possible source of non-determinism
if pending > maxPending {
return nil, fmt.Errorf("server too busy, rejecting new request (pending: %d, threshold: %d)", pending, maxPending)
}

// Check 2: I/O saturated (semaphore exhausted)
semInUse := m.DBSemaphoreAcquired.Load()
semCapacity := m.DBSemaphoreCapacity

Check notice

Code scanning / CodeQL

Floating point arithmetic Note

Floating point arithmetic operations are not associative and a possible source of non-determinism
if semCapacity > 0 && float64(semInUse)/float64(semCapacity) >= 0.8 {
return nil, fmt.Errorf("server I/O saturated, rejecting new request (semaphore: %d/%d in use)", semInUse, semCapacity)
}

logs, _, err := a.logFetcher.GetLogsByFilters(ctx, crit, 0)
if err != nil {
return nil, err
Expand Down Expand Up @@ -741,7 +773,7 @@
batch := blockBatch
wg.Add(1)

if err := runner.Submit(func() { processBatch(batch) }); err != nil {
if err := runner.SubmitWithMetrics(func() { processBatch(batch) }); err != nil {
wg.Done()
submitError = fmt.Errorf("system overloaded, please reduce request frequency: %w", err)
break
Expand All @@ -757,7 +789,7 @@
// Process remaining blocks
if len(blockBatch) > 0 {
wg.Add(1)
if err := runner.Submit(func() { processBatch(blockBatch) }); err != nil {
if err := runner.SubmitWithMetrics(func() { processBatch(blockBatch) }); err != nil {
wg.Done()
return nil, 0, fmt.Errorf("system overloaded, please reduce request frequency: %w", err)
}
Expand Down Expand Up @@ -977,7 +1009,7 @@
}

wg.Add(1)
if err := runner.Submit(func(start, endHeight int64) func() {
if err := runner.SubmitWithMetrics(func(start, endHeight int64) func() {
return func() {
defer wg.Done()
f.processBatch(ctx, start, endHeight, crit, bloomIndexes, res, errChan)
Expand Down Expand Up @@ -1014,6 +1046,9 @@
defer func() {
metrics.IncrementRpcRequestCounter("num_blocks_fetched", "blocks", true)
}()

wpMetrics := GetGlobalMetrics()

for height := start; height <= end; height++ {
if height == 0 {
continue
Expand All @@ -1026,11 +1061,15 @@
}

// Block cache miss, acquire semaphore for I/O operations
semWaitStart := time.Now()
f.dbReadSemaphore <- struct{}{}
wpMetrics.RecordDBSemaphoreWait(time.Since(semWaitStart))
wpMetrics.RecordDBSemaphoreAcquire()

// Re-check cache after acquiring semaphore, in case another worker cached it.
if cachedEntry, found := f.globalBlockCache.Get(height); found {
<-f.dbReadSemaphore
wpMetrics.RecordDBSemaphoreRelease()
res <- cachedEntry.Block
continue
}
Expand All @@ -1046,6 +1085,7 @@
// skip the bloom pre-filter instead of short-circuiting the block.
if blockBloom != (ethtypes.Bloom{}) && !MatchFilters(blockBloom, bloomIndexes) {
<-f.dbReadSemaphore
wpMetrics.RecordDBSemaphoreRelease()
continue // skip the block if bloom filter does not match
}
}
Expand All @@ -1056,8 +1096,9 @@
select {
case errChan <- fmt.Errorf("failed to fetch block at height %d: %w", height, err):
default:
}

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
<-f.dbReadSemaphore
wpMetrics.RecordDBSemaphoreRelease()
continue
}

Expand All @@ -1068,6 +1109,7 @@
fillMissingFields(entry, block, blockBloom)
}
<-f.dbReadSemaphore
wpMetrics.RecordDBSemaphoreRelease()
res <- block
}
}
Expand Down
48 changes: 42 additions & 6 deletions evmrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import (
"context"
"runtime"

Check notice

Code scanning / CodeQL

Sensitive package import Note

Certain system packages contain functions which may be a possible source of non-determinism
"strings"
"sync"
"time"

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/client"
Expand Down Expand Up @@ -47,8 +49,27 @@
// Initialize global worker pool with configuration
InitGlobalWorkerPool(config.WorkerPoolSize, config.WorkerQueueSize)

// Initialize RPC tracker (stats package not available in release branch)
// stats.InitRPCTracker(ctxProvider(LatestCtxHeight).Context(), logger, config.RPCStatsInterval)
// Initialize global metrics with worker pool and DB semaphore configuration
workerCount := config.WorkerPoolSize
if workerCount <= 0 {
workerCount = min(MaxWorkerPoolSize, runtime.NumCPU()*2)
}
queueSize := config.WorkerQueueSize
if queueSize <= 0 {
queueSize = DefaultWorkerQueueSize
}
// Align DB semaphore with worker count - each worker gets one I/O slot
dbSemaphoreSize := workerCount
InitGlobalMetrics(workerCount, queueSize, dbSemaphoreSize)

// Start metrics printer (every 5 seconds)
// Prometheus metrics are always exported; stdout printing requires EVM_DEBUG_METRICS=true
StartMetricsPrinter(5 * time.Second)
debugEnabled := IsDebugMetricsEnabled()
logger.Info("Started EVM RPC metrics exporter (interval: 5s)", "workers", workerCount, "queue", queueSize, "db_semaphore", dbSemaphoreSize, "debug_stdout", debugEnabled)
if !debugEnabled {
logger.Info("To enable debug metrics output to stdout, set EVM_DEBUG_METRICS=true")
}

httpServer := NewHTTPServer(logger, rpc.HTTPTimeouts{
ReadTimeout: config.ReadTimeout,
Expand Down Expand Up @@ -80,7 +101,8 @@
seiTxAPI := NewSeiTransactionAPI(tmClient, k, ctxProvider, txConfigProvider, earliestVersion, homeDir, ConnectionTypeHTTP, isPanicOrSyntheticTxFunc, globalBlockCache, cacheCreationMutex)
seiDebugAPI := NewSeiDebugAPI(tmClient, k, ctxProvider, txConfigProvider, earliestVersion, simulateConfig, app, antehandler, ConnectionTypeHTTP, config, globalBlockCache, cacheCreationMutex)

dbReadSemaphore := make(chan struct{}, MaxDBReadConcurrency)
// DB semaphore aligned with worker count
dbReadSemaphore := make(chan struct{}, dbSemaphoreSize)
globalLogSlicePool := NewLogSlicePool()
apis := []rpc.API{
{
Expand Down Expand Up @@ -194,8 +216,21 @@
// Initialize global worker pool with configuration
InitGlobalWorkerPool(config.WorkerPoolSize, config.WorkerQueueSize)

// Initialize WebSocket tracker (stats package not available in release branch)
// stats.InitWSTracker(ctxProvider(LatestCtxHeight).Context(), logger, config.RPCStatsInterval)
// Initialize global metrics (idempotent - only first call takes effect)
workerCountWS := config.WorkerPoolSize
if workerCountWS <= 0 {
workerCountWS = min(MaxWorkerPoolSize, runtime.NumCPU()*2)
}
queueSizeWS := config.WorkerQueueSize
if queueSizeWS <= 0 {
queueSizeWS = DefaultWorkerQueueSize
}
// Align DB semaphore with worker count
dbSemaphoreSizeWS := workerCountWS
InitGlobalMetrics(workerCountWS, queueSizeWS, dbSemaphoreSizeWS)

// Start metrics printer (idempotent - only first call starts printer)
StartMetricsPrinter(5 * time.Second)

httpServer := NewHTTPServer(logger, rpc.HTTPTimeouts{
ReadTimeout: config.ReadTimeout,
Expand All @@ -211,7 +246,8 @@
EVMTimeout: config.SimulationEVMTimeout,
MaxConcurrentSimulationCalls: config.MaxConcurrentSimulationCalls,
}
dbReadSemaphore := make(chan struct{}, MaxDBReadConcurrency)
// DB semaphore aligned with worker count
dbReadSemaphore := make(chan struct{}, dbSemaphoreSizeWS)
globalBlockCache := NewBlockCache(3000)
cacheCreationMutex := &sync.Mutex{}
globalLogSlicePool := NewLogSlicePool()
Expand Down
8 changes: 8 additions & 0 deletions evmrpc/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,17 @@ func (a *SubscriptionAPI) Logs(ctx context.Context, filter *filters.FilterCriter

rpcSub := notifier.CreateSubscription()

// Track subscription metrics
wpMetrics := GetGlobalMetrics()
wpMetrics.RecordSubscriptionStart()

if filter.BlockHash != nil {
go func() {
defer recoverAndLog()
defer wpMetrics.RecordSubscriptionEnd()
logs, _, err := a.logFetcher.GetLogsByFilters(ctx, *filter, 0)
if err != nil {
wpMetrics.RecordSubscriptionError()
_ = notifier.Notify(rpcSub.ID, err)
return
}
Expand All @@ -187,10 +193,12 @@ func (a *SubscriptionAPI) Logs(ctx context.Context, filter *filters.FilterCriter

go func() {
defer recoverAndLog()
defer wpMetrics.RecordSubscriptionEnd()
begin := int64(0)
for {
logs, lastToHeight, err := a.logFetcher.GetLogsByFilters(ctx, *filter, begin)
if err != nil {
wpMetrics.RecordSubscriptionError()
_ = notifier.Notify(rpcSub.ID, err)
return
}
Expand Down
55 changes: 53 additions & 2 deletions evmrpc/worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"fmt"
"runtime"
"sync"
"time"
)

const (
Expand Down Expand Up @@ -106,22 +107,58 @@
}
}()
// The worker will exit gracefully when the taskQueue is closed and drained.
for task := range wp.taskQueue {
for wrappedTask := range wp.taskQueue {
func() {
defer func() {
if r := recover(); r != nil {
// Log the panic but continue processing other tasks
fmt.Printf("Task recovered from panic: %v\n", r)
GetGlobalMetrics().RecordTaskPanicked()
}
}()
task()
wrappedTask()
}()
}
}()
}
})
}

// SubmitWithMetrics submits a task with full metrics tracking
func (wp *WorkerPool) SubmitWithMetrics(task func()) error {
metrics := GetGlobalMetrics()

// Check if pool is closed first
wp.mu.RLock()
if wp.closed {
wp.mu.RUnlock()
return fmt.Errorf("worker pool is closing")
}
wp.mu.RUnlock()

queuedAt := time.Now()

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism

// Wrap the task with metrics
wrappedTask := func() {
startedAt := time.Now()

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
metrics.RecordTaskStarted(queuedAt)
defer metrics.RecordTaskCompleted(startedAt)
task()
}

select {
case wp.taskQueue <- wrappedTask:
metrics.RecordTaskSubmitted()
return nil
case <-wp.done:
return fmt.Errorf("worker pool is closing")
default:
// Queue is full - fail fast
metrics.RecordTaskRejected()
return fmt.Errorf("worker pool queue is full")
}
}

// Submit submits a task to the worker pool with fail-fast behavior
// Returns error if queue is full or pool is closing
func (wp *WorkerPool) Submit(task func()) error {
Expand Down Expand Up @@ -166,5 +203,19 @@

// QueueSize returns the capacity of the task queue
func (wp *WorkerPool) QueueSize() int {
return cap(wp.taskQueue)

Check notice

Code scanning / CodeQL

Floating point arithmetic Note

Floating point arithmetic operations are not associative and a possible source of non-determinism

Check notice

Code scanning / CodeQL

Floating point arithmetic Note

Floating point arithmetic operations are not associative and a possible source of non-determinism
}

// QueueDepth reports how many tasks are currently waiting in the queue.
func (wp *WorkerPool) QueueDepth() int {
	return len(wp.taskQueue)
}

// QueueUtilization returns the fraction of queue capacity currently in use,
// expressed as a percentage in [0, 100]. A zero-capacity queue reports 0 to
// avoid division by zero.
func (wp *WorkerPool) QueueUtilization() float64 {
	// Renamed from `cap` to avoid shadowing the predeclared cap builtin.
	capacity := cap(wp.taskQueue)
	if capacity == 0 {
		return 0
	}
	return float64(len(wp.taskQueue)) / float64(capacity) * 100
}
Loading
Loading