From 230ad6ff02430f550f71bbdb9e9ef5cf3ec2f84c Mon Sep 17 00:00:00 2001 From: blindchaser Date: Wed, 10 Dec 2025 16:36:22 -0500 Subject: [PATCH 1/8] feat: add worker pool metrics, backpressure, and early rejection for eth_getLogs - add comprehensive worker pool metrics (Prometheus + optional stdout debug) - add backpressure mechanism to reject requests when system is overloaded - add early rejection for pruned blocks to avoid wasting resources - add DB semaphore tracking for I/O monitoring - align DB semaphore size with worker pool size - add EVM_DEBUG_METRICS env var to enable debug output to stdout --- evmrpc/filter.go | 46 +- evmrpc/server.go | 46 +- evmrpc/subscribe.go | 8 + evmrpc/worker_pool.go | 55 ++- evmrpc/worker_pool_metrics.go | 886 ++++++++++++++++++++++++++++++++++ 5 files changed, 1033 insertions(+), 8 deletions(-) create mode 100644 evmrpc/worker_pool_metrics.go diff --git a/evmrpc/filter.go b/evmrpc/filter.go index 8dcda8a852..b7c90150f4 100644 --- a/evmrpc/filter.go +++ b/evmrpc/filter.go @@ -531,7 +531,8 @@ func (a *FilterAPI) GetFilterLogs( } func (a *FilterAPI) GetLogs(ctx context.Context, crit filters.FilterCriteria) (res []*ethtypes.Log, err error) { - defer recordMetricsWithError(fmt.Sprintf("%s_getLogs", a.namespace), a.connectionType, time.Now(), err) + startTime := time.Now() + defer recordMetricsWithError(fmt.Sprintf("%s_getLogs", a.namespace), a.connectionType, startTime, err) latest, err := a.logFetcher.latestHeight(ctx) if err != nil { @@ -549,16 +550,43 @@ func (a *FilterAPI) GetLogs(ctx context.Context, crit filters.FilterCriteria) (r blockRange := end - begin + 1 + // Record metrics for eth_getLogs + defer func() { + GetGlobalMetrics().RecordGetLogsRequest(blockRange, time.Since(startTime), startTime, err) + }() + // Use config value instead of hardcoded constant if blockRange > a.filterConfig.maxBlock { return nil, fmt.Errorf("block range too large (%d), maximum allowed is %d blocks", blockRange, a.filterConfig.maxBlock) } + // Early rejection for pruned blocks - avoid wasting resources on blocks that don't exist + if earliest > 0 && begin < earliest { + return nil, fmt.Errorf("requested block range [%d, %d] includes pruned blocks, earliest available block is %d", begin, end, earliest) + } + // Only apply rate limiting for large queries (> RPSLimitThreshold blocks) if blockRange > RPSLimitThreshold && !a.globalRPSLimiter.Allow() { return nil, fmt.Errorf("log query rate limit exceeded for large queries, please try again later") } + // Backpressure: early rejection based on system load + m := GetGlobalMetrics() + + // Check 1: Too many pending tasks (queue backlog) + pending := m.TasksSubmitted.Load() - m.TasksCompleted.Load() + maxPending := int64(float64(m.QueueCapacity) * 0.8) // 80% threshold + if pending > maxPending { + return nil, fmt.Errorf("server too busy, rejecting new request (pending: %d, threshold: %d)", pending, maxPending) + } + + // Check 2: I/O saturated (semaphore exhausted) + semInUse := m.DBSemaphoreAcquired.Load() + semCapacity := m.DBSemaphoreCapacity + if semCapacity > 0 && float64(semInUse)/float64(semCapacity) >= 0.8 { + return nil, fmt.Errorf("server I/O saturated, rejecting new request (semaphore: %d/%d in use)", semInUse, semCapacity) + } + logs, _, err := a.logFetcher.GetLogsByFilters(ctx, crit, 0) if err != nil { return nil, err @@ -781,7 +809,7 @@ func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCr batch := blockBatch wg.Add(1) - if err := runner.Submit(func() { processBatch(batch) }); err != nil { 
+ if err := runner.SubmitWithMetrics(func() { processBatch(batch) }); err != nil { wg.Done() submitError = fmt.Errorf("system overloaded, please reduce request frequency: %w", err) break @@ -797,7 +825,7 @@ func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCr // Process remaining blocks if len(blockBatch) > 0 { wg.Add(1) - if err := runner.Submit(func() { processBatch(blockBatch) }); err != nil { + if err := runner.SubmitWithMetrics(func() { processBatch(blockBatch) }); err != nil { wg.Done() return nil, 0, fmt.Errorf("system overloaded, please reduce request frequency: %w", err) } @@ -1011,7 +1039,7 @@ func (f *LogFetcher) fetchBlocksByCrit(ctx context.Context, crit filters.FilterC } wg.Add(1) - if err := runner.Submit(func(start, endHeight int64) func() { + if err := runner.SubmitWithMetrics(func(start, endHeight int64) func() { return func() { defer wg.Done() f.processBatch(ctx, start, endHeight, crit, bloomIndexes, res, errChan) @@ -1048,6 +1076,9 @@ func (f *LogFetcher) processBatch(ctx context.Context, start, end int64, crit fi defer func() { metrics.IncrementRpcRequestCounter("num_blocks_fetched", "blocks", true) }() + + wpMetrics := GetGlobalMetrics() + for height := start; height <= end; height++ { if height == 0 { continue @@ -1065,11 +1096,15 @@ func (f *LogFetcher) processBatch(ctx context.Context, start, end int64, crit fi } // Block cache miss, acquire semaphore for I/O operations + semWaitStart := time.Now() f.dbReadSemaphore <- struct{}{} + wpMetrics.RecordDBSemaphoreWait(time.Since(semWaitStart)) + wpMetrics.RecordDBSemaphoreAcquire() // Re-check cache after acquiring semaphore, in case another worker cached it. if cachedEntry, found := f.globalBlockCache.Get(height); found { <-f.dbReadSemaphore + wpMetrics.RecordDBSemaphoreRelease() if cachedEntry.Block != nil { if err := f.watermarks.EnsureBlockHeightAvailable(ctx, cachedEntry.Block.Block.Height); err != nil { continue @@ -1094,6 +1129,7 @@ func (f *LogFetcher) processBatch(ctx context.Context, start, end int64, crit fi // skip the bloom pre-filter instead of short-circuiting the block. 
if blockBloom != (ethtypes.Bloom{}) && !MatchFilters(blockBloom, bloomIndexes) { <-f.dbReadSemaphore + wpMetrics.RecordDBSemaphoreRelease() continue // skip the block if bloom filter does not match } } @@ -1106,6 +1142,7 @@ func (f *LogFetcher) processBatch(ctx context.Context, start, end int64, crit fi default: } <-f.dbReadSemaphore + wpMetrics.RecordDBSemaphoreRelease() continue } @@ -1116,6 +1153,7 @@ func (f *LogFetcher) processBatch(ctx context.Context, start, end int64, crit fi fillMissingFields(entry, block, blockBloom) } <-f.dbReadSemaphore + wpMetrics.RecordDBSemaphoreRelease() res <- block } } diff --git a/evmrpc/server.go b/evmrpc/server.go index 65ce157144..8b86a882af 100644 --- a/evmrpc/server.go +++ b/evmrpc/server.go @@ -2,8 +2,10 @@ package evmrpc import ( "context" + "runtime" "strings" "sync" + "time" "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/client" @@ -52,6 +54,28 @@ func NewEVMHTTPServer( // Initialize global worker pool with configuration InitGlobalWorkerPool(config.WorkerPoolSize, config.WorkerQueueSize) + // Initialize global metrics with worker pool and DB semaphore configuration + workerCount := config.WorkerPoolSize + if workerCount <= 0 { + workerCount = min(evmrpcconfig.MaxWorkerPoolSize, runtime.NumCPU()*2) + } + queueSize := config.WorkerQueueSize + if queueSize <= 0 { + queueSize = evmrpcconfig.DefaultWorkerQueueSize + } + // Align DB semaphore with worker count - each worker gets one I/O slot + dbSemaphoreSize := workerCount + InitGlobalMetrics(workerCount, queueSize, dbSemaphoreSize) + + // Start metrics printer (every 5 seconds) + // Prometheus metrics are always exported; stdout printing requires EVM_DEBUG_METRICS=true + StartMetricsPrinter(5 * time.Second) + debugEnabled := IsDebugMetricsEnabled() + logger.Info("Started EVM RPC metrics exporter (interval: 5s)", "workers", workerCount, "queue", queueSize, "db_semaphore", dbSemaphoreSize, "debug_stdout", debugEnabled) + if !debugEnabled { + logger.Info("To enable debug metrics output to stdout, set EVM_DEBUG_METRICS=true") + } + // Initialize RPC tracker stats.InitRPCTracker(ctxProvider(LatestCtxHeight).Context(), logger, config.RPCStatsInterval) @@ -86,7 +110,8 @@ func NewEVMHTTPServer( seiTxAPI := NewSeiTransactionAPI(tmClient, k, ctxProvider, txConfigProvider, homeDir, ConnectionTypeHTTP, isPanicOrSyntheticTxFunc, watermarks, globalBlockCache, cacheCreationMutex) seiDebugAPI := NewSeiDebugAPI(tmClient, k, beginBlockKeepers, ctxProvider, txConfigProvider, simulateConfig, app, antehandler, ConnectionTypeHTTP, config, globalBlockCache, cacheCreationMutex, watermarks) - dbReadSemaphore := make(chan struct{}, MaxDBReadConcurrency) + // DB semaphore aligned with worker count + dbReadSemaphore := make(chan struct{}, dbSemaphoreSize) globalLogSlicePool := NewLogSlicePool() apis := []rpc.API{ { @@ -228,6 +253,22 @@ func NewEVMWebSocketServer( // Initialize global worker pool with configuration InitGlobalWorkerPool(config.WorkerPoolSize, config.WorkerQueueSize) + // Initialize global metrics (idempotent - only first call takes effect) + workerCountWS := config.WorkerPoolSize + if workerCountWS <= 0 { + workerCountWS = min(evmrpcconfig.MaxWorkerPoolSize, runtime.NumCPU()*2) + } + queueSizeWS := config.WorkerQueueSize + if queueSizeWS <= 0 { + queueSizeWS = evmrpcconfig.DefaultWorkerQueueSize + } + // Align DB semaphore with worker count + dbSemaphoreSizeWS := workerCountWS + InitGlobalMetrics(workerCountWS, queueSizeWS, dbSemaphoreSizeWS) + + // Start metrics printer (idempotent - only 
first call starts printer) + StartMetricsPrinter(5 * time.Second) + // Initialize WebSocket tracker. stats.InitWSTracker(ctxProvider(LatestCtxHeight).Context(), logger, config.RPCStatsInterval) @@ -246,7 +287,8 @@ func NewEVMWebSocketServer( MaxConcurrentSimulationCalls: config.MaxConcurrentSimulationCalls, } watermarks := NewWatermarkManager(tmClient, ctxProvider, stateStore, k.ReceiptStore()) - dbReadSemaphore := make(chan struct{}, MaxDBReadConcurrency) + // DB semaphore aligned with worker count + dbReadSemaphore := make(chan struct{}, dbSemaphoreSizeWS) globalBlockCache := NewBlockCache(3000) cacheCreationMutex := &sync.Mutex{} globalLogSlicePool := NewLogSlicePool() diff --git a/evmrpc/subscribe.go b/evmrpc/subscribe.go index f938f1a872..e6cd0a5417 100644 --- a/evmrpc/subscribe.go +++ b/evmrpc/subscribe.go @@ -168,11 +168,17 @@ func (a *SubscriptionAPI) Logs(ctx context.Context, filter *filters.FilterCriter rpcSub := notifier.CreateSubscription() + // Track subscription metrics + wpMetrics := GetGlobalMetrics() + wpMetrics.RecordSubscriptionStart() + if filter.BlockHash != nil { go func() { defer recoverAndLog() + defer wpMetrics.RecordSubscriptionEnd() logs, _, err := a.logFetcher.GetLogsByFilters(ctx, *filter, 0) if err != nil { + wpMetrics.RecordSubscriptionError() _ = notifier.Notify(rpcSub.ID, err) return } @@ -187,10 +193,12 @@ func (a *SubscriptionAPI) Logs(ctx context.Context, filter *filters.FilterCriter go func() { defer recoverAndLog() + defer wpMetrics.RecordSubscriptionEnd() begin := int64(0) for { logs, lastToHeight, err := a.logFetcher.GetLogsByFilters(ctx, *filter, begin) if err != nil { + wpMetrics.RecordSubscriptionError() _ = notifier.Notify(rpcSub.ID, err) return } diff --git a/evmrpc/worker_pool.go b/evmrpc/worker_pool.go index b1b450fb91..c935e17604 100644 --- a/evmrpc/worker_pool.go +++ b/evmrpc/worker_pool.go @@ -4,6 +4,7 @@ import ( "fmt" "runtime" "sync" + "time" evmrpcconfig "github.com/sei-protocol/sei-chain/evmrpc/config" ) @@ -85,15 +86,16 @@ func (wp *WorkerPool) start() { } }() // The worker will exit gracefully when the taskQueue is closed and drained. 
- for task := range wp.taskQueue { + for wrappedTask := range wp.taskQueue { func() { defer func() { if r := recover(); r != nil { // Log the panic but continue processing other tasks fmt.Printf("Task recovered from panic: %v\n", r) + GetGlobalMetrics().RecordTaskPanicked() } }() - task() + wrappedTask() }() } }() @@ -101,6 +103,41 @@ func (wp *WorkerPool) start() { }) } +// SubmitWithMetrics submits a task with full metrics tracking +func (wp *WorkerPool) SubmitWithMetrics(task func()) error { + metrics := GetGlobalMetrics() + + // Check if pool is closed first + wp.mu.RLock() + if wp.closed { + wp.mu.RUnlock() + return fmt.Errorf("worker pool is closing") + } + wp.mu.RUnlock() + + queuedAt := time.Now() + + // Wrap the task with metrics + wrappedTask := func() { + startedAt := time.Now() + metrics.RecordTaskStarted(queuedAt) + defer metrics.RecordTaskCompleted(startedAt) + task() + } + + select { + case wp.taskQueue <- wrappedTask: + metrics.RecordTaskSubmitted() + return nil + case <-wp.done: + return fmt.Errorf("worker pool is closing") + default: + // Queue is full - fail fast + metrics.RecordTaskRejected() + return fmt.Errorf("worker pool queue is full") + } +} + // Submit submits a task to the worker pool with fail-fast behavior // Returns error if queue is full or pool is closing func (wp *WorkerPool) Submit(task func()) error { @@ -147,3 +184,17 @@ func (wp *WorkerPool) WorkerCount() int { func (wp *WorkerPool) QueueSize() int { return cap(wp.taskQueue) } + +// QueueDepth returns the current number of tasks in the queue +func (wp *WorkerPool) QueueDepth() int { + return len(wp.taskQueue) +} + +// QueueUtilization returns the percentage of queue capacity in use +func (wp *WorkerPool) QueueUtilization() float64 { + cap := cap(wp.taskQueue) + if cap == 0 { + return 0 + } + return float64(len(wp.taskQueue)) / float64(cap) * 100 +} diff --git a/evmrpc/worker_pool_metrics.go b/evmrpc/worker_pool_metrics.go new file mode 100644 index 0000000000..6a7ada986e --- /dev/null +++ b/evmrpc/worker_pool_metrics.go @@ -0,0 +1,886 @@ +package evmrpc + +import ( + "fmt" + "os" + "strings" + "sync" + "sync/atomic" + "time" + + gometrics "github.com/armon/go-metrics" + "github.com/cosmos/cosmos-sdk/telemetry" + evmrpcconfig "github.com/sei-protocol/sei-chain/evmrpc/config" +) + +// Environment variable to enable debug metrics printing to stdout +// Set EVM_DEBUG_METRICS=true to enable periodic metrics printing +const EVMDebugMetricsEnvVar = "EVM_DEBUG_METRICS" + +// IsDebugMetricsEnabled checks if debug metrics printing is enabled via environment variable +func IsDebugMetricsEnabled() bool { + val := os.Getenv(EVMDebugMetricsEnvVar) + return strings.ToLower(val) == "true" || val == "1" +} + +// Error type constants for categorization +const ( + ErrTypeRangeTooLarge = "range_too_large" + ErrTypeTimeout = "timeout" + ErrTypeRateLimited = "rate_limited" + ErrTypeBackpressure = "backpressure" + ErrTypeIOSaturated = "io_saturated" + ErrTypeBlockNotFound = "block_not_found" + ErrTypeQueueFull = "queue_full" + ErrTypeOther = "other" +) + +// WorkerPoolMetrics tracks worker pool performance metrics +type WorkerPoolMetrics struct { + // Worker pool stats + TotalWorkers int32 + ActiveWorkers atomic.Int32 + QueueCapacity int32 + QueueDepth atomic.Int32 + PeakQueueDepth atomic.Int32 + TasksSubmitted atomic.Int64 + TasksCompleted atomic.Int64 + TasksRejected atomic.Int64 // Queue full rejections + TasksPanicked atomic.Int64 + TotalWaitTimeNs atomic.Int64 // Total time tasks spent waiting in queue + TotalExecTimeNs 
atomic.Int64 // Total task execution time + + // DB Semaphore stats + DBSemaphoreCapacity int32 + DBSemaphoreAcquired atomic.Int32 + DBSemaphoreWaitTimeNs atomic.Int64 + DBSemaphoreWaitCount atomic.Int64 + + // eth_getLogs specific stats + GetLogsRequests atomic.Int64 + GetLogsErrors atomic.Int64 + GetLogsSuccess atomic.Int64 // Successful requests + GetLogsBlockRangeSum atomic.Int64 // Sum of block ranges for average calculation + GetLogsLatencySumNs atomic.Int64 // Sum of latencies for average calculation + GetLogsPeakRange atomic.Int64 + GetLogsMaxLatencyNs atomic.Int64 // Max latency observed + + // Error type breakdown + ErrRangeTooLarge atomic.Int64 + ErrTimeout atomic.Int64 + ErrRateLimited atomic.Int64 + ErrBackpressure atomic.Int64 + ErrIOSaturated atomic.Int64 + ErrBlockNotFound atomic.Int64 + ErrQueueFull atomic.Int64 + ErrOther atomic.Int64 + + // Block range distribution buckets (total requests) + RangeBucket1to10 atomic.Int64 // 1-10 blocks + RangeBucket11to100 atomic.Int64 // 11-100 blocks + RangeBucket101to500 atomic.Int64 // 101-500 blocks + RangeBucket501to1000 atomic.Int64 // 501-1000 blocks + RangeBucket1001to2000 atomic.Int64 // 1001-2000 blocks + RangeBucketOver2000 atomic.Int64 // >2000 blocks + + // Block range success counts (for calculating success rate per bucket) + RangeBucket1to10Success atomic.Int64 + RangeBucket11to100Success atomic.Int64 + RangeBucket101to500Success atomic.Int64 + RangeBucket501to1000Success atomic.Int64 + RangeBucket1001to2000Success atomic.Int64 + RangeBucketOver2000Success atomic.Int64 + + // Subscription stats + ActiveSubscriptions atomic.Int32 + SubscriptionErrors atomic.Int64 + + // Time window for TPS calculation + windowStart time.Time + windowRequests atomic.Int64 + mu sync.RWMutex +} + +var ( + globalMetrics *WorkerPoolMetrics + globalMetricsOnce sync.Once + metricsPrinterOnce sync.Once + metricsStopChan chan struct{} +) + +// InitGlobalMetrics initializes the global metrics instance +func InitGlobalMetrics(workerCount, queueCapacity, dbSemaphoreCapacity int) *WorkerPoolMetrics { + globalMetricsOnce.Do(func() { + globalMetrics = &WorkerPoolMetrics{ + TotalWorkers: int32(workerCount), + QueueCapacity: int32(queueCapacity), + DBSemaphoreCapacity: int32(dbSemaphoreCapacity), + windowStart: time.Now(), + } + }) + return globalMetrics +} + +// GetGlobalMetrics returns the global metrics instance +func GetGlobalMetrics() *WorkerPoolMetrics { + if globalMetrics == nil { + // Initialize with defaults if not already done + // DB semaphore is aligned with worker count + InitGlobalMetrics(evmrpcconfig.MaxWorkerPoolSize, evmrpcconfig.DefaultWorkerQueueSize, evmrpcconfig.MaxWorkerPoolSize) + } + return globalMetrics +} + +// StartMetricsPrinter starts a background goroutine that prints metrics every interval +// This is idempotent - only the first call will start the printer +// Note: Printing to stdout is controlled by the EVM_DEBUG_METRICS environment variable +// Set EVM_DEBUG_METRICS=true to enable debug output +func StartMetricsPrinter(interval time.Duration) { + metricsPrinterOnce.Do(func() { + metricsStopChan = make(chan struct{}) + debugEnabled := IsDebugMetricsEnabled() + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + m := GetGlobalMetrics() + // Export to Prometheus (gauges need periodic update) + m.ExportPrometheusMetrics() + // Print to stdout only if debug is enabled + if debugEnabled { + m.PrintMetrics() + } + case <-metricsStopChan: + return + } + } + }() + }) +} + 
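+// Usage sketch (illustrative, not part of the committed API surface): a server
+// is expected to start the printer once at boot and stop it once at shutdown.
+// StartMetricsPrinter is guarded by a sync.Once, so repeated calls are no-ops,
+// while StopMetricsPrinter closes the stop channel and must be called only once:
+//
+//	StartMetricsPrinter(5 * time.Second)
+//	defer StopMetricsPrinter()
+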
+// StopMetricsPrinter stops the metrics printer +func StopMetricsPrinter() { + if metricsStopChan != nil { + close(metricsStopChan) + } +} + +// RecordTaskSubmitted records a task submission +// Note: Prometheus export is done in batch via ExportPrometheusMetrics() +func (m *WorkerPoolMetrics) RecordTaskSubmitted() { + m.TasksSubmitted.Add(1) + depth := m.QueueDepth.Add(1) + // Update peak if needed + for { + peak := m.PeakQueueDepth.Load() + if depth <= peak || m.PeakQueueDepth.CompareAndSwap(peak, depth) { + break + } + } +} + +// RecordTaskStarted records when a task starts executing +// Note: Prometheus export is done in batch via ExportPrometheusMetrics() +func (m *WorkerPoolMetrics) RecordTaskStarted(queuedAt time.Time) { + m.ActiveWorkers.Add(1) + m.QueueDepth.Add(-1) + waitTime := time.Since(queuedAt) + m.TotalWaitTimeNs.Add(waitTime.Nanoseconds()) +} + +// RecordTaskCompleted records a task completion +// Note: Prometheus export is done in batch via ExportPrometheusMetrics() +func (m *WorkerPoolMetrics) RecordTaskCompleted(startedAt time.Time) { + m.ActiveWorkers.Add(-1) + m.TasksCompleted.Add(1) + execTime := time.Since(startedAt) + m.TotalExecTimeNs.Add(execTime.Nanoseconds()) +} + +// RecordTaskRejected records a task rejection (queue full) +// Note: Prometheus export is done in batch via ExportPrometheusMetrics() +func (m *WorkerPoolMetrics) RecordTaskRejected() { + m.TasksRejected.Add(1) +} + +// RecordTaskPanicked records a task panic +// Note: Prometheus export is done in batch via ExportPrometheusMetrics() +func (m *WorkerPoolMetrics) RecordTaskPanicked() { + m.TasksPanicked.Add(1) +} + +// RecordDBSemaphoreAcquire records acquiring the DB semaphore +func (m *WorkerPoolMetrics) RecordDBSemaphoreAcquire() { + m.DBSemaphoreAcquired.Add(1) +} + +// RecordDBSemaphoreRelease records releasing the DB semaphore +func (m *WorkerPoolMetrics) RecordDBSemaphoreRelease() { + m.DBSemaphoreAcquired.Add(-1) +} + +// RecordDBSemaphoreWait records time spent waiting for DB semaphore +// Note: Prometheus export is done in batch via ExportPrometheusMetrics() +func (m *WorkerPoolMetrics) RecordDBSemaphoreWait(waitTime time.Duration) { + m.DBSemaphoreWaitTimeNs.Add(waitTime.Nanoseconds()) + m.DBSemaphoreWaitCount.Add(1) +} + +// RecordGetLogsRequest records an eth_getLogs request with detailed error categorization +func (m *WorkerPoolMetrics) RecordGetLogsRequest(blockRange int64, latency time.Duration, startTime time.Time, err error) { + m.GetLogsRequests.Add(1) + m.windowRequests.Add(1) + m.GetLogsBlockRangeSum.Add(blockRange) + m.GetLogsLatencySumNs.Add(latency.Nanoseconds()) + + // Update max latency + latencyNs := latency.Nanoseconds() + for { + maxLat := m.GetLogsMaxLatencyNs.Load() + if latencyNs <= maxLat || m.GetLogsMaxLatencyNs.CompareAndSwap(maxLat, latencyNs) { + break + } + } + + // Update peak range + for { + peak := m.GetLogsPeakRange.Load() + if blockRange <= peak || m.GetLogsPeakRange.CompareAndSwap(peak, blockRange) { + break + } + } + + // Record block range distribution + m.recordBlockRangeBucket(blockRange) + + // Categorize errors and record success per bucket + if err != nil { + m.GetLogsErrors.Add(1) + m.categorizeError(err) + } else { + m.GetLogsSuccess.Add(1) + m.recordBlockRangeBucketSuccess(blockRange) + } + // Note: Prometheus export is done in batch via ExportPrometheusMetrics() +} + +// recordBlockRangeBucket records the block range into the appropriate bucket +func (m *WorkerPoolMetrics) recordBlockRangeBucket(blockRange int64) { + switch { + case blockRange <= 
10: + m.RangeBucket1to10.Add(1) + case blockRange <= 100: + m.RangeBucket11to100.Add(1) + case blockRange <= 500: + m.RangeBucket101to500.Add(1) + case blockRange <= 1000: + m.RangeBucket501to1000.Add(1) + case blockRange <= 2000: + m.RangeBucket1001to2000.Add(1) + default: + m.RangeBucketOver2000.Add(1) + } +} + +// recordBlockRangeBucketSuccess records a successful request in the appropriate bucket +func (m *WorkerPoolMetrics) recordBlockRangeBucketSuccess(blockRange int64) { + switch { + case blockRange <= 10: + m.RangeBucket1to10Success.Add(1) + case blockRange <= 100: + m.RangeBucket11to100Success.Add(1) + case blockRange <= 500: + m.RangeBucket101to500Success.Add(1) + case blockRange <= 1000: + m.RangeBucket501to1000Success.Add(1) + case blockRange <= 2000: + m.RangeBucket1001to2000Success.Add(1) + default: + m.RangeBucketOver2000Success.Add(1) + } +} + +// categorizeError categorizes an error into specific types +func (m *WorkerPoolMetrics) categorizeError(err error) { + if err == nil { + return + } + errStr := err.Error() + + switch { + case contains(errStr, "block range too large"): + m.ErrRangeTooLarge.Add(1) + case contains(errStr, "timeout") || contains(errStr, "deadline exceeded") || contains(errStr, "request timed out"): + m.ErrTimeout.Add(1) + case contains(errStr, "rate limit"): + m.ErrRateLimited.Add(1) + case contains(errStr, "server too busy") || contains(errStr, "pending"): + m.ErrBackpressure.Add(1) + case contains(errStr, "I/O saturated") || contains(errStr, "semaphore"): + m.ErrIOSaturated.Add(1) + case contains(errStr, "block not found") || contains(errStr, "height is not available") || contains(errStr, "pruned blocks"): + m.ErrBlockNotFound.Add(1) + case contains(errStr, "queue is full") || contains(errStr, "system overloaded"): + m.ErrQueueFull.Add(1) + default: + m.ErrOther.Add(1) + } +} + +// contains is a helper function for case-insensitive substring matching +func contains(s, substr string) bool { + return len(s) >= len(substr) && (s == substr || + len(substr) == 0 || + (len(s) > 0 && len(substr) > 0 && containsLower(toLower(s), toLower(substr)))) +} + +func toLower(s string) string { + b := make([]byte, len(s)) + for i := 0; i < len(s); i++ { + c := s[i] + if c >= 'A' && c <= 'Z' { + c += 'a' - 'A' + } + b[i] = c + } + return string(b) +} + +func containsLower(s, substr string) bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false +} + +// RecordSubscriptionStart records a new subscription +func (m *WorkerPoolMetrics) RecordSubscriptionStart() { + m.ActiveSubscriptions.Add(1) +} + +// RecordSubscriptionEnd records subscription end +func (m *WorkerPoolMetrics) RecordSubscriptionEnd() { + m.ActiveSubscriptions.Add(-1) +} + +// RecordSubscriptionError records a subscription error +func (m *WorkerPoolMetrics) RecordSubscriptionError() { + m.SubscriptionErrors.Add(1) + // Export to Prometheus + IncrPrometheusSubscriptionError() +} + +// GetTPS calculates the current TPS based on time window +func (m *WorkerPoolMetrics) GetTPS() float64 { + m.mu.RLock() + windowStart := m.windowStart + requests := m.windowRequests.Load() + m.mu.RUnlock() + + elapsed := time.Since(windowStart).Seconds() + if elapsed <= 0 { + return 0 + } + return float64(requests) / elapsed +} + +// ResetTPSWindow resets the TPS calculation window +func (m *WorkerPoolMetrics) ResetTPSWindow() { + m.mu.Lock() + m.windowStart = time.Now() + m.windowRequests.Store(0) + m.mu.Unlock() +} + +// GetAverageQueueWaitTime returns average 
time tasks spend waiting in queue
+func (m *WorkerPoolMetrics) GetAverageQueueWaitTime() time.Duration {
+	completed := m.TasksCompleted.Load()
+	if completed == 0 {
+		return 0
+	}
+	return time.Duration(m.TotalWaitTimeNs.Load() / completed)
+}
+
+// GetAverageExecTime returns average task execution time
+func (m *WorkerPoolMetrics) GetAverageExecTime() time.Duration {
+	completed := m.TasksCompleted.Load()
+	if completed == 0 {
+		return 0
+	}
+	return time.Duration(m.TotalExecTimeNs.Load() / completed)
+}
+
+// GetAverageDBWaitTime returns average DB semaphore wait time
+func (m *WorkerPoolMetrics) GetAverageDBWaitTime() time.Duration {
+	count := m.DBSemaphoreWaitCount.Load()
+	if count == 0 {
+		return 0
+	}
+	return time.Duration(m.DBSemaphoreWaitTimeNs.Load() / count)
+}
+
+// GetAverageBlockRange returns average block range for eth_getLogs
+func (m *WorkerPoolMetrics) GetAverageBlockRange() float64 {
+	requests := m.GetLogsRequests.Load()
+	if requests == 0 {
+		return 0
+	}
+	return float64(m.GetLogsBlockRangeSum.Load()) / float64(requests)
+}
+
+// GetAverageLatency returns average eth_getLogs latency
+func (m *WorkerPoolMetrics) GetAverageLatency() time.Duration {
+	requests := m.GetLogsRequests.Load()
+	if requests == 0 {
+		return 0
+	}
+	return time.Duration(m.GetLogsLatencySumNs.Load() / requests)
+}
+
+// GetSnapshot returns a snapshot of current metrics
+func (m *WorkerPoolMetrics) GetSnapshot() MetricsSnapshot {
+	return MetricsSnapshot{
+		Timestamp: time.Now(),
+
+		// Worker pool
+		TotalWorkers:     m.TotalWorkers,
+		ActiveWorkers:    m.ActiveWorkers.Load(),
+		IdleWorkers:      m.TotalWorkers - m.ActiveWorkers.Load(),
+		QueueCapacity:    m.QueueCapacity,
+		QueueDepth:       m.QueueDepth.Load(),
+		QueueUtilization: float64(m.QueueDepth.Load()) / float64(m.QueueCapacity) * 100,
+		PeakQueueDepth:   m.PeakQueueDepth.Load(),
+		TasksSubmitted:   m.TasksSubmitted.Load(),
+		TasksCompleted:   m.TasksCompleted.Load(),
+		TasksRejected:    m.TasksRejected.Load(),
+		TasksPending:     m.TasksSubmitted.Load() - m.TasksCompleted.Load(),
+		AvgQueueWaitTime: m.GetAverageQueueWaitTime(),
+		AvgExecTime:      m.GetAverageExecTime(),
+
+		// DB Semaphore
+		DBSemaphoreCapacity: m.DBSemaphoreCapacity,
+		DBSemaphoreInUse:    m.DBSemaphoreAcquired.Load(),
+		DBSemaphoreAvail:    m.DBSemaphoreCapacity - m.DBSemaphoreAcquired.Load(),
+		AvgDBWaitTime:       m.GetAverageDBWaitTime(),
+
+		// eth_getLogs
+		GetLogsTPS:       m.GetTPS(),
+		GetLogsTotal:     m.GetLogsRequests.Load(),
+		GetLogsSuccess:   m.GetLogsSuccess.Load(),
+		GetLogsErrors:    m.GetLogsErrors.Load(),
+		GetLogsErrorRate: float64(m.GetLogsErrors.Load()) / float64(max(m.GetLogsRequests.Load(), 1)) * 100,
+		AvgBlockRange:    m.GetAverageBlockRange(),
+		PeakBlockRange:   m.GetLogsPeakRange.Load(),
+		AvgLatency:       m.GetAverageLatency(),
+		MaxLatency:       time.Duration(m.GetLogsMaxLatencyNs.Load()),
+
+		// Error type breakdown
+		ErrRangeTooLarge: m.ErrRangeTooLarge.Load(),
+		ErrTimeout:       m.ErrTimeout.Load(),
+		ErrRateLimited:   m.ErrRateLimited.Load(),
+		ErrBackpressure:  m.ErrBackpressure.Load(),
+		ErrIOSaturated:   m.ErrIOSaturated.Load(),
+		ErrBlockNotFound: m.ErrBlockNotFound.Load(),
+		ErrQueueFull:     m.ErrQueueFull.Load(),
+		ErrOther:         m.ErrOther.Load(),
+
+		// Block range distribution
+		RangeBucket1to10:          m.RangeBucket1to10.Load(),
+		RangeBucket1to10Success:   m.RangeBucket1to10Success.Load(),
+		RangeBucket11to100:        m.RangeBucket11to100.Load(),
+		RangeBucket11to100Success: m.RangeBucket11to100Success.Load(),
+		RangeBucket101to500:       m.RangeBucket101to500.Load(),
+		RangeBucket101to500Success:
m.RangeBucket101to500Success.Load(), + RangeBucket501to1000: m.RangeBucket501to1000.Load(), + RangeBucket501to1000Success: m.RangeBucket501to1000Success.Load(), + RangeBucket1001to2000: m.RangeBucket1001to2000.Load(), + RangeBucket1001to2000Success: m.RangeBucket1001to2000Success.Load(), + RangeBucketOver2000: m.RangeBucketOver2000.Load(), + RangeBucketOver2000Success: m.RangeBucketOver2000Success.Load(), + + // Subscriptions + ActiveSubscriptions: m.ActiveSubscriptions.Load(), + SubscriptionErrors: m.SubscriptionErrors.Load(), + } +} + +// MetricsSnapshot represents a point-in-time snapshot of metrics +type MetricsSnapshot struct { + Timestamp time.Time + + // Worker pool + TotalWorkers int32 + ActiveWorkers int32 + IdleWorkers int32 + QueueCapacity int32 + QueueDepth int32 + QueueUtilization float64 + PeakQueueDepth int32 + TasksSubmitted int64 + TasksCompleted int64 + TasksRejected int64 + TasksPending int64 + AvgQueueWaitTime time.Duration + AvgExecTime time.Duration + + // DB Semaphore + DBSemaphoreCapacity int32 + DBSemaphoreInUse int32 + DBSemaphoreAvail int32 + AvgDBWaitTime time.Duration + + // eth_getLogs + GetLogsTPS float64 + GetLogsTotal int64 + GetLogsSuccess int64 + GetLogsErrors int64 + GetLogsErrorRate float64 + AvgBlockRange float64 + PeakBlockRange int64 + AvgLatency time.Duration + MaxLatency time.Duration + + // Error type breakdown + ErrRangeTooLarge int64 + ErrTimeout int64 + ErrRateLimited int64 + ErrBackpressure int64 + ErrIOSaturated int64 + ErrBlockNotFound int64 + ErrQueueFull int64 + ErrOther int64 + + // Block range distribution (total and success for calculating success rate) + RangeBucket1to10 int64 + RangeBucket1to10Success int64 + RangeBucket11to100 int64 + RangeBucket11to100Success int64 + RangeBucket101to500 int64 + RangeBucket101to500Success int64 + RangeBucket501to1000 int64 + RangeBucket501to1000Success int64 + RangeBucket1001to2000 int64 + RangeBucket1001to2000Success int64 + RangeBucketOver2000 int64 + RangeBucketOver2000Success int64 + + // Subscriptions + ActiveSubscriptions int32 + SubscriptionErrors int64 +} + +// PrintMetrics prints current metrics to stdout +func (m *WorkerPoolMetrics) PrintMetrics() { + s := m.GetSnapshot() + + fmt.Println("\n" + "=" + repeatStr("=", 79)) + fmt.Printf(" EVM RPC METRICS SNAPSHOT - %s\n", s.Timestamp.Format("2006-01-02 15:04:05.000")) + fmt.Println("=" + repeatStr("=", 79)) + + // Worker Pool Section + fmt.Println("\n┌─ WORKER POOL " + repeatStr("─", 65)) + fmt.Printf("│ Workers: %d total | %d active | %d idle\n", + s.TotalWorkers, s.ActiveWorkers, s.IdleWorkers) + fmt.Printf("│ Queue: %d/%d (%.1f%% full) | Peak: %d\n", + s.QueueDepth, s.QueueCapacity, s.QueueUtilization, s.PeakQueueDepth) + fmt.Printf("│ Tasks: %d submitted | %d completed | %d rejected | %d pending\n", + s.TasksSubmitted, s.TasksCompleted, s.TasksRejected, s.TasksPending) + fmt.Printf("│ Timing: Avg queue wait: %v | Avg exec: %v\n", + s.AvgQueueWaitTime.Round(time.Microsecond), s.AvgExecTime.Round(time.Microsecond)) + + // DB Semaphore Section + fmt.Println("├─ DB SEMAPHORE " + repeatStr("─", 63)) + fmt.Printf("│ Capacity: %d total | %d in-use | %d available\n", + s.DBSemaphoreCapacity, s.DBSemaphoreInUse, s.DBSemaphoreAvail) + fmt.Printf("│ Wait Time: Avg: %v\n", s.AvgDBWaitTime.Round(time.Microsecond)) + + // eth_getLogs Section + fmt.Println("├─ eth_getLogs " + repeatStr("─", 64)) + fmt.Printf("│ Requests: %d total | %d success | %d errors (%.1f%% error rate)\n", + s.GetLogsTotal, s.GetLogsSuccess, s.GetLogsErrors, s.GetLogsErrorRate) + 
fmt.Printf("│ TPS: %.2f req/s\n", s.GetLogsTPS) + fmt.Printf("│ Block Range: Avg: %.1f | Peak: %d\n", + s.AvgBlockRange, s.PeakBlockRange) + fmt.Printf("│ Latency: Avg: %v | Max: %v\n", + s.AvgLatency.Round(time.Millisecond), s.MaxLatency.Round(time.Millisecond)) + + // Error Breakdown Section + if s.GetLogsErrors > 0 { + fmt.Println("├─ ERROR BREAKDOWN " + repeatStr("─", 60)) + fmt.Printf("│ Range Too Large: %d\n", s.ErrRangeTooLarge) + fmt.Printf("│ Timeout: %d\n", s.ErrTimeout) + fmt.Printf("│ Rate Limited: %d\n", s.ErrRateLimited) + fmt.Printf("│ Backpressure: %d\n", s.ErrBackpressure) + fmt.Printf("│ I/O Saturated: %d\n", s.ErrIOSaturated) + fmt.Printf("│ Block Not Found: %d\n", s.ErrBlockNotFound) + fmt.Printf("│ Queue Full: %d\n", s.ErrQueueFull) + fmt.Printf("│ Other: %d\n", s.ErrOther) + } + + // Block Range Distribution Section with success rate + totalRangeRequests := s.RangeBucket1to10 + s.RangeBucket11to100 + s.RangeBucket101to500 + + s.RangeBucket501to1000 + s.RangeBucket1001to2000 + s.RangeBucketOver2000 + if totalRangeRequests > 0 { + fmt.Println("├─ BLOCK RANGE DISTRIBUTION " + repeatStr("─", 51)) + fmt.Println("│ Range Total Dist%% Success Rate") + fmt.Println("│ " + repeatStr("─", 50)) + printRangeBucket("1-10 blocks", s.RangeBucket1to10, s.RangeBucket1to10Success, totalRangeRequests) + printRangeBucket("11-100 blocks", s.RangeBucket11to100, s.RangeBucket11to100Success, totalRangeRequests) + printRangeBucket("101-500 blocks", s.RangeBucket101to500, s.RangeBucket101to500Success, totalRangeRequests) + printRangeBucket("501-1000 blocks", s.RangeBucket501to1000, s.RangeBucket501to1000Success, totalRangeRequests) + printRangeBucket("1001-2000 blocks", s.RangeBucket1001to2000, s.RangeBucket1001to2000Success, totalRangeRequests) + printRangeBucket(">2000 blocks", s.RangeBucketOver2000, s.RangeBucketOver2000Success, totalRangeRequests) + } + + // Subscriptions Section + fmt.Println("├─ SUBSCRIPTIONS " + repeatStr("─", 62)) + fmt.Printf("│ Active: %d | Errors: %d\n", + s.ActiveSubscriptions, s.SubscriptionErrors) + + fmt.Println("└" + repeatStr("─", 79)) + + // Alert conditions + if s.QueueUtilization > 80 { + fmt.Printf("⚠️ WARNING: Queue utilization at %.1f%% - approaching saturation!\n", s.QueueUtilization) + } + if s.DBSemaphoreAvail == 0 { + fmt.Println("⚠️ WARNING: DB Semaphore exhausted - all slots in use!") + } + if s.ErrTimeout > 0 && float64(s.ErrTimeout)/float64(max(s.GetLogsTotal, 1)) > 0.1 { + fmt.Printf("⚠️ WARNING: High timeout rate (%.1f%%) - consider reducing max_blocks_for_log\n", + float64(s.ErrTimeout)/float64(s.GetLogsTotal)*100) + } + if s.RangeBucketOver2000 > 0 { + fmt.Printf("⚠️ WARNING: %d requests exceeded 2000 block limit\n", s.RangeBucketOver2000) + } +} + +// printRangeBucket prints a single range bucket with success rate +func printRangeBucket(name string, total, success, totalRequests int64) { + if total == 0 { + fmt.Printf("│ %-16s %6d %5.1f%% %6d %5.1f%%\n", + name, total, 0.0, success, 0.0) + } else { + successRate := float64(success) / float64(total) * 100 + distPct := float64(total) / float64(totalRequests) * 100 + fmt.Printf("│ %-16s %6d %5.1f%% %6d %5.1f%%\n", + name, total, distPct, success, successRate) + } +} + +func repeatStr(s string, count int) string { + result := "" + for i := 0; i < count; i++ { + result += s + } + return result +} + +// ResetMetrics resets all metrics (useful for testing) +func (m *WorkerPoolMetrics) ResetMetrics() { + m.ActiveWorkers.Store(0) + m.QueueDepth.Store(0) + m.PeakQueueDepth.Store(0) + 
m.TasksSubmitted.Store(0) + m.TasksCompleted.Store(0) + m.TasksRejected.Store(0) + m.TasksPanicked.Store(0) + m.TotalWaitTimeNs.Store(0) + m.TotalExecTimeNs.Store(0) + m.DBSemaphoreAcquired.Store(0) + m.DBSemaphoreWaitTimeNs.Store(0) + m.DBSemaphoreWaitCount.Store(0) + m.GetLogsRequests.Store(0) + m.GetLogsErrors.Store(0) + m.GetLogsSuccess.Store(0) + m.GetLogsBlockRangeSum.Store(0) + m.GetLogsLatencySumNs.Store(0) + m.GetLogsPeakRange.Store(0) + m.GetLogsMaxLatencyNs.Store(0) + // Reset error breakdown + m.ErrRangeTooLarge.Store(0) + m.ErrTimeout.Store(0) + m.ErrRateLimited.Store(0) + m.ErrBackpressure.Store(0) + m.ErrIOSaturated.Store(0) + m.ErrBlockNotFound.Store(0) + m.ErrQueueFull.Store(0) + m.ErrOther.Store(0) + // Reset block range buckets + m.RangeBucket1to10.Store(0) + m.RangeBucket11to100.Store(0) + m.RangeBucket101to500.Store(0) + m.RangeBucket501to1000.Store(0) + m.RangeBucket1001to2000.Store(0) + m.RangeBucketOver2000.Store(0) + // Reset block range success buckets + m.RangeBucket1to10Success.Store(0) + m.RangeBucket11to100Success.Store(0) + m.RangeBucket101to500Success.Store(0) + m.RangeBucket501to1000Success.Store(0) + m.RangeBucket1001to2000Success.Store(0) + m.RangeBucketOver2000Success.Store(0) + // Reset subscriptions + m.ActiveSubscriptions.Store(0) + m.SubscriptionErrors.Store(0) + m.ResetTPSWindow() +} + +// ======================================== +// Prometheus Metrics Export Functions +// ======================================== + +// ExportPrometheusMetrics exports all metrics to Prometheus +// This should be called periodically (e.g., every 5 seconds) +// All metrics are exported as gauges for efficiency (batch export instead of per-operation) +func (m *WorkerPoolMetrics) ExportPrometheusMetrics() { + // Worker Pool Gauges + gometrics.SetGauge([]string{"sei", "evm", "workerpool", "workers", "total"}, float32(m.TotalWorkers)) + gometrics.SetGauge([]string{"sei", "evm", "workerpool", "workers", "active"}, float32(m.ActiveWorkers.Load())) + gometrics.SetGauge([]string{"sei", "evm", "workerpool", "workers", "idle"}, float32(m.TotalWorkers-m.ActiveWorkers.Load())) + gometrics.SetGauge([]string{"sei", "evm", "workerpool", "queue", "capacity"}, float32(m.QueueCapacity)) + gometrics.SetGauge([]string{"sei", "evm", "workerpool", "queue", "depth"}, float32(m.QueueDepth.Load())) + gometrics.SetGauge([]string{"sei", "evm", "workerpool", "queue", "peak"}, float32(m.PeakQueueDepth.Load())) + + // Queue utilization percentage + utilization := float32(0) + if m.QueueCapacity > 0 { + utilization = float32(m.QueueDepth.Load()) / float32(m.QueueCapacity) * 100 + } + gometrics.SetGauge([]string{"sei", "evm", "workerpool", "queue", "utilization"}, utilization) + + // Task counters (exported as gauges for batch efficiency) + gometrics.SetGauge([]string{"sei", "evm", "workerpool", "tasks", "submitted", "total"}, float32(m.TasksSubmitted.Load())) + gometrics.SetGauge([]string{"sei", "evm", "workerpool", "tasks", "completed", "total"}, float32(m.TasksCompleted.Load())) + gometrics.SetGauge([]string{"sei", "evm", "workerpool", "tasks", "rejected", "total"}, float32(m.TasksRejected.Load())) + gometrics.SetGauge([]string{"sei", "evm", "workerpool", "tasks", "panicked", "total"}, float32(m.TasksPanicked.Load())) + + // DB Semaphore Gauges + gometrics.SetGauge([]string{"sei", "evm", "db", "semaphore", "capacity"}, float32(m.DBSemaphoreCapacity)) + gometrics.SetGauge([]string{"sei", "evm", "db", "semaphore", "inuse"}, float32(m.DBSemaphoreAcquired.Load())) + gometrics.SetGauge([]string{"sei", 
"evm", "db", "semaphore", "available"}, float32(m.DBSemaphoreCapacity-m.DBSemaphoreAcquired.Load())) + gometrics.SetGauge([]string{"sei", "evm", "db", "semaphore", "wait", "count"}, float32(m.DBSemaphoreWaitCount.Load())) + + // Subscriptions Gauge + gometrics.SetGauge([]string{"sei", "evm", "subscriptions", "active"}, float32(m.ActiveSubscriptions.Load())) + + // eth_getLogs specific gauges + gometrics.SetGauge([]string{"sei", "evm", "getlogs", "requests", "total"}, float32(m.GetLogsRequests.Load())) + gometrics.SetGauge([]string{"sei", "evm", "getlogs", "success", "total"}, float32(m.GetLogsSuccess.Load())) + gometrics.SetGauge([]string{"sei", "evm", "getlogs", "errors", "total"}, float32(m.GetLogsErrors.Load())) + gometrics.SetGauge([]string{"sei", "evm", "getlogs", "tps"}, float32(m.GetTPS())) + gometrics.SetGauge([]string{"sei", "evm", "getlogs", "avg", "blockrange"}, float32(m.GetAverageBlockRange())) + gometrics.SetGauge([]string{"sei", "evm", "getlogs", "peak", "blockrange"}, float32(m.GetLogsPeakRange.Load())) + gometrics.SetGauge([]string{"sei", "evm", "getlogs", "avg", "latency", "ms"}, float32(m.GetAverageLatency().Milliseconds())) + gometrics.SetGauge([]string{"sei", "evm", "getlogs", "max", "latency", "ms"}, float32(m.GetLogsMaxLatencyNs.Load()/1e6)) + + // Error breakdown + gometrics.SetGauge([]string{"sei", "evm", "getlogs", "errors", "range_too_large"}, float32(m.ErrRangeTooLarge.Load())) + gometrics.SetGauge([]string{"sei", "evm", "getlogs", "errors", "rate_limited"}, float32(m.ErrRateLimited.Load())) + gometrics.SetGauge([]string{"sei", "evm", "getlogs", "errors", "backpressure"}, float32(m.ErrBackpressure.Load())) + + // Average timings + gometrics.SetGauge([]string{"sei", "evm", "workerpool", "avg", "queue", "wait", "ms"}, float32(m.GetAverageQueueWaitTime().Milliseconds())) + gometrics.SetGauge([]string{"sei", "evm", "workerpool", "avg", "exec", "time", "ms"}, float32(m.GetAverageExecTime().Milliseconds())) + gometrics.SetGauge([]string{"sei", "evm", "db", "semaphore", "avg", "wait", "ms"}, float32(m.GetAverageDBWaitTime().Milliseconds())) +} + +// IncrTaskSubmitted increments task submitted counter in Prometheus +func IncrPrometheusTaskSubmitted() { + telemetry.IncrCounter(1, "sei", "evm", "workerpool", "tasks", "submitted") +} + +// IncrTaskCompleted increments task completed counter in Prometheus +func IncrPrometheusTaskCompleted() { + telemetry.IncrCounter(1, "sei", "evm", "workerpool", "tasks", "completed") +} + +// IncrTaskRejected increments task rejected counter in Prometheus +func IncrPrometheusTaskRejected() { + telemetry.IncrCounterWithLabels( + []string{"sei", "evm", "workerpool", "tasks", "rejected"}, + 1, + []gometrics.Label{telemetry.NewLabel("reason", "queue_full")}, + ) +} + +// IncrTaskPanicked increments task panicked counter in Prometheus +func IncrPrometheusTaskPanicked() { + telemetry.IncrCounter(1, "sei", "evm", "workerpool", "tasks", "panicked") +} + +// IncrGetLogsRequest increments eth_getLogs request counter with labels +func IncrPrometheusGetLogsRequest(success bool, blockRange int64) { + rangeLabel := "small" + if blockRange > 100 { + rangeLabel = "medium" + } + if blockRange > 1000 { + rangeLabel = "large" + } + + successStr := "true" + if !success { + successStr = "false" + } + + telemetry.IncrCounterWithLabels( + []string{"sei", "evm", "getlogs", "requests"}, + 1, + []gometrics.Label{ + telemetry.NewLabel("success", successStr), + telemetry.NewLabel("range", rangeLabel), + }, + ) +} + +// MeasureGetLogsLatency records eth_getLogs 
+func MeasurePrometheusGetLogsLatency(startTime time.Time, blockRange int64) {
+	rangeLabel := "small"
+	if blockRange > 100 {
+		rangeLabel = "medium"
+	}
+	if blockRange > 1000 {
+		rangeLabel = "large"
+	}
+
+	gometrics.MeasureSinceWithLabels(
+		[]string{"sei", "evm", "getlogs", "latency", "ms"},
+		startTime.UTC(),
+		[]gometrics.Label{telemetry.NewLabel("range", rangeLabel)},
+	)
+}
+
+// IncrPrometheusSubscriptionError increments the subscription error counter
+func IncrPrometheusSubscriptionError() {
+	telemetry.IncrCounter(1, "sei", "evm", "subscriptions", "errors")
+}
+
+// RecordPrometheusDBSemaphoreWait records a DB semaphore wait time sample
+func RecordPrometheusDBSemaphoreWait(waitTime time.Duration) {
+	gometrics.AddSample(
+		[]string{"sei", "evm", "db", "semaphore", "wait", "ms"},
+		float32(waitTime.Milliseconds()),
+	)
+}
+
+// RecordPrometheusQueueWait records a queue wait time sample
+func RecordPrometheusQueueWait(waitTime time.Duration) {
+	gometrics.AddSample(
+		[]string{"sei", "evm", "workerpool", "queue", "wait", "ms"},
+		float32(waitTime.Milliseconds()),
+	)
+}
+
+// RecordPrometheusTaskExec records a task execution time sample
+func RecordPrometheusTaskExec(execTime time.Duration) {
+	gometrics.AddSample(
+		[]string{"sei", "evm", "workerpool", "task", "exec", "ms"},
+		float32(execTime.Milliseconds()),
+	)
+}

From 31a9710e9e0885bb411b940995453570dc8d3387 Mon Sep 17 00:00:00 2001
From: blindchaser
Date: Wed, 10 Dec 2025 17:14:45 -0500
Subject: [PATCH 2/8] fix lint

---
 evmrpc/worker_pool_metrics.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/evmrpc/worker_pool_metrics.go b/evmrpc/worker_pool_metrics.go
index 6a7ada986e..b2b3decc2b 100644
--- a/evmrpc/worker_pool_metrics.go
+++ b/evmrpc/worker_pool_metrics.go
@@ -112,9 +112,9 @@ var (
 func InitGlobalMetrics(workerCount, queueCapacity, dbSemaphoreCapacity int) *WorkerPoolMetrics {
 	globalMetricsOnce.Do(func() {
 		globalMetrics = &WorkerPoolMetrics{
-			TotalWorkers:        int32(workerCount),
-			QueueCapacity:       int32(queueCapacity),
-			DBSemaphoreCapacity: int32(dbSemaphoreCapacity),
+			TotalWorkers:        int32(workerCount),         //nolint:gosec // G115: safe, max is 64
+			QueueCapacity:       int32(queueCapacity),       //nolint:gosec // G115: safe, max is 1000
+			DBSemaphoreCapacity: int32(dbSemaphoreCapacity), //nolint:gosec // G115: safe, max is 64
 			windowStart:         time.Now(),
 		}
 	})

From edbf0e3848412bcce78c4703f75aafa36c2ec872 Mon Sep 17 00:00:00 2001
From: blindchaser
Date: Thu, 11 Dec 2025 12:34:03 -0500
Subject: [PATCH 3/8] refactor worker pool metrics

---
 evmrpc/server.go | 44 +++++++++++------------------
 evmrpc/worker_pool.go | 20 ++++++++++------
 evmrpc/worker_pool_metrics.go | 26 +++------------------
 3 files changed, 29 insertions(+), 61 deletions(-)

diff --git a/evmrpc/server.go b/evmrpc/server.go
index 8b86a882af..28c30dcdee 100644
--- a/evmrpc/server.go
+++ b/evmrpc/server.go
@@ -2,7 +2,6 @@ package evmrpc
 
 import (
 	"context"
-	"runtime"
 	"strings"
 	"sync"
 	"time"
@@ -51,27 +50,22 @@ func NewEVMHTTPServer(
 ) (EVMServer, error) {
 	logger = logger.With("module", "evmrpc")
 
-	// Initialize global worker pool with configuration
+	// Initialize global worker pool with configuration (metrics are embedded in pool)
 	InitGlobalWorkerPool(config.WorkerPoolSize, config.WorkerQueueSize)
 
-	// Initialize global metrics with worker pool and DB semaphore configuration
-	workerCount := config.WorkerPoolSize
-	if workerCount <= 0 {
-		workerCount = min(evmrpcconfig.MaxWorkerPoolSize, runtime.NumCPU()*2)
-	}
-	queueSize := config.WorkerQueueSize
-	if queueSize <= 0
{ - queueSize = evmrpcconfig.DefaultWorkerQueueSize - } - // Align DB semaphore with worker count - each worker gets one I/O slot - dbSemaphoreSize := workerCount - InitGlobalMetrics(workerCount, queueSize, dbSemaphoreSize) + // Get pool for logging and DB semaphore setup + pool := GetGlobalWorkerPool() + workerCount := pool.WorkerCount() + queueSize := pool.QueueSize() + + // Set DB semaphore capacity in metrics (aligned with worker count) + pool.Metrics.DBSemaphoreCapacity = int32(workerCount) //nolint:gosec // G115: safe, max is 64 // Start metrics printer (every 5 seconds) // Prometheus metrics are always exported; stdout printing requires EVM_DEBUG_METRICS=true StartMetricsPrinter(5 * time.Second) debugEnabled := IsDebugMetricsEnabled() - logger.Info("Started EVM RPC metrics exporter (interval: 5s)", "workers", workerCount, "queue", queueSize, "db_semaphore", dbSemaphoreSize, "debug_stdout", debugEnabled) + logger.Info("Started EVM RPC metrics exporter (interval: 5s)", "workers", workerCount, "queue", queueSize, "db_semaphore", workerCount, "debug_stdout", debugEnabled) if !debugEnabled { logger.Info("To enable debug metrics output to stdout, set EVM_DEBUG_METRICS=true") } @@ -111,7 +105,7 @@ func NewEVMHTTPServer( seiDebugAPI := NewSeiDebugAPI(tmClient, k, beginBlockKeepers, ctxProvider, txConfigProvider, simulateConfig, app, antehandler, ConnectionTypeHTTP, config, globalBlockCache, cacheCreationMutex, watermarks) // DB semaphore aligned with worker count - dbReadSemaphore := make(chan struct{}, dbSemaphoreSize) + dbReadSemaphore := make(chan struct{}, workerCount) globalLogSlicePool := NewLogSlicePool() apis := []rpc.API{ { @@ -250,22 +244,10 @@ func NewEVMWebSocketServer( ) (EVMServer, error) { logger = logger.With("module", "evmrpc") - // Initialize global worker pool with configuration + // Initialize global worker pool with configuration (metrics are embedded in pool) + // This is idempotent - if HTTP server already initialized it, this is a no-op InitGlobalWorkerPool(config.WorkerPoolSize, config.WorkerQueueSize) - // Initialize global metrics (idempotent - only first call takes effect) - workerCountWS := config.WorkerPoolSize - if workerCountWS <= 0 { - workerCountWS = min(evmrpcconfig.MaxWorkerPoolSize, runtime.NumCPU()*2) - } - queueSizeWS := config.WorkerQueueSize - if queueSizeWS <= 0 { - queueSizeWS = evmrpcconfig.DefaultWorkerQueueSize - } - // Align DB semaphore with worker count - dbSemaphoreSizeWS := workerCountWS - InitGlobalMetrics(workerCountWS, queueSizeWS, dbSemaphoreSizeWS) - // Start metrics printer (idempotent - only first call starts printer) StartMetricsPrinter(5 * time.Second) @@ -288,7 +270,7 @@ func NewEVMWebSocketServer( } watermarks := NewWatermarkManager(tmClient, ctxProvider, stateStore, k.ReceiptStore()) // DB semaphore aligned with worker count - dbReadSemaphore := make(chan struct{}, dbSemaphoreSizeWS) + dbReadSemaphore := make(chan struct{}, GetGlobalWorkerPool().WorkerCount()) globalBlockCache := NewBlockCache(3000) cacheCreationMutex := &sync.Mutex{} globalLogSlicePool := NewLogSlicePool() diff --git a/evmrpc/worker_pool.go b/evmrpc/worker_pool.go index c935e17604..fa59fbce90 100644 --- a/evmrpc/worker_pool.go +++ b/evmrpc/worker_pool.go @@ -18,6 +18,9 @@ type WorkerPool struct { wg sync.WaitGroup closed bool mu sync.RWMutex + + // Embedded metrics for backpressure and observability + Metrics *WorkerPoolMetrics } var ( @@ -65,6 +68,11 @@ func NewWorkerPool(workers, queueSize int) *WorkerPool { workers: workers, taskQueue: make(chan func(), 
queueSize),
 		done:      make(chan struct{}),
+		Metrics: &WorkerPoolMetrics{
+			TotalWorkers:  int32(workers),   //nolint:gosec // G115: safe, max is 64
+			QueueCapacity: int32(queueSize), //nolint:gosec // G115: safe, max is 1000
+			windowStart:   time.Now(),
+		},
 	}
 }
@@ -92,7 +100,7 @@ func (wp *WorkerPool) start() {
 					if r := recover(); r != nil {
 						// Log the panic but continue processing other tasks
 						fmt.Printf("Task recovered from panic: %v\n", r)
-						GetGlobalMetrics().RecordTaskPanicked()
+						wp.Metrics.RecordTaskPanicked()
 					}
 				}()
 				wrappedTask()
@@ -105,8 +113,6 @@ func (wp *WorkerPool) start() {
 
 // SubmitWithMetrics submits a task with full metrics tracking
 func (wp *WorkerPool) SubmitWithMetrics(task func()) error {
-	metrics := GetGlobalMetrics()
-
 	// Check if pool is closed first
 	wp.mu.RLock()
 	if wp.closed {
@@ -120,20 +126,20 @@ func (wp *WorkerPool) SubmitWithMetrics(task func()) error {
 
 	// Wrap the task with metrics
 	wrappedTask := func() {
 		startedAt := time.Now()
-		metrics.RecordTaskStarted(queuedAt)
-		defer metrics.RecordTaskCompleted(startedAt)
+		wp.Metrics.RecordTaskStarted(queuedAt)
+		defer wp.Metrics.RecordTaskCompleted(startedAt)
 		task()
 	}
 
 	select {
 	case wp.taskQueue <- wrappedTask:
-		metrics.RecordTaskSubmitted()
+		wp.Metrics.RecordTaskSubmitted()
 		return nil
 	case <-wp.done:
 		return fmt.Errorf("worker pool is closing")
 	default:
 		// Queue is full - fail fast
-		metrics.RecordTaskRejected()
+		wp.Metrics.RecordTaskRejected()
 		return fmt.Errorf("worker pool queue is full")
 	}
 }
diff --git a/evmrpc/worker_pool_metrics.go b/evmrpc/worker_pool_metrics.go
index b2b3decc2b..c18e2d404e 100644
--- a/evmrpc/worker_pool_metrics.go
+++ b/evmrpc/worker_pool_metrics.go
@@ -10,7 +10,6 @@ import (
 
 	gometrics "github.com/armon/go-metrics"
 	"github.com/cosmos/cosmos-sdk/telemetry"
-	evmrpcconfig "github.com/sei-protocol/sei-chain/evmrpc/config"
 )
 
 // Environment variable to enable debug metrics printing to stdout
@@ -102,33 +101,14 @@ type WorkerPoolMetrics struct {
 }
 
 var (
-	globalMetrics     *WorkerPoolMetrics
-	globalMetricsOnce sync.Once
 	metricsPrinterOnce sync.Once
 	metricsStopChan    chan struct{}
 )
 
-// InitGlobalMetrics initializes the global metrics instance
-func InitGlobalMetrics(workerCount, queueCapacity, dbSemaphoreCapacity int) *WorkerPoolMetrics {
-	globalMetricsOnce.Do(func() {
-		globalMetrics = &WorkerPoolMetrics{
-			TotalWorkers:        int32(workerCount),         //nolint:gosec // G115: safe, max is 64
-			QueueCapacity:       int32(queueCapacity),       //nolint:gosec // G115: safe, max is 1000
-			DBSemaphoreCapacity: int32(dbSemaphoreCapacity), //nolint:gosec // G115: safe, max is 64
-			windowStart:         time.Now(),
-		}
-	})
-	return globalMetrics
-}
-
-// GetGlobalMetrics returns the global metrics instance
+// GetGlobalMetrics returns the metrics from the global worker pool
+// This is a convenience accessor for callers that do not hold a pool reference
 func GetGlobalMetrics() *WorkerPoolMetrics {
-	if globalMetrics == nil {
-		// Initialize with defaults if not already done
-		// DB semaphore is aligned with worker count
-		InitGlobalMetrics(evmrpcconfig.MaxWorkerPoolSize, evmrpcconfig.DefaultWorkerQueueSize, evmrpcconfig.MaxWorkerPoolSize)
-	}
-	return globalMetrics
+	return GetGlobalWorkerPool().Metrics
 }
 
 // StartMetricsPrinter starts a background goroutine that prints metrics every interval

From 95eeb33bed4c6e33319a6869bdd2c7f575b7e9bf Mon Sep 17 00:00:00 2001
From: blindchaser
Date: Thu, 11 Dec 2025 16:25:56 -0500
Subject: [PATCH 4/8] fix concurrent metrics access

---
 evmrpc/filter.go | 4 ++--
 evmrpc/server.go | 3 ++-
 evmrpc/worker_pool.go | 9
++++---- evmrpc/worker_pool_metrics.go | 40 +++++++++++++++++++++-------------- 4 files changed, 33 insertions(+), 23 deletions(-) diff --git a/evmrpc/filter.go b/evmrpc/filter.go index b7c90150f4..9daa35a2a2 100644 --- a/evmrpc/filter.go +++ b/evmrpc/filter.go @@ -575,14 +575,14 @@ func (a *FilterAPI) GetLogs(ctx context.Context, crit filters.FilterCriteria) (r // Check 1: Too many pending tasks (queue backlog) pending := m.TasksSubmitted.Load() - m.TasksCompleted.Load() - maxPending := int64(float64(m.QueueCapacity) * 0.8) // 80% threshold + maxPending := int64(float64(m.QueueCapacity.Load()) * 0.8) // 80% threshold if pending > maxPending { return nil, fmt.Errorf("server too busy, rejecting new request (pending: %d, threshold: %d)", pending, maxPending) } // Check 2: I/O saturated (semaphore exhausted) semInUse := m.DBSemaphoreAcquired.Load() - semCapacity := m.DBSemaphoreCapacity + semCapacity := m.DBSemaphoreCapacity.Load() if semCapacity > 0 && float64(semInUse)/float64(semCapacity) >= 0.8 { return nil, fmt.Errorf("server I/O saturated, rejecting new request (semaphore: %d/%d in use)", semInUse, semCapacity) } diff --git a/evmrpc/server.go b/evmrpc/server.go index 28c30dcdee..4f7fdfa3f3 100644 --- a/evmrpc/server.go +++ b/evmrpc/server.go @@ -59,7 +59,8 @@ func NewEVMHTTPServer( queueSize := pool.QueueSize() // Set DB semaphore capacity in metrics (aligned with worker count) - pool.Metrics.DBSemaphoreCapacity = int32(workerCount) //nolint:gosec // G115: safe, max is 64 + // Only set once to avoid races when multiple test servers start in parallel. + pool.Metrics.DBSemaphoreCapacity.CompareAndSwap(0, int32(workerCount)) //nolint:gosec // G115: safe, max is 64 // Start metrics printer (every 5 seconds) // Prometheus metrics are always exported; stdout printing requires EVM_DEBUG_METRICS=true diff --git a/evmrpc/worker_pool.go b/evmrpc/worker_pool.go index fa59fbce90..1d71537569 100644 --- a/evmrpc/worker_pool.go +++ b/evmrpc/worker_pool.go @@ -64,16 +64,17 @@ func NewWorkerPool(workers, queueSize int) *WorkerPool { queueSize = evmrpcconfig.DefaultWorkerQueueSize } - return &WorkerPool{ + wp := &WorkerPool{ workers: workers, taskQueue: make(chan func(), queueSize), done: make(chan struct{}), Metrics: &WorkerPoolMetrics{ - TotalWorkers: int32(workers), //nolint:gosec // G115: safe, max is 64 - QueueCapacity: int32(queueSize), //nolint:gosec // G115: safe, max is 1000 - windowStart: time.Now(), + windowStart: time.Now(), }, } + wp.Metrics.TotalWorkers.Store(int32(workers)) //nolint:gosec // G115: safe, max is 64 + wp.Metrics.QueueCapacity.Store(int32(queueSize)) //nolint:gosec // G115: safe, max is 1000 + return wp } // Start initializes and starts the worker goroutines diff --git a/evmrpc/worker_pool_metrics.go b/evmrpc/worker_pool_metrics.go index c18e2d404e..d195331d96 100644 --- a/evmrpc/worker_pool_metrics.go +++ b/evmrpc/worker_pool_metrics.go @@ -37,9 +37,9 @@ const ( // WorkerPoolMetrics tracks worker pool performance metrics type WorkerPoolMetrics struct { // Worker pool stats - TotalWorkers int32 + TotalWorkers atomic.Int32 ActiveWorkers atomic.Int32 - QueueCapacity int32 + QueueCapacity atomic.Int32 QueueDepth atomic.Int32 PeakQueueDepth atomic.Int32 TasksSubmitted atomic.Int64 @@ -50,7 +50,7 @@ type WorkerPoolMetrics struct { TotalExecTimeNs atomic.Int64 // Total task execution time // DB Semaphore stats - DBSemaphoreCapacity int32 + DBSemaphoreCapacity atomic.Int32 DBSemaphoreAcquired atomic.Int32 DBSemaphoreWaitTimeNs atomic.Int64 DBSemaphoreWaitCount atomic.Int64 @@ -424,16 
+424,20 @@ func (m *WorkerPoolMetrics) GetAverageLatency() time.Duration { // GetSnapshot returns a snapshot of current metrics func (m *WorkerPoolMetrics) GetSnapshot() MetricsSnapshot { + totalWorkers := m.TotalWorkers.Load() + queueCap := m.QueueCapacity.Load() + dbSemCap := m.DBSemaphoreCapacity.Load() + return MetricsSnapshot{ Timestamp: time.Now(), // Worker pool - TotalWorkers: m.TotalWorkers, + TotalWorkers: totalWorkers, ActiveWorkers: m.ActiveWorkers.Load(), - IdleWorkers: m.TotalWorkers - m.ActiveWorkers.Load(), - QueueCapacity: m.QueueCapacity, + IdleWorkers: totalWorkers - m.ActiveWorkers.Load(), + QueueCapacity: queueCap, QueueDepth: m.QueueDepth.Load(), - QueueUtilization: float64(m.QueueDepth.Load()) / float64(m.QueueCapacity) * 100, + QueueUtilization: float64(m.QueueDepth.Load()) / float64(max(queueCap, 1)) * 100, PeakQueueDepth: m.PeakQueueDepth.Load(), TasksSubmitted: m.TasksSubmitted.Load(), TasksCompleted: m.TasksCompleted.Load(), @@ -443,9 +447,9 @@ func (m *WorkerPoolMetrics) GetSnapshot() MetricsSnapshot { AvgExecTime: m.GetAverageExecTime(), // DB Semaphore - DBSemaphoreCapacity: m.DBSemaphoreCapacity, + DBSemaphoreCapacity: dbSemCap, DBSemaphoreInUse: m.DBSemaphoreAcquired.Load(), - DBSemaphoreAvail: m.DBSemaphoreCapacity - m.DBSemaphoreAcquired.Load(), + DBSemaphoreAvail: dbSemCap - m.DBSemaphoreAcquired.Load(), AvgDBWaitTime: m.GetAverageDBWaitTime(), // eth_getLogs @@ -720,17 +724,21 @@ func (m *WorkerPoolMetrics) ResetMetrics() { // All metrics are exported as gauges for efficiency (batch export instead of per-operation) func (m *WorkerPoolMetrics) ExportPrometheusMetrics() { // Worker Pool Gauges - gometrics.SetGauge([]string{"sei", "evm", "workerpool", "workers", "total"}, float32(m.TotalWorkers)) + totalWorkers := m.TotalWorkers.Load() + queueCap := m.QueueCapacity.Load() + dbSemCap := m.DBSemaphoreCapacity.Load() + + gometrics.SetGauge([]string{"sei", "evm", "workerpool", "workers", "total"}, float32(totalWorkers)) gometrics.SetGauge([]string{"sei", "evm", "workerpool", "workers", "active"}, float32(m.ActiveWorkers.Load())) - gometrics.SetGauge([]string{"sei", "evm", "workerpool", "workers", "idle"}, float32(m.TotalWorkers-m.ActiveWorkers.Load())) - gometrics.SetGauge([]string{"sei", "evm", "workerpool", "queue", "capacity"}, float32(m.QueueCapacity)) + gometrics.SetGauge([]string{"sei", "evm", "workerpool", "workers", "idle"}, float32(totalWorkers-m.ActiveWorkers.Load())) + gometrics.SetGauge([]string{"sei", "evm", "workerpool", "queue", "capacity"}, float32(queueCap)) gometrics.SetGauge([]string{"sei", "evm", "workerpool", "queue", "depth"}, float32(m.QueueDepth.Load())) gometrics.SetGauge([]string{"sei", "evm", "workerpool", "queue", "peak"}, float32(m.PeakQueueDepth.Load())) // Queue utilization percentage utilization := float32(0) - if m.QueueCapacity > 0 { - utilization = float32(m.QueueDepth.Load()) / float32(m.QueueCapacity) * 100 + if queueCap > 0 { + utilization = float32(m.QueueDepth.Load()) / float32(queueCap) * 100 } gometrics.SetGauge([]string{"sei", "evm", "workerpool", "queue", "utilization"}, utilization) @@ -741,9 +749,9 @@ func (m *WorkerPoolMetrics) ExportPrometheusMetrics() { gometrics.SetGauge([]string{"sei", "evm", "workerpool", "tasks", "panicked", "total"}, float32(m.TasksPanicked.Load())) // DB Semaphore Gauges - gometrics.SetGauge([]string{"sei", "evm", "db", "semaphore", "capacity"}, float32(m.DBSemaphoreCapacity)) + gometrics.SetGauge([]string{"sei", "evm", "db", "semaphore", "capacity"}, float32(dbSemCap)) 
gometrics.SetGauge([]string{"sei", "evm", "db", "semaphore", "inuse"}, float32(m.DBSemaphoreAcquired.Load())) - gometrics.SetGauge([]string{"sei", "evm", "db", "semaphore", "available"}, float32(m.DBSemaphoreCapacity-m.DBSemaphoreAcquired.Load())) + gometrics.SetGauge([]string{"sei", "evm", "db", "semaphore", "available"}, float32(dbSemCap-m.DBSemaphoreAcquired.Load())) gometrics.SetGauge([]string{"sei", "evm", "db", "semaphore", "wait", "count"}, float32(m.DBSemaphoreWaitCount.Load())) // Subscriptions Gauge From 96e2a6c5470a6aa664a3c057a52450387d2d3624 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Fri, 12 Dec 2025 10:19:03 -0800 Subject: [PATCH 5/8] Add proper start and close caller for metrics printer --- evmrpc/rpcstack.go | 10 +++++++++- evmrpc/server.go | 12 ++---------- evmrpc/worker_pool_metrics.go | 17 ++++++++++++----- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/evmrpc/rpcstack.go b/evmrpc/rpcstack.go index 6ca0bc73f7..e438faba6c 100644 --- a/evmrpc/rpcstack.go +++ b/evmrpc/rpcstack.go @@ -93,7 +93,8 @@ type HTTPServer struct { } const ( - shutdownTimeout = 5 * time.Second + shutdownTimeout = 5 * time.Second + metricsPrinterInterval = 5 * time.Second ) func NewHTTPServer(log log.Logger, timeouts rpc.HTTPTimeouts) *HTTPServer { @@ -184,6 +185,10 @@ func (h *HTTPServer) Start() error { "vhosts", strings.Join(h.HTTPConfig.Vhosts, ","), ) + // Start metrics printer + // Prometheus metrics are always exported; stdout printing requires EVM_DEBUG_METRICS=true + StartMetricsPrinter(metricsPrinterInterval) + // Log all handlers mounted on server. paths := make([]string, len(h.handlerNames)) for path := range h.handlerNames { @@ -256,6 +261,9 @@ func (h *HTTPServer) doStop() { return // not running } + // Stop metrics printer + StopMetricsPrinter() + // Shut down the server. httpHandler := h.httpHandler.Load().(*rpcHandler) wsHandler := h.wsHandler.Load().(*rpcHandler) diff --git a/evmrpc/server.go b/evmrpc/server.go index 4f7fdfa3f3..a92b874c7b 100644 --- a/evmrpc/server.go +++ b/evmrpc/server.go @@ -2,10 +2,6 @@ package evmrpc import ( "context" - "strings" - "sync" - "time" - "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/client" sdk "github.com/cosmos/cosmos-sdk/types" @@ -19,6 +15,8 @@ import ( "github.com/sei-protocol/sei-chain/x/evm/keeper" "github.com/tendermint/tendermint/libs/log" rpcclient "github.com/tendermint/tendermint/rpc/client" + "strings" + "sync" ) type ConnectionType string @@ -62,9 +60,6 @@ func NewEVMHTTPServer( // Only set once to avoid races when multiple test servers start in parallel. pool.Metrics.DBSemaphoreCapacity.CompareAndSwap(0, int32(workerCount)) //nolint:gosec // G115: safe, max is 64 - // Start metrics printer (every 5 seconds) - // Prometheus metrics are always exported; stdout printing requires EVM_DEBUG_METRICS=true - StartMetricsPrinter(5 * time.Second) debugEnabled := IsDebugMetricsEnabled() logger.Info("Started EVM RPC metrics exporter (interval: 5s)", "workers", workerCount, "queue", queueSize, "db_semaphore", workerCount, "debug_stdout", debugEnabled) if !debugEnabled { @@ -249,9 +244,6 @@ func NewEVMWebSocketServer( // This is idempotent - if HTTP server already initialized it, this is a no-op InitGlobalWorkerPool(config.WorkerPoolSize, config.WorkerQueueSize) - // Start metrics printer (idempotent - only first call starts printer) - StartMetricsPrinter(5 * time.Second) - // Initialize WebSocket tracker. 
stats.InitWSTracker(ctxProvider(LatestCtxHeight).Context(), logger, config.RPCStatsInterval) diff --git a/evmrpc/worker_pool_metrics.go b/evmrpc/worker_pool_metrics.go index d195331d96..c7ab90073e 100644 --- a/evmrpc/worker_pool_metrics.go +++ b/evmrpc/worker_pool_metrics.go @@ -102,6 +102,7 @@ type WorkerPoolMetrics struct { var ( metricsPrinterOnce sync.Once + metricsStopOnce sync.Once metricsStopChan chan struct{} ) @@ -121,7 +122,10 @@ func StartMetricsPrinter(interval time.Duration) { debugEnabled := IsDebugMetricsEnabled() go func() { ticker := time.NewTicker(interval) - defer ticker.Stop() + defer func() { + ticker.Stop() + metricsStopChan = nil + }() for { select { @@ -137,15 +141,18 @@ func StartMetricsPrinter(interval time.Duration) { return } } + }() }) } -// StopMetricsPrinter stops the metrics printer +// StopMetricsPrinter stops the metrics printer, idempotent. func StopMetricsPrinter() { - if metricsStopChan != nil { - close(metricsStopChan) - } + metricsStopOnce.Do(func() { + if metricsStopChan != nil { + close(metricsStopChan) + } + }) } // RecordTaskSubmitted records a task submission From f46277577805138cd63371d0ddc1157928da3123 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Fri, 12 Dec 2025 10:23:25 -0800 Subject: [PATCH 6/8] Remove unused functions --- evmrpc/worker_pool_metrics.go | 97 +---------------------------------- 1 file changed, 1 insertion(+), 96 deletions(-) diff --git a/evmrpc/worker_pool_metrics.go b/evmrpc/worker_pool_metrics.go index c7ab90073e..a5ed9ba3aa 100644 --- a/evmrpc/worker_pool_metrics.go +++ b/evmrpc/worker_pool_metrics.go @@ -359,7 +359,7 @@ func (m *WorkerPoolMetrics) RecordSubscriptionEnd() { func (m *WorkerPoolMetrics) RecordSubscriptionError() { m.SubscriptionErrors.Add(1) // Export to Prometheus - IncrPrometheusSubscriptionError() + telemetry.IncrCounter(1, "sei", "evm", "subscriptions", "errors") } // GetTPS calculates the current TPS based on time window @@ -784,98 +784,3 @@ func (m *WorkerPoolMetrics) ExportPrometheusMetrics() { gometrics.SetGauge([]string{"sei", "evm", "workerpool", "avg", "exec", "time", "ms"}, float32(m.GetAverageExecTime().Milliseconds())) gometrics.SetGauge([]string{"sei", "evm", "db", "semaphore", "avg", "wait", "ms"}, float32(m.GetAverageDBWaitTime().Milliseconds())) } - -// IncrTaskSubmitted increments task submitted counter in Prometheus -func IncrPrometheusTaskSubmitted() { - telemetry.IncrCounter(1, "sei", "evm", "workerpool", "tasks", "submitted") -} - -// IncrTaskCompleted increments task completed counter in Prometheus -func IncrPrometheusTaskCompleted() { - telemetry.IncrCounter(1, "sei", "evm", "workerpool", "tasks", "completed") -} - -// IncrTaskRejected increments task rejected counter in Prometheus -func IncrPrometheusTaskRejected() { - telemetry.IncrCounterWithLabels( - []string{"sei", "evm", "workerpool", "tasks", "rejected"}, - 1, - []gometrics.Label{telemetry.NewLabel("reason", "queue_full")}, - ) -} - -// IncrTaskPanicked increments task panicked counter in Prometheus -func IncrPrometheusTaskPanicked() { - telemetry.IncrCounter(1, "sei", "evm", "workerpool", "tasks", "panicked") -} - -// IncrGetLogsRequest increments eth_getLogs request counter with labels -func IncrPrometheusGetLogsRequest(success bool, blockRange int64) { - rangeLabel := "small" - if blockRange > 100 { - rangeLabel = "medium" - } - if blockRange > 1000 { - rangeLabel = "large" - } - - successStr := "true" - if !success { - successStr = "false" - } - - telemetry.IncrCounterWithLabels( - []string{"sei", "evm", "getlogs", 
"requests"}, - 1, - []gometrics.Label{ - telemetry.NewLabel("success", successStr), - telemetry.NewLabel("range", rangeLabel), - }, - ) -} - -// MeasureGetLogsLatency records eth_getLogs latency -func MeasurePrometheusGetLogsLatency(startTime time.Time, blockRange int64) { - rangeLabel := "small" - if blockRange > 100 { - rangeLabel = "medium" - } - if blockRange > 1000 { - rangeLabel = "large" - } - - gometrics.MeasureSinceWithLabels( - []string{"sei", "evm", "getlogs", "latency", "ms"}, - startTime.UTC(), - []gometrics.Label{telemetry.NewLabel("range", rangeLabel)}, - ) -} - -// IncrSubscriptionError increments subscription error counter -func IncrPrometheusSubscriptionError() { - telemetry.IncrCounter(1, "sei", "evm", "subscriptions", "errors") -} - -// RecordDBSemaphoreWaitTime records DB semaphore wait time histogram -func RecordPrometheusDBSemaphoreWait(waitTime time.Duration) { - gometrics.AddSample( - []string{"sei", "evm", "db", "semaphore", "wait", "ms"}, - float32(waitTime.Milliseconds()), - ) -} - -// RecordQueueWaitTime records queue wait time histogram -func RecordPrometheusQueueWait(waitTime time.Duration) { - gometrics.AddSample( - []string{"sei", "evm", "workerpool", "queue", "wait", "ms"}, - float32(waitTime.Milliseconds()), - ) -} - -// RecordTaskExecTime records task execution time histogram -func RecordPrometheusTaskExec(execTime time.Duration) { - gometrics.AddSample( - []string{"sei", "evm", "workerpool", "task", "exec", "ms"}, - float32(execTime.Milliseconds()), - ) -} From 508928df6536abb0c25a519c0e06fd5168edc609 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Fri, 12 Dec 2025 10:28:24 -0800 Subject: [PATCH 7/8] Switch to use otel --- evmrpc/worker_pool_metrics.go | 268 +++++++++++++++++++++++++++++----- 1 file changed, 233 insertions(+), 35 deletions(-) diff --git a/evmrpc/worker_pool_metrics.go b/evmrpc/worker_pool_metrics.go index a5ed9ba3aa..d6b30ee799 100644 --- a/evmrpc/worker_pool_metrics.go +++ b/evmrpc/worker_pool_metrics.go @@ -1,6 +1,7 @@ package evmrpc import ( + "context" "fmt" "os" "strings" @@ -8,8 +9,9 @@ import ( "sync/atomic" "time" - gometrics "github.com/armon/go-metrics" "github.com/cosmos/cosmos-sdk/telemetry" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" ) // Environment variable to enable debug metrics printing to stdout @@ -106,6 +108,194 @@ var ( metricsStopChan chan struct{} ) +var ( + meter = otel.Meter("evmrpc_workerpool") + + otelMetrics = struct { + workersTotal metric.Int64Gauge + workersActive metric.Int64Gauge + workersIdle metric.Int64Gauge + queueCapacity metric.Int64Gauge + queueDepth metric.Int64Gauge + queuePeak metric.Int64Gauge + queueUtilizationPct metric.Float64Gauge + tasksSubmittedTotal metric.Int64Gauge + tasksCompletedTotal metric.Int64Gauge + tasksRejectedTotal metric.Int64Gauge + tasksPanickedTotal metric.Int64Gauge + dbSemaphoreCapacity metric.Int64Gauge + dbSemaphoreInUse metric.Int64Gauge + dbSemaphoreAvailable metric.Int64Gauge + dbSemaphoreWaitCount metric.Int64Gauge + subscriptionsActive metric.Int64Gauge + getLogsRequestsTotal metric.Int64Gauge + getLogsSuccessTotal metric.Int64Gauge + getLogsErrorsTotal metric.Int64Gauge + getLogsTPS metric.Float64Gauge + getLogsAvgBlockRange metric.Float64Gauge + getLogsPeakBlockRange metric.Int64Gauge + getLogsAvgLatencyMs metric.Float64Gauge + getLogsMaxLatencyMs metric.Float64Gauge + errRangeTooLarge metric.Int64Gauge + errRateLimited metric.Int64Gauge + errBackpressure metric.Int64Gauge + avgQueueWaitMs metric.Float64Gauge + avgExecTimeMs 
metric.Float64Gauge + avgDBWaitMs metric.Float64Gauge + }{ + workersTotal: must(meter.Int64Gauge( + "evmrpc_workerpool_workers_total", + metric.WithDescription("Total worker count"), + metric.WithUnit("{count}"), + )), + workersActive: must(meter.Int64Gauge( + "evmrpc_workerpool_workers_active", + metric.WithDescription("Active worker count"), + metric.WithUnit("{count}"), + )), + workersIdle: must(meter.Int64Gauge( + "evmrpc_workerpool_workers_idle", + metric.WithDescription("Idle worker count"), + metric.WithUnit("{count}"), + )), + queueCapacity: must(meter.Int64Gauge( + "evmrpc_workerpool_queue_capacity", + metric.WithDescription("Task queue capacity"), + metric.WithUnit("{count}"), + )), + queueDepth: must(meter.Int64Gauge( + "evmrpc_workerpool_queue_depth", + metric.WithDescription("Current task queue depth"), + metric.WithUnit("{count}"), + )), + queuePeak: must(meter.Int64Gauge( + "evmrpc_workerpool_queue_peak", + metric.WithDescription("Peak queue depth observed"), + metric.WithUnit("{count}"), + )), + queueUtilizationPct: must(meter.Float64Gauge( + "evmrpc_workerpool_queue_utilization_pct", + metric.WithDescription("Queue utilization percentage"), + metric.WithUnit("1"), + )), + tasksSubmittedTotal: must(meter.Int64Gauge( + "evmrpc_workerpool_tasks_submitted_total", + metric.WithDescription("Tasks submitted"), + metric.WithUnit("{count}"), + )), + tasksCompletedTotal: must(meter.Int64Gauge( + "evmrpc_workerpool_tasks_completed_total", + metric.WithDescription("Tasks completed"), + metric.WithUnit("{count}"), + )), + tasksRejectedTotal: must(meter.Int64Gauge( + "evmrpc_workerpool_tasks_rejected_total", + metric.WithDescription("Tasks rejected due to full queue"), + metric.WithUnit("{count}"), + )), + tasksPanickedTotal: must(meter.Int64Gauge( + "evmrpc_workerpool_tasks_panicked_total", + metric.WithDescription("Tasks that panicked"), + metric.WithUnit("{count}"), + )), + dbSemaphoreCapacity: must(meter.Int64Gauge( + "evmrpc_db_semaphore_capacity", + metric.WithDescription("DB semaphore capacity"), + metric.WithUnit("{count}"), + )), + dbSemaphoreInUse: must(meter.Int64Gauge( + "evmrpc_db_semaphore_inuse", + metric.WithDescription("DB semaphore currently acquired"), + metric.WithUnit("{count}"), + )), + dbSemaphoreAvailable: must(meter.Int64Gauge( + "evmrpc_db_semaphore_available", + metric.WithDescription("DB semaphore available slots"), + metric.WithUnit("{count}"), + )), + dbSemaphoreWaitCount: must(meter.Int64Gauge( + "evmrpc_db_semaphore_wait_count", + metric.WithDescription("DB semaphore wait count"), + metric.WithUnit("{count}"), + )), + subscriptionsActive: must(meter.Int64Gauge( + "evmrpc_subscriptions_active", + metric.WithDescription("Active subscriptions"), + metric.WithUnit("{count}"), + )), + getLogsRequestsTotal: must(meter.Int64Gauge( + "evmrpc_getlogs_requests_total", + metric.WithDescription("Total eth_getLogs requests"), + metric.WithUnit("{count}"), + )), + getLogsSuccessTotal: must(meter.Int64Gauge( + "evmrpc_getlogs_success_total", + metric.WithDescription("Successful eth_getLogs requests"), + metric.WithUnit("{count}"), + )), + getLogsErrorsTotal: must(meter.Int64Gauge( + "evmrpc_getlogs_errors_total", + metric.WithDescription("Errored eth_getLogs requests"), + metric.WithUnit("{count}"), + )), + getLogsTPS: must(meter.Float64Gauge( + "evmrpc_getlogs_tps", + metric.WithDescription("eth_getLogs throughput (req/s)"), + metric.WithUnit("1/s"), + )), + getLogsAvgBlockRange: must(meter.Float64Gauge( + "evmrpc_getlogs_avg_blockrange", + 
metric.WithDescription("Average block range for eth_getLogs"), + metric.WithUnit("{blocks}"), + )), + getLogsPeakBlockRange: must(meter.Int64Gauge( + "evmrpc_getlogs_peak_blockrange", + metric.WithDescription("Peak block range for eth_getLogs"), + metric.WithUnit("{blocks}"), + )), + getLogsAvgLatencyMs: must(meter.Float64Gauge( + "evmrpc_getlogs_avg_latency_ms", + metric.WithDescription("Average eth_getLogs latency (ms)"), + metric.WithUnit("ms"), + )), + getLogsMaxLatencyMs: must(meter.Float64Gauge( + "evmrpc_getlogs_max_latency_ms", + metric.WithDescription("Max eth_getLogs latency (ms)"), + metric.WithUnit("ms"), + )), + errRangeTooLarge: must(meter.Int64Gauge( + "evmrpc_getlogs_errors_range_too_large", + metric.WithDescription("Errors due to block range too large"), + metric.WithUnit("{count}"), + )), + errRateLimited: must(meter.Int64Gauge( + "evmrpc_getlogs_errors_rate_limited", + metric.WithDescription("Errors due to rate limiting"), + metric.WithUnit("{count}"), + )), + errBackpressure: must(meter.Int64Gauge( + "evmrpc_getlogs_errors_backpressure", + metric.WithDescription("Errors due to backpressure"), + metric.WithUnit("{count}"), + )), + avgQueueWaitMs: must(meter.Float64Gauge( + "evmrpc_workerpool_avg_queue_wait_ms", + metric.WithDescription("Average queue wait time (ms)"), + metric.WithUnit("ms"), + )), + avgExecTimeMs: must(meter.Float64Gauge( + "evmrpc_workerpool_avg_exec_time_ms", + metric.WithDescription("Average execution time (ms)"), + metric.WithUnit("ms"), + )), + avgDBWaitMs: must(meter.Float64Gauge( + "evmrpc_db_semaphore_avg_wait_ms", + metric.WithDescription("Average DB semaphore wait (ms)"), + metric.WithUnit("ms"), + )), + } +) + // GetGlobalMetrics returns the metrics from the global worker pool // This is a convenience function for accessing metrics without importing worker pool func GetGlobalMetrics() *WorkerPoolMetrics { @@ -672,6 +862,13 @@ func repeatStr(s string, count int) string { return result } +func must[V any](v V, err error) V { + if err != nil { + panic(err) + } + return v +} + // ResetMetrics resets all metrics (useful for testing) func (m *WorkerPoolMetrics) ResetMetrics() { m.ActiveWorkers.Store(0) @@ -726,61 +923,62 @@ func (m *WorkerPoolMetrics) ResetMetrics() { // Prometheus Metrics Export Functions // ======================================== -// ExportPrometheusMetrics exports all metrics to Prometheus +// ExportPrometheusMetrics exports all metrics to OTel // This should be called periodically (e.g., every 5 seconds) // All metrics are exported as gauges for efficiency (batch export instead of per-operation) func (m *WorkerPoolMetrics) ExportPrometheusMetrics() { + ctx := context.Background() + // Worker Pool Gauges totalWorkers := m.TotalWorkers.Load() queueCap := m.QueueCapacity.Load() dbSemCap := m.DBSemaphoreCapacity.Load() - gometrics.SetGauge([]string{"sei", "evm", "workerpool", "workers", "total"}, float32(totalWorkers)) - gometrics.SetGauge([]string{"sei", "evm", "workerpool", "workers", "active"}, float32(m.ActiveWorkers.Load())) - gometrics.SetGauge([]string{"sei", "evm", "workerpool", "workers", "idle"}, float32(totalWorkers-m.ActiveWorkers.Load())) - gometrics.SetGauge([]string{"sei", "evm", "workerpool", "queue", "capacity"}, float32(queueCap)) - gometrics.SetGauge([]string{"sei", "evm", "workerpool", "queue", "depth"}, float32(m.QueueDepth.Load())) - gometrics.SetGauge([]string{"sei", "evm", "workerpool", "queue", "peak"}, float32(m.PeakQueueDepth.Load())) + otelMetrics.workersTotal.Record(ctx, int64(totalWorkers)) + 
otelMetrics.workersActive.Record(ctx, int64(m.ActiveWorkers.Load())) + otelMetrics.workersIdle.Record(ctx, int64(totalWorkers-m.ActiveWorkers.Load())) + otelMetrics.queueCapacity.Record(ctx, int64(queueCap)) + otelMetrics.queueDepth.Record(ctx, int64(m.QueueDepth.Load())) + otelMetrics.queuePeak.Record(ctx, int64(m.PeakQueueDepth.Load())) - // Queue utilization percentage - utilization := float32(0) + utilization := float64(0) if queueCap > 0 { - utilization = float32(m.QueueDepth.Load()) / float32(queueCap) * 100 + utilization = float64(m.QueueDepth.Load()) / float64(queueCap) * 100 } - gometrics.SetGauge([]string{"sei", "evm", "workerpool", "queue", "utilization"}, utilization) + otelMetrics.queueUtilizationPct.Record(ctx, utilization) // Task counters (exported as gauges for batch efficiency) - gometrics.SetGauge([]string{"sei", "evm", "workerpool", "tasks", "submitted", "total"}, float32(m.TasksSubmitted.Load())) - gometrics.SetGauge([]string{"sei", "evm", "workerpool", "tasks", "completed", "total"}, float32(m.TasksCompleted.Load())) - gometrics.SetGauge([]string{"sei", "evm", "workerpool", "tasks", "rejected", "total"}, float32(m.TasksRejected.Load())) - gometrics.SetGauge([]string{"sei", "evm", "workerpool", "tasks", "panicked", "total"}, float32(m.TasksPanicked.Load())) + otelMetrics.tasksSubmittedTotal.Record(ctx, m.TasksSubmitted.Load()) + otelMetrics.tasksCompletedTotal.Record(ctx, m.TasksCompleted.Load()) + otelMetrics.tasksRejectedTotal.Record(ctx, m.TasksRejected.Load()) + otelMetrics.tasksPanickedTotal.Record(ctx, m.TasksPanicked.Load()) // DB Semaphore Gauges - gometrics.SetGauge([]string{"sei", "evm", "db", "semaphore", "capacity"}, float32(dbSemCap)) - gometrics.SetGauge([]string{"sei", "evm", "db", "semaphore", "inuse"}, float32(m.DBSemaphoreAcquired.Load())) - gometrics.SetGauge([]string{"sei", "evm", "db", "semaphore", "available"}, float32(dbSemCap-m.DBSemaphoreAcquired.Load())) - gometrics.SetGauge([]string{"sei", "evm", "db", "semaphore", "wait", "count"}, float32(m.DBSemaphoreWaitCount.Load())) + otelMetrics.dbSemaphoreCapacity.Record(ctx, int64(dbSemCap)) + otelMetrics.dbSemaphoreInUse.Record(ctx, int64(m.DBSemaphoreAcquired.Load())) + otelMetrics.dbSemaphoreAvailable.Record(ctx, int64(dbSemCap-m.DBSemaphoreAcquired.Load())) + otelMetrics.dbSemaphoreWaitCount.Record(ctx, m.DBSemaphoreWaitCount.Load()) // Subscriptions Gauge - gometrics.SetGauge([]string{"sei", "evm", "subscriptions", "active"}, float32(m.ActiveSubscriptions.Load())) + otelMetrics.subscriptionsActive.Record(ctx, int64(m.ActiveSubscriptions.Load())) // eth_getLogs specific gauges - gometrics.SetGauge([]string{"sei", "evm", "getlogs", "requests", "total"}, float32(m.GetLogsRequests.Load())) - gometrics.SetGauge([]string{"sei", "evm", "getlogs", "success", "total"}, float32(m.GetLogsSuccess.Load())) - gometrics.SetGauge([]string{"sei", "evm", "getlogs", "errors", "total"}, float32(m.GetLogsErrors.Load())) - gometrics.SetGauge([]string{"sei", "evm", "getlogs", "tps"}, float32(m.GetTPS())) - gometrics.SetGauge([]string{"sei", "evm", "getlogs", "avg", "blockrange"}, float32(m.GetAverageBlockRange())) - gometrics.SetGauge([]string{"sei", "evm", "getlogs", "peak", "blockrange"}, float32(m.GetLogsPeakRange.Load())) - gometrics.SetGauge([]string{"sei", "evm", "getlogs", "avg", "latency", "ms"}, float32(m.GetAverageLatency().Milliseconds())) - gometrics.SetGauge([]string{"sei", "evm", "getlogs", "max", "latency", "ms"}, float32(m.GetLogsMaxLatencyNs.Load()/1e6)) + otelMetrics.getLogsRequestsTotal.Record(ctx, 
m.GetLogsRequests.Load()) + otelMetrics.getLogsSuccessTotal.Record(ctx, m.GetLogsSuccess.Load()) + otelMetrics.getLogsErrorsTotal.Record(ctx, m.GetLogsErrors.Load()) + otelMetrics.getLogsTPS.Record(ctx, m.GetTPS()) + otelMetrics.getLogsAvgBlockRange.Record(ctx, m.GetAverageBlockRange()) + otelMetrics.getLogsPeakBlockRange.Record(ctx, m.GetLogsPeakRange.Load()) + otelMetrics.getLogsAvgLatencyMs.Record(ctx, float64(m.GetAverageLatency().Milliseconds())) + otelMetrics.getLogsMaxLatencyMs.Record(ctx, float64(m.GetLogsMaxLatencyNs.Load())/1e6) // Error breakdown - gometrics.SetGauge([]string{"sei", "evm", "getlogs", "errors", "range_too_large"}, float32(m.ErrRangeTooLarge.Load())) - gometrics.SetGauge([]string{"sei", "evm", "getlogs", "errors", "rate_limited"}, float32(m.ErrRateLimited.Load())) - gometrics.SetGauge([]string{"sei", "evm", "getlogs", "errors", "backpressure"}, float32(m.ErrBackpressure.Load())) + otelMetrics.errRangeTooLarge.Record(ctx, m.ErrRangeTooLarge.Load()) + otelMetrics.errRateLimited.Record(ctx, m.ErrRateLimited.Load()) + otelMetrics.errBackpressure.Record(ctx, m.ErrBackpressure.Load()) // Average timings - gometrics.SetGauge([]string{"sei", "evm", "workerpool", "avg", "queue", "wait", "ms"}, float32(m.GetAverageQueueWaitTime().Milliseconds())) - gometrics.SetGauge([]string{"sei", "evm", "workerpool", "avg", "exec", "time", "ms"}, float32(m.GetAverageExecTime().Milliseconds())) - gometrics.SetGauge([]string{"sei", "evm", "db", "semaphore", "avg", "wait", "ms"}, float32(m.GetAverageDBWaitTime().Milliseconds())) + otelMetrics.avgQueueWaitMs.Record(ctx, float64(m.GetAverageQueueWaitTime().Milliseconds())) + otelMetrics.avgExecTimeMs.Record(ctx, float64(m.GetAverageExecTime().Milliseconds())) + otelMetrics.avgDBWaitMs.Record(ctx, float64(m.GetAverageDBWaitTime().Milliseconds())) } From 19aa047464a988d493d475c4e27b3089805f31f6 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Fri, 12 Dec 2025 10:37:48 -0800 Subject: [PATCH 8/8] Fix golint --- evmrpc/server.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/evmrpc/server.go b/evmrpc/server.go index a92b874c7b..8a6d948080 100644 --- a/evmrpc/server.go +++ b/evmrpc/server.go @@ -2,11 +2,15 @@ package evmrpc import ( "context" + "strings" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rpc" + "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/client" sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/rpc" "github.com/sei-protocol/sei-chain/app/legacyabci" evmrpcconfig "github.com/sei-protocol/sei-chain/evmrpc/config" "github.com/sei-protocol/sei-chain/evmrpc/stats" @@ -15,8 +19,6 @@ import ( "github.com/sei-protocol/sei-chain/x/evm/keeper" "github.com/tendermint/tendermint/libs/log" rpcclient "github.com/tendermint/tendermint/rpc/client" - "strings" - "sync" ) type ConnectionType string