diff --git a/docker/monitornode/dashboards/cryptosim-dashboard.json b/docker/monitornode/dashboards/cryptosim-dashboard.json index 08647b78ab..8dfc193e75 100644 --- a/docker/monitornode/dashboards/cryptosim-dashboard.json +++ b/docker/monitornode/dashboards/cryptosim-dashboard.json @@ -3786,6 +3786,894 @@ ], "title": "Receipt Errors", "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 16 + }, + "id": 282, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.4.0", + "targets": [ + { + "editorMode": "code", + "expr": "histogram_quantile(0.99, rate(cryptosim_receipt_read_duration_seconds_bucket[$__rate_interval]))", + "legendFormat": "p99", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", 
+ "expr": "histogram_quantile(0.95, rate(cryptosim_receipt_read_duration_seconds_bucket[$__rate_interval]))", + "instant": false, + "legendFormat": "p95", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.50, rate(cryptosim_receipt_read_duration_seconds_bucket[$__rate_interval]))", + "instant": false, + "legendFormat": "p50", + "range": true, + "refId": "C" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "rate(cryptosim_receipt_read_duration_seconds_sum[$__rate_interval]) / rate(cryptosim_receipt_read_duration_seconds_count[$__rate_interval])", + "instant": false, + "legendFormat": "average", + "range": true, + "refId": "D" + } + ], + "title": "Receipt Read Latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 16 + }, + "id": 283, + "options": { + "legend": { + 
"calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": false + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.4.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "rate(cryptosim_receipt_reads_total[$__rate_interval])", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Receipt Reads/sec", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "max": 1, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "percentunit" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 24 + }, + "id": 284, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.4.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": 
"rate(cryptosim_receipt_cache_hits_total[$__rate_interval]) / (rate(cryptosim_receipt_cache_hits_total[$__rate_interval]) + rate(cryptosim_receipt_cache_misses_total[$__rate_interval]))", + "legendFormat": "cache hit %", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "rate(cryptosim_receipt_cache_misses_total[$__rate_interval]) / (rate(cryptosim_receipt_cache_hits_total[$__rate_interval]) + rate(cryptosim_receipt_cache_misses_total[$__rate_interval]))", + "instant": false, + "legendFormat": "cache miss %", + "range": true, + "refId": "B" + } + ], + "title": "Receipt Cache Hit/Miss %", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "max": 1, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "percentunit" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 24 + }, + "id": 288, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": 
false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.4.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "rate(cryptosim_receipt_log_filter_cache_hits_total[$__rate_interval]) / (rate(cryptosim_receipt_log_filter_cache_hits_total[$__rate_interval]) + rate(cryptosim_receipt_log_filter_cache_miss_total[$__rate_interval]))", + "legendFormat": "cache hit %", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "rate(cryptosim_receipt_log_filter_cache_miss_total[$__rate_interval]) / (rate(cryptosim_receipt_log_filter_cache_hits_total[$__rate_interval]) + rate(cryptosim_receipt_log_filter_cache_miss_total[$__rate_interval]))", + "instant": false, + "legendFormat": "cache miss %", + "range": true, + "refId": "B" + } + ], + "title": "Log Filter Cache Hit/Miss %", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + 
"overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 32 + }, + "id": 286, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.4.0", + "targets": [ + { + "editorMode": "code", + "expr": "histogram_quantile(0.99, rate(cryptosim_receipt_log_filter_duration_seconds_bucket[$__rate_interval]))", + "legendFormat": "p99", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, rate(cryptosim_receipt_log_filter_duration_seconds_bucket[$__rate_interval]))", + "instant": false, + "legendFormat": "p95", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.50, rate(cryptosim_receipt_log_filter_duration_seconds_bucket[$__rate_interval]))", + "instant": false, + "legendFormat": "p50", + "range": true, + "refId": "C" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "rate(cryptosim_receipt_log_filter_duration_seconds_sum[$__rate_interval]) / rate(cryptosim_receipt_log_filter_duration_seconds_count[$__rate_interval])", + "instant": false, + "legendFormat": "average", + "range": true, + "refId": "D" + } + ], + "title": "Log Filter Latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + 
"hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 32 + }, + "id": 287, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": false + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.4.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "rate(cryptosim_receipt_log_filter_duration_seconds_count[$__rate_interval])", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Log Reads/sec", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + 
"mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 40 + }, + "id": 289, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.4.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.99, rate(cryptosim_receipt_log_filter_logs_returned_bucket[$__rate_interval]))", + "legendFormat": "p99", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.50, rate(cryptosim_receipt_log_filter_logs_returned_bucket[$__rate_interval]))", + "instant": false, + "legendFormat": "p50", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "rate(cryptosim_receipt_log_filter_logs_returned_sum[$__rate_interval]) / rate(cryptosim_receipt_log_filter_logs_returned_count[$__rate_interval])", + "instant": false, + "legendFormat": "average", + "range": true, + "refId": "C" + } + ], + "title": "Logs Returned Per Query", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { + "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", + "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, "barWidthFactor": 0.6, + "drawStyle": "line", "fillOpacity": 0, "gradientMode": "none", + "hideFrom": { "legend": false, "tooltip": false, "viz": false }, + 
"insertNulls": false, "lineInterpolation": "linear", "lineWidth": 1, + "pointSize": 5, "scaleDistribution": { "type": "linear" }, + "showPoints": "auto", "showValues": false, "spanNulls": false, + "stacking": { "group": "A", "mode": "none" }, + "thresholdsStyle": { "mode": "off" } + }, + "mappings": [], + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": 0 }, { "color": "red", "value": 80 }] }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 48 }, + "id": 290, + "options": { + "legend": { "calcs": [], "displayMode": "list", "placement": "bottom", "showLegend": true }, + "tooltip": { "hideZeros": false, "mode": "single", "sort": "none" } + }, + "pluginVersion": "12.4.0", + "targets": [ + { "editorMode": "code", "expr": "histogram_quantile(0.99, rate(cryptosim_receipt_cache_filter_scan_duration_seconds_bucket[$__rate_interval]))", "legendFormat": "p99", "range": true, "refId": "A" }, + { "editorMode": "code", "expr": "histogram_quantile(0.95, rate(cryptosim_receipt_cache_filter_scan_duration_seconds_bucket[$__rate_interval]))", "legendFormat": "p95", "range": true, "refId": "B" }, + { "editorMode": "code", "expr": "histogram_quantile(0.50, rate(cryptosim_receipt_cache_filter_scan_duration_seconds_bucket[$__rate_interval]))", "legendFormat": "p50", "range": true, "refId": "C" }, + { "editorMode": "code", "expr": "rate(cryptosim_receipt_cache_filter_scan_duration_seconds_sum[$__rate_interval]) / rate(cryptosim_receipt_cache_filter_scan_duration_seconds_count[$__rate_interval])", "legendFormat": "average", "range": true, "refId": "D" } + ], + "title": "Cache Filter Scan Duration", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { + "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", + "axisLabel": "", "axisPlacement": "auto", 
"barAlignment": 0, "barWidthFactor": 0.6, + "drawStyle": "line", "fillOpacity": 0, "gradientMode": "none", + "hideFrom": { "legend": false, "tooltip": false, "viz": false }, + "insertNulls": false, "lineInterpolation": "linear", "lineWidth": 1, + "pointSize": 5, "scaleDistribution": { "type": "linear" }, + "showPoints": "auto", "showValues": false, "spanNulls": false, + "stacking": { "group": "A", "mode": "none" }, + "thresholdsStyle": { "mode": "off" } + }, + "mappings": [], + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": 0 }, { "color": "red", "value": 80 }] }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 48 }, + "id": 291, + "options": { + "legend": { "calcs": [], "displayMode": "list", "placement": "bottom", "showLegend": true }, + "tooltip": { "hideZeros": false, "mode": "single", "sort": "none" } + }, + "pluginVersion": "12.4.0", + "targets": [ + { "editorMode": "code", "expr": "histogram_quantile(0.99, rate(cryptosim_receipt_cache_get_duration_seconds_bucket[$__rate_interval]))", "legendFormat": "p99", "range": true, "refId": "A" }, + { "editorMode": "code", "expr": "histogram_quantile(0.95, rate(cryptosim_receipt_cache_get_duration_seconds_bucket[$__rate_interval]))", "legendFormat": "p95", "range": true, "refId": "B" }, + { "editorMode": "code", "expr": "histogram_quantile(0.50, rate(cryptosim_receipt_cache_get_duration_seconds_bucket[$__rate_interval]))", "legendFormat": "p50", "range": true, "refId": "C" }, + { "editorMode": "code", "expr": "rate(cryptosim_receipt_cache_get_duration_seconds_sum[$__rate_interval]) / rate(cryptosim_receipt_cache_get_duration_seconds_count[$__rate_interval])", "legendFormat": "average", "range": true, "refId": "D" } + ], + "title": "Cache Get Duration", + "type": "timeseries" } ], "title": "Receipts", diff --git a/sei-db/config/receipt_config.go b/sei-db/config/receipt_config.go index 0d2887ed91..52f82c4910 100644 --- a/sei-db/config/receipt_config.go 
+++ b/sei-db/config/receipt_config.go @@ -48,6 +48,11 @@ type ReceiptStoreConfig struct { // PruneIntervalSeconds defines the interval in seconds to trigger pruning // default to every 600 seconds PruneIntervalSeconds int `mapstructure:"prune-interval-seconds"` + + // DisableTxIndexLookup must remain true. The tx_hash -> block_number lookup + // implementation is intentionally unsupported; setting this to false will + // panic during parquet store initialization. + DisableTxIndexLookup bool `mapstructure:"disable-tx-index-lookup"` } // DefaultReceiptStoreConfig returns the default ReceiptStoreConfig @@ -57,6 +62,7 @@ func DefaultReceiptStoreConfig() ReceiptStoreConfig { AsyncWriteBuffer: DefaultSSAsyncBuffer, KeepRecent: DefaultSSKeepRecent, PruneIntervalSeconds: DefaultSSPruneInterval, + DisableTxIndexLookup: true, } } diff --git a/sei-db/ledger_db/parquet/reader.go b/sei-db/ledger_db/parquet/reader.go index 11539190a3..12225d18f2 100644 --- a/sei-db/ledger_db/parquet/reader.go +++ b/sei-db/ledger_db/parquet/reader.go @@ -384,6 +384,9 @@ func (r *Reader) GetLogs(ctx context.Context, filter LogFilter) ([]LogResult, er if filter.ToBlock != nil && startBlock > *filter.ToBlock { continue } + if filter.FromBlock != nil && startBlock+r.maxBlocksPerFile <= *filter.FromBlock { + continue + } files = append(files, f) } if len(files) == 0 { diff --git a/sei-db/ledger_db/parquet/reader_filter_test.go b/sei-db/ledger_db/parquet/reader_filter_test.go new file mode 100644 index 0000000000..f91d518b92 --- /dev/null +++ b/sei-db/ledger_db/parquet/reader_filter_test.go @@ -0,0 +1,119 @@ +package parquet + +import ( + "context" + "fmt" + "math/big" + "os" + "testing" + + "github.com/ethereum/go-ethereum/common" + pqgo "github.com/parquet-go/parquet-go" + "github.com/stretchr/testify/require" +) + +func createTestLogFile(dir string, startBlock, count uint64) error { + path := fmt.Sprintf("%s/logs_%d.parquet", dir, startBlock) + f, err := os.Create(path) + if err != nil { + return 
err + } + w := pqgo.NewGenericWriter[LogRecord](f) + for i := uint64(0); i < count; i++ { + block := startBlock + i + txHash := common.BigToHash(new(big.Int).SetUint64(block)) + if _, err := w.Write([]LogRecord{{ + BlockNumber: block, + TxHash: txHash[:], + Address: common.HexToAddress("0xdead").Bytes(), + }}); err != nil { + return err + } + } + if err := w.Close(); err != nil { + return err + } + return f.Close() +} + +func uint64Ptr(v uint64) *uint64 { return &v } + +func TestGetLogsPrunesFilesBelowFromBlock(t *testing.T) { + dir := t.TempDir() + + for _, start := range []uint64{0, 500, 1000, 1500} { + require.NoError(t, createTestReceiptFile(dir, start, 500)) + require.NoError(t, createTestLogFile(dir, start, 500)) + } + + reader, err := NewReaderWithMaxBlocksPerFile(dir, 500) + require.NoError(t, err) + defer func() { _ = reader.Close() }() + + ctx := context.Background() + + results, err := reader.GetLogs(ctx, LogFilter{ + FromBlock: uint64Ptr(1200), + ToBlock: uint64Ptr(1300), + }) + require.NoError(t, err) + + for _, r := range results { + require.GreaterOrEqual(t, r.BlockNumber, uint64(1200)) + require.LessOrEqual(t, r.BlockNumber, uint64(1300)) + } + require.Equal(t, 101, len(results), "should have blocks 1200-1300 inclusive") +} + +func TestGetLogsPrunesBothEnds(t *testing.T) { + dir := t.TempDir() + + for _, start := range []uint64{0, 500, 1000, 1500, 2000} { + require.NoError(t, createTestReceiptFile(dir, start, 500)) + require.NoError(t, createTestLogFile(dir, start, 500)) + } + + reader, err := NewReaderWithMaxBlocksPerFile(dir, 500) + require.NoError(t, err) + defer func() { _ = reader.Close() }() + + ctx := context.Background() + + // Query blocks 1400-1600: should need overlapping files 1000 and 1500, + // but still prune non-overlapping files 0, 500, and 2000. 
+ results, err := reader.GetLogs(ctx, LogFilter{ + FromBlock: uint64Ptr(1400), + ToBlock: uint64Ptr(1600), + }) + require.NoError(t, err) + + for _, r := range results { + require.GreaterOrEqual(t, r.BlockNumber, uint64(1400)) + require.LessOrEqual(t, r.BlockNumber, uint64(1600)) + } + require.Equal(t, 201, len(results), "should have blocks 1400-1600 inclusive") +} + +func TestStoreGetReceiptByTxHashWithoutIndex(t *testing.T) { + dir := t.TempDir() + + for _, start := range []uint64{0, 500, 1000} { + require.NoError(t, createTestReceiptFile(dir, start, 500)) + } + + store, err := NewStore(StoreConfig{ + DBDirectory: dir, + MaxBlocksPerFile: 500, + DisableTxIndexLookup: true, + }) + require.NoError(t, err) + defer func() { _ = store.Close() }() + + txHash := common.BigToHash(new(big.Int).SetUint64(750)) + + ctx := context.Background() + result, err := store.GetReceiptByTxHash(ctx, txHash) + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, uint64(750), result.BlockNumber) +} diff --git a/sei-db/ledger_db/parquet/reader_race_test.go b/sei-db/ledger_db/parquet/reader_race_test.go index b24c5ff9f0..b681d9820e 100644 --- a/sei-db/ledger_db/parquet/reader_race_test.go +++ b/sei-db/ledger_db/parquet/reader_race_test.go @@ -82,9 +82,10 @@ func TestConcurrentReadsAndPrune(t *testing.T) { } store, err := NewStore(StoreConfig{ - DBDirectory: dir, - MaxBlocksPerFile: 500, - KeepRecent: 600, + DBDirectory: dir, + MaxBlocksPerFile: 500, + KeepRecent: 600, + DisableTxIndexLookup: true, }) require.NoError(t, err) t.Cleanup(func() { _ = store.Close() }) @@ -134,8 +135,9 @@ func TestOnFileRotationNotBlockedByPruneMu(t *testing.T) { require.NoError(t, createTestReceiptFile(dir, 0, 1)) store, err := NewStore(StoreConfig{ - DBDirectory: dir, - MaxBlocksPerFile: 500, + DBDirectory: dir, + MaxBlocksPerFile: 500, + DisableTxIndexLookup: true, }) require.NoError(t, err) t.Cleanup(func() { _ = store.Close() }) @@ -168,9 +170,10 @@ func 
TestConcurrentReadsPruneAndRotation(t *testing.T) { } store, err := NewStore(StoreConfig{ - DBDirectory: dir, - MaxBlocksPerFile: 500, - KeepRecent: 1000, + DBDirectory: dir, + MaxBlocksPerFile: 500, + KeepRecent: 1000, + DisableTxIndexLookup: true, }) require.NoError(t, err) t.Cleanup(func() { _ = store.Close() }) diff --git a/sei-db/ledger_db/parquet/store.go b/sei-db/ledger_db/parquet/store.go index 5dcfe48950..853e21b3f1 100644 --- a/sei-db/ledger_db/parquet/store.go +++ b/sei-db/ledger_db/parquet/store.go @@ -36,13 +36,15 @@ type StoreConfig struct { PruneIntervalSeconds int64 BlockFlushInterval uint64 MaxBlocksPerFile uint64 + DisableTxIndexLookup bool } // DefaultStoreConfig returns the default store configuration. func DefaultStoreConfig() StoreConfig { return StoreConfig{ - BlockFlushInterval: defaultBlockFlushInterval, - MaxBlocksPerFile: defaultMaxBlocksPerFile, + BlockFlushInterval: defaultBlockFlushInterval, + MaxBlocksPerFile: defaultMaxBlocksPerFile, + DisableTxIndexLookup: true, } } @@ -102,6 +104,9 @@ type Store struct { // NewStore creates a new parquet store. func NewStore(cfg StoreConfig) (*Store, error) { storeCfg := resolveStoreConfig(cfg) + if !storeCfg.DisableTxIndexLookup { + panic("not implemented") + } if err := os.MkdirAll(cfg.DBDirectory, 0o750); err != nil { return nil, fmt.Errorf("failed to create parquet base directory: %w", err) @@ -151,6 +156,7 @@ func resolveStoreConfig(cfg StoreConfig) StoreConfig { resolved.DBDirectory = cfg.DBDirectory resolved.KeepRecent = cfg.KeepRecent resolved.PruneIntervalSeconds = cfg.PruneIntervalSeconds + resolved.DisableTxIndexLookup = cfg.DisableTxIndexLookup if cfg.BlockFlushInterval > 0 { resolved.BlockFlushInterval = cfg.BlockFlushInterval } @@ -188,7 +194,8 @@ func (s *Store) SetBlockFlushInterval(interval uint64) { s.config.BlockFlushInterval = interval } -// GetReceiptByTxHash retrieves a receipt by transaction hash. 
+// GetReceiptByTxHash retrieves a receipt by transaction hash via a full scan of +// the closed parquet files tracked by the reader. func (s *Store) GetReceiptByTxHash(ctx context.Context, txHash common.Hash) (*ReceiptResult, error) { return s.Reader.GetReceiptByTxHash(ctx, txHash) } @@ -320,6 +327,7 @@ func (s *Store) Close() error { } if closeErr := s.Reader.Close(); closeErr != nil { err = closeErr + return } }) diff --git a/sei-db/ledger_db/parquet/store_config_test.go b/sei-db/ledger_db/parquet/store_config_test.go index 4b891a2ab6..200102cafd 100644 --- a/sei-db/ledger_db/parquet/store_config_test.go +++ b/sei-db/ledger_db/parquet/store_config_test.go @@ -37,9 +37,10 @@ func (m *mockParquetWAL) Close() error { return nil } func TestNewStoreAppliesConfiguredIntervals(t *testing.T) { store, err := NewStore(StoreConfig{ - DBDirectory: t.TempDir(), - BlockFlushInterval: 7, - MaxBlocksPerFile: 11, + DBDirectory: t.TempDir(), + BlockFlushInterval: 7, + MaxBlocksPerFile: 11, + DisableTxIndexLookup: true, }) require.NoError(t, err) t.Cleanup(func() { _ = store.Close() }) @@ -54,7 +55,8 @@ func TestNewStoreAppliesConfiguredIntervals(t *testing.T) { func TestNewStoreUsesDefaultIntervalsWhenUnset(t *testing.T) { store, err := NewStore(StoreConfig{ - DBDirectory: t.TempDir(), + DBDirectory: t.TempDir(), + DisableTxIndexLookup: true, }) require.NoError(t, err) t.Cleanup(func() { _ = store.Close() }) @@ -62,6 +64,7 @@ func TestNewStoreUsesDefaultIntervalsWhenUnset(t *testing.T) { require.Equal(t, defaultBlockFlushInterval, store.config.BlockFlushInterval) require.Equal(t, defaultMaxBlocksPerFile, store.config.MaxBlocksPerFile) require.Equal(t, defaultMaxBlocksPerFile, store.CacheRotateInterval()) + require.True(t, store.config.DisableTxIndexLookup) } func TestNewStorePreservesKeepRecentAndPruneIntervalSettings(t *testing.T) { @@ -69,17 +72,29 @@ func TestNewStorePreservesKeepRecentAndPruneIntervalSettings(t *testing.T) { DBDirectory: t.TempDir(), KeepRecent: 123, 
PruneIntervalSeconds: 9, + DisableTxIndexLookup: true, }) require.NoError(t, err) t.Cleanup(func() { _ = store.Close() }) require.Equal(t, int64(123), store.config.KeepRecent) require.Equal(t, int64(9), store.config.PruneIntervalSeconds) + require.True(t, store.config.DisableTxIndexLookup) +} + +func TestNewStorePanicsWhenTxIndexLookupEnabled(t *testing.T) { + require.PanicsWithValue(t, "not implemented", func() { + _, _ = NewStore(StoreConfig{ + DBDirectory: t.TempDir(), + DisableTxIndexLookup: false, + }) + }) } func TestPruneOldFilesKeepsTrackingOnDeleteFailure(t *testing.T) { store, err := NewStore(StoreConfig{ - DBDirectory: t.TempDir(), + DBDirectory: t.TempDir(), + DisableTxIndexLookup: true, }) require.NoError(t, err) t.Cleanup(func() { _ = store.Close() }) @@ -133,7 +148,8 @@ func TestCorruptLastFileDeletedOnStartup(t *testing.T) { require.NoError(t, os.WriteFile(corruptLog, []byte("not a parquet file"), 0o644)) store, err := NewStore(StoreConfig{ - DBDirectory: dir, + DBDirectory: dir, + DisableTxIndexLookup: true, }) require.NoError(t, err) t.Cleanup(func() { _ = store.Close() }) @@ -165,7 +181,8 @@ func TestCorruptLogFileUntracksReceiptCounterpart(t *testing.T) { require.NoError(t, os.WriteFile(corruptLog, []byte("not a parquet file"), 0o644)) store, err := NewStore(StoreConfig{ - DBDirectory: dir, + DBDirectory: dir, + DisableTxIndexLookup: true, }) require.NoError(t, err) t.Cleanup(func() { _ = store.Close() }) @@ -184,7 +201,8 @@ func TestLazyInitCreatesFileOnFirstWrite(t *testing.T) { dir := t.TempDir() store, err := NewStore(StoreConfig{ - DBDirectory: dir, + DBDirectory: dir, + DisableTxIndexLookup: true, }) require.NoError(t, err) t.Cleanup(func() { _ = store.Close() }) diff --git a/sei-db/ledger_db/parquet/wal.go b/sei-db/ledger_db/parquet/wal.go index 24a4729332..656414c652 100644 --- a/sei-db/ledger_db/parquet/wal.go +++ b/sei-db/ledger_db/parquet/wal.go @@ -113,6 +113,9 @@ func NewWAL(dir string) (dbwal.GenericWAL[WALEntry], error) { 
encodeWALEntry, decodeWALEntry, dir, - dbwal.Config{}, + dbwal.Config{ + // Allow the WAL to be fully emptied after rotation/truncation. + AllowEmpty: true, + }, ) } diff --git a/sei-db/ledger_db/parquet/wal_test.go b/sei-db/ledger_db/parquet/wal_test.go new file mode 100644 index 0000000000..842633a0b6 --- /dev/null +++ b/sei-db/ledger_db/parquet/wal_test.go @@ -0,0 +1,136 @@ +package parquet + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +// walDirSize returns the total size of all files in dir (non-recursive). +func walDirSize(t *testing.T, dir string) int64 { + t.Helper() + entries, err := os.ReadDir(dir) + if os.IsNotExist(err) { + return 0 + } + require.NoError(t, err) + var total int64 + for _, e := range entries { + if e.IsDir() { + continue + } + info, err := e.Info() + require.NoError(t, err) + total += info.Size() + } + return total +} + +// TestClearWALActuallyFreesSpace uses the real tidwall/wal-backed WAL (not a +// mock) to verify that ClearWAL genuinely removes data from disk. This catches +// the bug where AllowEmpty=false caused TruncateFront to silently fail with +// ErrOutOfRange, leaving every WAL entry on disk forever. +func TestClearWALActuallyFreesSpace(t *testing.T) { + dir := t.TempDir() + + store, err := NewStore(StoreConfig{ + DBDirectory: dir, + MaxBlocksPerFile: 10, + DisableTxIndexLookup: true, + }) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + + receipt := ReceiptRecord{ + TxHash: make([]byte, 32), + BlockNumber: 1, + ReceiptBytes: make([]byte, 512), + } + + // Write 10 blocks (fills one file, no rotation yet). 
+ for block := uint64(1); block <= 10; block++ { + receipt.BlockNumber = block + require.NoError(t, store.WriteReceipts([]ReceiptInput{{ + BlockNumber: block, + Receipt: receipt, + ReceiptBytes: receipt.ReceiptBytes, + }})) + } + + walDir := filepath.Join(dir, "parquet-wal") + sizeBeforeRotation := walDirSize(t, walDir) + require.Greater(t, sizeBeforeRotation, int64(0), "WAL should have data before rotation") + + // Block 11 triggers rotation (blocksInFile=10 >= MaxBlocksPerFile=10), + // which calls ClearWAL(). + receipt.BlockNumber = 11 + require.NoError(t, store.WriteReceipts([]ReceiptInput{{ + BlockNumber: 11, + Receipt: receipt, + ReceiptBytes: receipt.ReceiptBytes, + }})) + + sizeAfterRotation := walDirSize(t, walDir) + + // After ClearWAL the WAL should contain at most the single entry from + // block 11. The pre-rotation data (blocks 1-10) must be gone. + require.Less(t, sizeAfterRotation, sizeBeforeRotation, + "WAL should shrink after rotation; ClearWAL may not be truncating (AllowEmpty bug)") +} + +// TestClearWALEmptiesAfterMultipleRotations writes enough blocks to trigger +// several file rotations and verifies the WAL stays bounded rather than +// growing monotonically. +func TestClearWALEmptiesAfterMultipleRotations(t *testing.T) { + dir := t.TempDir() + + store, err := NewStore(StoreConfig{ + DBDirectory: dir, + MaxBlocksPerFile: 5, + DisableTxIndexLookup: true, + }) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + + receipt := ReceiptRecord{ + TxHash: make([]byte, 32), + BlockNumber: 1, + ReceiptBytes: make([]byte, 1024), + } + + walDir := filepath.Join(dir, "parquet-wal") + + // Write 5 blocks to fill one file (no rotation yet). The WAL should + // hold all 5 entries at this point. 
+ for block := uint64(1); block <= 5; block++ { + receipt.BlockNumber = block + require.NoError(t, store.WriteReceipts([]ReceiptInput{{ + BlockNumber: block, + Receipt: receipt, + ReceiptBytes: receipt.ReceiptBytes, + }})) + } + sizeBeforeAnyRotation := walDirSize(t, walDir) + require.Greater(t, sizeBeforeAnyRotation, int64(0), + "WAL should have data before first rotation") + + // Write 20 more blocks (blocks 6-25) → 4 more rotations. + for block := uint64(6); block <= 25; block++ { + receipt.BlockNumber = block + require.NoError(t, store.WriteReceipts([]ReceiptInput{{ + BlockNumber: block, + Receipt: receipt, + ReceiptBytes: receipt.ReceiptBytes, + }})) + } + + sizeAtEnd := walDirSize(t, walDir) + + // Without truncation the WAL would hold all 25 blocks (~5x the initial + // size). With working truncation it should be no larger than one + // rotation window. + require.Less(t, sizeAtEnd, sizeBeforeAnyRotation, + "WAL should not grow across rotations; ClearWAL is not reclaiming space") +} diff --git a/sei-db/ledger_db/receipt/cached_receipt_store.go b/sei-db/ledger_db/receipt/cached_receipt_store.go index a71cbd1e61..3721e1d566 100644 --- a/sei-db/ledger_db/receipt/cached_receipt_store.go +++ b/sei-db/ledger_db/receipt/cached_receipt_store.go @@ -4,6 +4,7 @@ import ( "bytes" "sort" "sync" + "time" "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" @@ -29,9 +30,10 @@ type cachedReceiptStore struct { cacheRotateInterval uint64 cacheNextRotate uint64 cacheMu sync.Mutex + readMetrics ReceiptReadMetrics } -func newCachedReceiptStore(backend ReceiptStore) ReceiptStore { +func newCachedReceiptStore(backend ReceiptStore, metrics ReceiptReadMetrics) ReceiptStore { if backend == nil { return nil } @@ -45,6 +47,7 @@ func newCachedReceiptStore(backend ReceiptStore) ReceiptStore { backend: backend, cache: newLedgerCache(), cacheRotateInterval: interval, + readMetrics: metrics, } if provider, ok := backend.(cacheWarmupProvider); ok { 
store.cacheReceipts(provider.warmupReceipts()) @@ -52,6 +55,26 @@ func newCachedReceiptStore(backend ReceiptStore) ReceiptStore { return store } +// StableReceiptCacheWindowBlocks returns the near-tip block window that is +// guaranteed to stay in the active write chunk until the next rotation. +func StableReceiptCacheWindowBlocks(store ReceiptStore) uint64 { + cached, ok := store.(*cachedReceiptStore) + if !ok || cached.cacheRotateInterval == 0 { + return 0 + } + return cached.cacheRotateInterval +} + +// EstimatedReceiptCacheWindowBlocks returns the approximate recent block window +// normally served by the in-memory receipt cache (current chunk + previous one). +func EstimatedReceiptCacheWindowBlocks(store ReceiptStore) uint64 { + hotWindow := StableReceiptCacheWindowBlocks(store) + if hotWindow == 0 { + return 0 + } + return hotWindow * uint64(numCacheChunks-1) +} + func (s *cachedReceiptStore) LatestVersion() int64 { return s.backend.LatestVersion() } @@ -65,16 +88,23 @@ func (s *cachedReceiptStore) SetEarliestVersion(version int64) error { } func (s *cachedReceiptStore) GetReceipt(ctx sdk.Context, txHash common.Hash) (*types.Receipt, error) { - if receipt, ok := s.cache.GetReceipt(txHash); ok { + start := time.Now() + receipt, ok := s.cache.GetReceipt(txHash) + s.reportCacheGetDuration(time.Since(start).Seconds()) + if ok { + s.reportCacheHit() return receipt, nil } + s.reportCacheMiss() return s.backend.GetReceipt(ctx, txHash) } func (s *cachedReceiptStore) GetReceiptFromStore(ctx sdk.Context, txHash common.Hash) (*types.Receipt, error) { if receipt, ok := s.cache.GetReceipt(txHash); ok { + s.reportCacheHit() return receipt, nil } + s.reportCacheMiss() return s.backend.GetReceiptFromStore(ctx, txHash) } @@ -90,11 +120,14 @@ func (s *cachedReceiptStore) SetReceipts(ctx sdk.Context, receipts []ReceiptReco // When the cache fully covers the requested range the backend is skipped // entirely, avoiding an unnecessary DuckDB/parquet query for recent blocks. 
func (s *cachedReceiptStore) FilterLogs(ctx sdk.Context, fromBlock, toBlock uint64, crit filters.FilterCriteria) ([]*ethtypes.Log, error) { + scanStart := time.Now() // Take a single cache snapshot so rotation cannot advance the cache minimum // past the logs we already copied out of the cache. cacheLogs, cacheMin, hasCacheLogs := s.cache.FilterLogsWithMinBlock(fromBlock, toBlock, crit) + s.reportCacheFilterScanDuration(time.Since(scanStart).Seconds()) if hasCacheLogs && fromBlock >= cacheMin { // Cache logs come from map-backed chunks, so direct cache hits need sorting. + s.reportLogFilterCacheHit() sortLogs(cacheLogs) return cacheLogs, nil } @@ -111,10 +144,12 @@ func (s *cachedReceiptStore) FilterLogs(ctx sdk.Context, fromBlock, toBlock uint } if len(cacheLogs) == 0 { + s.reportLogFilterCacheMiss() // ReceiptStore backends are not required to return ordered logs. sortLogs(backendLogs) return backendLogs, nil } + s.reportLogFilterCacheHit() if len(backendLogs) == 0 { sortLogs(cacheLogs) return cacheLogs, nil @@ -230,3 +265,39 @@ func (s *cachedReceiptStore) maybeRotateCacheLocked(blockNumber uint64) { s.cacheNextRotate += s.cacheRotateInterval } } + +func (s *cachedReceiptStore) reportCacheHit() { + if s.readMetrics != nil { + s.readMetrics.ReportReceiptCacheHit() + } +} + +func (s *cachedReceiptStore) reportCacheMiss() { + if s.readMetrics != nil { + s.readMetrics.ReportReceiptCacheMiss() + } +} + +func (s *cachedReceiptStore) reportLogFilterCacheHit() { + if s.readMetrics != nil { + s.readMetrics.ReportLogFilterCacheHit() + } +} + +func (s *cachedReceiptStore) reportLogFilterCacheMiss() { + if s.readMetrics != nil { + s.readMetrics.ReportLogFilterCacheMiss() + } +} + +func (s *cachedReceiptStore) reportCacheFilterScanDuration(seconds float64) { + if s.readMetrics != nil { + s.readMetrics.RecordCacheFilterScanDuration(seconds) + } +} + +func (s *cachedReceiptStore) reportCacheGetDuration(seconds float64) { + if s.readMetrics != nil { + 
s.readMetrics.RecordCacheGetDuration(seconds) + } +} diff --git a/sei-db/ledger_db/receipt/cached_receipt_store_test.go b/sei-db/ledger_db/receipt/cached_receipt_store_test.go index 808cff1d8f..5a87607d78 100644 --- a/sei-db/ledger_db/receipt/cached_receipt_store_test.go +++ b/sei-db/ledger_db/receipt/cached_receipt_store_test.go @@ -21,6 +21,33 @@ type fakeReceiptBackend struct { lastFilterToBlock uint64 } +type fakeReceiptReadMetrics struct { + cacheHits int + cacheMisses int + logFilterCacheHits int + logFilterCacheMisses int +} + +func (f *fakeReceiptReadMetrics) ReportReceiptCacheHit() { + f.cacheHits++ +} + +func (f *fakeReceiptReadMetrics) ReportReceiptCacheMiss() { + f.cacheMisses++ +} + +func (f *fakeReceiptReadMetrics) ReportLogFilterCacheHit() { + f.logFilterCacheHits++ +} + +func (f *fakeReceiptReadMetrics) ReportLogFilterCacheMiss() { + f.logFilterCacheMisses++ +} + +func (f *fakeReceiptReadMetrics) RecordCacheFilterScanDuration(float64) {} + +func (f *fakeReceiptReadMetrics) RecordCacheGetDuration(float64) {} + func newFakeReceiptBackend() *fakeReceiptBackend { return &fakeReceiptBackend{ receipts: make(map[common.Hash]*types.Receipt), @@ -81,7 +108,7 @@ func (f *fakeReceiptBackend) Close() error { func TestCachedReceiptStoreUsesCacheForReceipt(t *testing.T) { ctx, _ := newTestContext() backend := newFakeReceiptBackend() - store := newCachedReceiptStore(backend) + store := newCachedReceiptStore(backend, nil) txHash := common.HexToHash("0x1") addr := common.HexToAddress("0x100") @@ -100,7 +127,7 @@ func TestCachedReceiptStoreUsesCacheForReceipt(t *testing.T) { func TestCachedReceiptStoreFilterLogsSkipsBackendWhenCacheCovers(t *testing.T) { ctx, _ := newTestContext() backend := newFakeReceiptBackend() - store := newCachedReceiptStore(backend) + store := newCachedReceiptStore(backend, nil) txHash := common.HexToHash("0x2") addr := common.HexToAddress("0x200") @@ -123,7 +150,7 @@ func TestCachedReceiptStoreFilterLogsSkipsBackendWhenCacheCoversGenesis(t 
*testi ctx, _ := newTestContext() ctx = ctx.WithBlockHeight(0) backend := newFakeReceiptBackend() - store := newCachedReceiptStore(backend) + store := newCachedReceiptStore(backend, nil) txHash := common.HexToHash("0x21") addr := common.HexToAddress("0x201") @@ -150,7 +177,7 @@ func TestCachedReceiptStoreFilterLogsReturnsSortedLogs(t *testing.T) { {BlockNumber: 8, TxIndex: 1, Index: 2}, {BlockNumber: 5, TxIndex: 0, Index: 0}, } - store := newCachedReceiptStore(backend) + store := newCachedReceiptStore(backend, nil) // Cache holds recent blocks written through SetReceipts. receiptA := makeTestReceipt(common.HexToHash("0xa"), 11, 1, common.HexToAddress("0x210"), []common.Hash{common.HexToHash("0x1")}) @@ -178,7 +205,7 @@ func TestFilterLogsPartialCacheNarrowsBackendRange(t *testing.T) { {BlockNumber: 5, TxIndex: 0, Index: 0}, {BlockNumber: 8, TxIndex: 0, Index: 0}, } - store := newCachedReceiptStore(backend) + store := newCachedReceiptStore(backend, nil) receipt10 := makeTestReceipt(common.HexToHash("0xa"), 10, 0, common.HexToAddress("0x1"), nil) receipt11 := makeTestReceipt(common.HexToHash("0xb"), 11, 0, common.HexToAddress("0x2"), nil) @@ -208,7 +235,7 @@ func TestFilterLogsPartialCacheNarrowsBackendRangeAcrossRotatedChunks(t *testing {BlockNumber: 5, TxIndex: 0, Index: 0}, {BlockNumber: 9, TxIndex: 0, Index: 0}, } - store := newCachedReceiptStore(backend).(*cachedReceiptStore) + store := newCachedReceiptStore(backend, nil).(*cachedReceiptStore) receipt10 := makeTestReceipt(common.HexToHash("0xc"), 10, 0, common.HexToAddress("0x1"), nil) require.NoError(t, store.SetReceipts(ctx, []ReceiptRecord{ @@ -245,7 +272,7 @@ func TestFilterLogsFallsBackToBackendWhenCacheEmpty(t *testing.T) { {BlockNumber: 2, TxIndex: 0, Index: 0}, {BlockNumber: 1, TxIndex: 0, Index: 0}, } - store := newCachedReceiptStore(backend) + store := newCachedReceiptStore(backend, nil) backend.filterLogCalls = 0 logs, err := store.FilterLogs(ctx, 1, 5, filters.FilterCriteria{}) @@ -261,7 +288,7 @@ 
func TestFilterLogsFallsBackToBackendWhenCacheEmpty(t *testing.T) { func TestFilterLogsMultipleBlocksCacheOnly(t *testing.T) { ctx, _ := newTestContext() backend := newFakeReceiptBackend() - store := newCachedReceiptStore(backend) + store := newCachedReceiptStore(backend, nil) for block := uint64(100); block <= 105; block++ { txHash := common.BigToHash(new(big.Int).SetUint64(block)) @@ -278,3 +305,36 @@ func TestFilterLogsMultipleBlocksCacheOnly(t *testing.T) { require.Equal(t, blockNum, logs[i].BlockNumber) } } + +func TestCachedReceiptStoreReportsCacheHit(t *testing.T) { + ctx, _ := newTestContext() + backend := newFakeReceiptBackend() + metrics := &fakeReceiptReadMetrics{} + store := newCachedReceiptStore(backend, metrics) + + txHash := common.HexToHash("0x10") + receipt := makeTestReceipt(txHash, 7, 1, common.HexToAddress("0x100"), nil) + + require.NoError(t, store.SetReceipts(ctx, []ReceiptRecord{{TxHash: txHash, Receipt: receipt}})) + + backend.getReceiptCalls = 0 + got, err := store.GetReceipt(ctx, txHash) + require.NoError(t, err) + require.Equal(t, receipt.TxHashHex, got.TxHashHex) + require.Equal(t, 0, backend.getReceiptCalls) + require.Equal(t, 1, metrics.cacheHits) + require.Equal(t, 0, metrics.cacheMisses) +} + +func TestCachedReceiptStoreReportsCacheMiss(t *testing.T) { + ctx, _ := newTestContext() + backend := newFakeReceiptBackend() + metrics := &fakeReceiptReadMetrics{} + store := newCachedReceiptStore(backend, metrics) + + _, err := store.GetReceipt(ctx, common.HexToHash("0x404")) + require.ErrorIs(t, err, ErrNotFound) + require.Equal(t, 1, backend.getReceiptCalls) + require.Equal(t, 0, metrics.cacheHits) + require.Equal(t, 1, metrics.cacheMisses) +} diff --git a/sei-db/ledger_db/receipt/parquet_store.go b/sei-db/ledger_db/receipt/parquet_store.go index bdf88b2fd0..cf03fff495 100644 --- a/sei-db/ledger_db/receipt/parquet_store.go +++ b/sei-db/ledger_db/receipt/parquet_store.go @@ -24,6 +24,7 @@ func newParquetReceiptStore(cfg 
dbconfig.ReceiptStoreConfig, storeKey sdk.StoreK DBDirectory: cfg.DBDirectory, KeepRecent: int64(cfg.KeepRecent), PruneIntervalSeconds: int64(cfg.PruneIntervalSeconds), + DisableTxIndexLookup: cfg.DisableTxIndexLookup, } store, err := parquet.NewStore(storeCfg) @@ -90,6 +91,9 @@ func (s *parquetReceiptStore) GetReceipt(ctx sdk.Context, txHash common.Hash) (* return receipt, nil } + if s.storeKey == nil { + return nil, ErrNotFound + } store := ctx.KVStore(s.storeKey) bz := store.Get(types.ReceiptKey(txHash)) if bz == nil { diff --git a/sei-db/ledger_db/receipt/receipt_store.go b/sei-db/ledger_db/receipt/receipt_store.go index 9ce457765b..022b9dceeb 100644 --- a/sei-db/ledger_db/receipt/receipt_store.go +++ b/sei-db/ledger_db/receipt/receipt_store.go @@ -52,6 +52,17 @@ type ReceiptRecord struct { ReceiptBytes []byte // Optional pre-marshaled receipt (must match Receipt if set) } +// ReceiptReadMetrics records cache hits, misses, and timing for cached receipt +// and log reads. +type ReceiptReadMetrics interface { + ReportReceiptCacheHit() + ReportReceiptCacheMiss() + ReportLogFilterCacheHit() + ReportLogFilterCacheMiss() + RecordCacheFilterScanDuration(seconds float64) + RecordCacheGetDuration(seconds float64) +} + type receiptStore struct { db seidbtypes.StateStore storeKey sdk.StoreKey @@ -77,11 +88,21 @@ func normalizeReceiptBackend(backend string) string { } func NewReceiptStore(config dbconfig.ReceiptStoreConfig, storeKey sdk.StoreKey) (ReceiptStore, error) { + return NewReceiptStoreWithReadMetrics(config, storeKey, nil) +} + +// NewReceiptStoreWithReadMetrics constructs a receipt store and optionally +// records cache hits, misses, and timings for cached receipt/log reads. 
+func NewReceiptStoreWithReadMetrics( + config dbconfig.ReceiptStoreConfig, + storeKey sdk.StoreKey, + metrics ReceiptReadMetrics, +) (ReceiptStore, error) { backend, err := newReceiptBackend(config, storeKey) if err != nil { return nil, err } - return newCachedReceiptStore(backend), nil + return newCachedReceiptStore(backend, metrics), nil } // BackendTypeName returns the backend implementation name ("parquet" or "pebble") for testing. diff --git a/sei-db/state_db/bench/cryptosim/canned_random.go b/sei-db/state_db/bench/cryptosim/canned_random.go index 6e95db15e4..7e0e37cba5 100644 --- a/sei-db/state_db/bench/cryptosim/canned_random.go +++ b/sei-db/state_db/bench/cryptosim/canned_random.go @@ -91,6 +91,9 @@ func (cr *CannedRandom) Bytes(count int) []byte { // Returns a slice of random bytes from a given seed. Bytes are deterministic given the same seed. // +// Unlike most CannedRandom methods, SeededBytes is safe for concurrent use: it only reads +// from the immutable buffer and does not advance the internal index. +// // Returned slice is NOT safe to modify. If modification is required, the caller should make a copy of the slice. 
func (cr *CannedRandom) SeededBytes(count int, seed int64) []byte { if count < 0 { diff --git a/sei-db/state_db/bench/cryptosim/config/basic-config.json b/sei-db/state_db/bench/cryptosim/config/basic-config.json index a867898d84..caa3cf5d3e 100644 --- a/sei-db/state_db/bench/cryptosim/config/basic-config.json +++ b/sei-db/state_db/bench/cryptosim/config/basic-config.json @@ -29,6 +29,7 @@ "DeleteLogDirOnStartup": false, "DeleteDataDirOnShutdown": false, "DeleteLogDirOnShutdown": false, + "DisableReceiptTxIndexLookup": true, "ExecutorQueueSize": 1024, "HotAccountProbability": 0.1, "HotErc20ContractProbability": 0.5, diff --git a/sei-db/state_db/bench/cryptosim/config/reciept-store.json b/sei-db/state_db/bench/cryptosim/config/reciept-store.json index ac2801ec8e..acbf2dd468 100644 --- a/sei-db/state_db/bench/cryptosim/config/reciept-store.json +++ b/sei-db/state_db/bench/cryptosim/config/reciept-store.json @@ -1,9 +1,21 @@ { - "Comment": "For testing with the state store and reciept store both enabled.", + "Comment": "For receipt-read-focused benchmarking against the parquet receipt store with tx-index lookup disabled. 
Log-filter reads are disabled to avoid competing with receipt reads.", + "DisableTransactionExecution": true, "DataDir": "data", "LogDir": "logs", + "LogLevel": "info", "MinimumNumberOfColdAccounts": 1000000, "MinimumNumberOfDormantAccounts": 1000000, - "GenerateReceipts": true + "GenerateReceipts": true, + "ReceiptReadConcurrency": 16, + "ReceiptReadsPerSecond": 500000, + "ReceiptReadMode": "duckdb", + "DisableReceiptTxIndexLookup": true, + "ReceiptKeepRecent": 100000, + "ReceiptPruneIntervalSeconds": 60, + "ReceiptLogFilterReadConcurrency": 0, + "ReceiptLogFilterReadsPerSecond": 0, + "ReceiptLogFilterReadMode": "cache", + "ReceiptLogFilterMinBlockRange": 1, + "ReceiptLogFilterMaxBlockRange": 10 } - diff --git a/sei-db/state_db/bench/cryptosim/cryptosim.go b/sei-db/state_db/bench/cryptosim/cryptosim.go index e0c8666990..55fb7502c5 100644 --- a/sei-db/state_db/bench/cryptosim/cryptosim.go +++ b/sei-db/state_db/bench/cryptosim/cryptosim.go @@ -184,7 +184,7 @@ func NewCryptoSim( var recieptsChan chan *block if config.GenerateReceipts { recieptsChan = make(chan *block, config.RecieptChannelCapacity) - _, err := NewRecieptStoreSimulator(ctx, config, recieptsChan, metrics) + _, err := NewRecieptStoreSimulator(ctx, config, recieptsChan, metrics, rand.Clone(false)) if err != nil { cancel() return nil, fmt.Errorf("failed to create receipt store simulator: %w", err) @@ -442,6 +442,8 @@ func (c *CryptoSim) handleNextBlock(blk *block) { c.database.IncrementTransactionCount() } + // TODO: skip executor dispatch and FinalizeBlock when DisableTransactionExecution + // is true and only receipts are being benchmarked. FlatKV commits waste I/O here. 
for txn := range blk.Iterator() { c.executors[c.nextExecutorIndex].ScheduleForExecution(txn) c.nextExecutorIndex = (c.nextExecutorIndex + 1) % len(c.executors) diff --git a/sei-db/state_db/bench/cryptosim/cryptosim_config.go b/sei-db/state_db/bench/cryptosim/cryptosim_config.go index 57c748a232..cd06e077d3 100644 --- a/sei-db/state_db/bench/cryptosim/cryptosim_config.go +++ b/sei-db/state_db/bench/cryptosim/cryptosim_config.go @@ -16,6 +16,8 @@ const ( minPaddedAccountSize = 8 minErc20StorageSlotSize = 32 minErc20InteractionsPerAcct = 1 + receiptReadModeCache = "cache" + receiptReadModeDuckDB = "duckdb" ) // Defines the configuration for the cryptosim benchmark. @@ -189,6 +191,43 @@ type CryptoSimConfig struct { // If greater than 0, the benchmark will throttle the transaction rate to this value, in hertz. MaxTPS float64 + // Number of concurrent reader goroutines issuing receipt lookups. 0 disables reads. + ReceiptReadConcurrency int + + // Target total receipt reads per second across all reader goroutines. + // Reads are distributed evenly across readers. + ReceiptReadsPerSecond int + + // Controls which block range receipt-by-hash reads target. + // "cache" = only read receipts in the cache window (guaranteed cache hit). + // "duckdb" = only read receipts older than the cache window (guaranteed cache miss, DuckDB fallback). + // Required when ReceiptReadConcurrency > 0. + ReceiptReadMode string + + // Must remain true. The parquet tx_hash -> block_number lookup path is + // intentionally unsupported; setting this to false will panic during receipt + // store initialization. + DisableReceiptTxIndexLookup bool + + // Number of concurrent goroutines issuing log filter (eth_getLogs) queries. 0 disables log filter reads. + // These goroutines are independent from the receipt reader goroutines. + ReceiptLogFilterReadConcurrency int + + // Target total log filter reads per second across all log filter goroutines. 
+ ReceiptLogFilterReadsPerSecond int + + // Controls which block range log filter reads target. + // "cache" = only query blocks in the cache window (DuckDB skipped). + // "duckdb" = only query blocks older than the cache window (cache returns nothing). + // Required when ReceiptLogFilterReadConcurrency > 0. + ReceiptLogFilterReadMode string + + // Minimum number of blocks in a log filter query range. Default 1. + ReceiptLogFilterMinBlockRange int + + // Maximum number of blocks in a log filter query range. Default 10. + ReceiptLogFilterMaxBlockRange int + // Number of recent blocks to keep before pruning parquet files. 0 disables pruning. ReceiptKeepRecent int64 @@ -253,6 +292,15 @@ func DefaultCryptoSimConfig() *CryptoSimConfig { DisableTransactionExecution: false, DisableTransactionReads: false, MaxTPS: 0, + ReceiptReadConcurrency: 0, + ReceiptReadsPerSecond: 100, + ReceiptReadMode: receiptReadModeCache, + DisableReceiptTxIndexLookup: true, + ReceiptLogFilterReadConcurrency: 0, + ReceiptLogFilterReadsPerSecond: 100, + ReceiptLogFilterReadMode: receiptReadModeCache, + ReceiptLogFilterMinBlockRange: 1, + ReceiptLogFilterMaxBlockRange: 10, ReceiptKeepRecent: 100_000, ReceiptPruneIntervalSeconds: 600, LogLevel: "info", @@ -342,6 +390,38 @@ func (c *CryptoSimConfig) Validate() error { if c.MaxTPS < 0 { return fmt.Errorf("MaxTPS must be non-negative (got %f)", c.MaxTPS) } + if c.ReceiptReadConcurrency < 0 { + return fmt.Errorf("ReceiptReadConcurrency must be non-negative (got %d)", c.ReceiptReadConcurrency) + } + if c.ReceiptReadConcurrency > 0 { + switch c.ReceiptReadMode { + case receiptReadModeCache, receiptReadModeDuckDB: + default: + return fmt.Errorf("ReceiptReadMode must be %q or %q (got %q)", + receiptReadModeCache, receiptReadModeDuckDB, c.ReceiptReadMode) + } + } + if c.ReceiptLogFilterReadConcurrency < 0 { + return fmt.Errorf("ReceiptLogFilterReadConcurrency must be non-negative (got %d)", c.ReceiptLogFilterReadConcurrency) + } + if 
c.ReceiptLogFilterReadConcurrency > 0 { + switch c.ReceiptLogFilterReadMode { + case receiptReadModeCache, receiptReadModeDuckDB: + default: + return fmt.Errorf("ReceiptLogFilterReadMode must be %q or %q (got %q)", + receiptReadModeCache, receiptReadModeDuckDB, c.ReceiptLogFilterReadMode) + } + } + if c.ReceiptLogFilterMinBlockRange < 1 { + return fmt.Errorf("ReceiptLogFilterMinBlockRange must be at least 1 (got %d)", c.ReceiptLogFilterMinBlockRange) + } + if c.ReceiptLogFilterMaxBlockRange < c.ReceiptLogFilterMinBlockRange { + return fmt.Errorf("ReceiptLogFilterMaxBlockRange must be >= ReceiptLogFilterMinBlockRange (got %d < %d)", + c.ReceiptLogFilterMaxBlockRange, c.ReceiptLogFilterMinBlockRange) + } + if c.StateStoreConfig == nil { + return fmt.Errorf("StateStoreConfig is required") + } switch c.StateStoreConfig.Backend { case config.PebbleDBBackend, config.RocksDBBackend: default: @@ -364,7 +444,6 @@ func (c *CryptoSimConfig) Validate() error { default: return fmt.Errorf("LogLevel must be one of debug, info, warn, error (got %q)", c.LogLevel) } - return nil } diff --git a/sei-db/state_db/bench/cryptosim/cryptosim_config_test.go b/sei-db/state_db/bench/cryptosim/cryptosim_config_test.go index 64ca64f37f..ad426f97eb 100644 --- a/sei-db/state_db/bench/cryptosim/cryptosim_config_test.go +++ b/sei-db/state_db/bench/cryptosim/cryptosim_config_test.go @@ -68,3 +68,19 @@ func TestLoadConfigFromFile_DisableTransactionReadsOverride(t *testing.T) { require.Equal(t, wrappers.NoOp, cfg.Backend) require.True(t, cfg.DisableTransactionReads) } + +func TestLoadConfigFromFile_DisableReceiptTxIndexLookupOverride(t *testing.T) { + t.Parallel() + + configPath := filepath.Join(t.TempDir(), "cryptosim.json") + err := os.WriteFile(configPath, []byte(`{ + "DisableReceiptTxIndexLookup": true, + "DataDir": "data", + "LogDir": "logs" +}`), 0o600) + require.NoError(t, err) + + cfg, err := LoadConfigFromFile(configPath) + require.NoError(t, err) + require.True(t, 
cfg.DisableReceiptTxIndexLookup) +} diff --git a/sei-db/state_db/bench/cryptosim/cryptosim_metrics.go b/sei-db/state_db/bench/cryptosim/cryptosim_metrics.go index 050a8f6772..8e9f4b6719 100644 --- a/sei-db/state_db/bench/cryptosim/cryptosim_metrics.go +++ b/sei-db/state_db/bench/cryptosim/cryptosim_metrics.go @@ -25,6 +25,20 @@ var receiptWriteLatencyBuckets = []float64{ 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, } +var receiptReadLatencyBuckets = []float64{ + 0.00001, 0.00005, 0.0001, 0.00025, 0.0005, + 0.001, 0.0025, 0.005, 0.01, 0.025, + 0.05, 0.1, 0.25, 0.5, 1, +} + +var receiptLogFilterLatencyBuckets = []float64{ + 0.00001, 0.00005, 0.0001, 0.00025, 0.0005, + 0.001, 0.0025, 0.005, 0.01, 0.025, + 0.05, 0.075, 0.1, 0.15, 0.25, + 0.5, 0.75, 1, 1.5, 2, + 2.5, 3, 4, 5, 7.5, 10, +} + // CryptosimMetrics holds OpenTelemetry metrics for the cryptosim benchmark. // Metrics are exported via whatever exporter is configured on the global OTel // MeterProvider (e.g., Prometheus, OTLP). This package does not import Prometheus. 
@@ -49,10 +63,22 @@ type CryptosimMetrics struct { uptimeSeconds metric.Float64Gauge // Receipt metrics - receiptBlockWriteDuration metric.Float64Histogram - receiptChannelDepth metric.Int64Gauge - receiptsWrittenTotal metric.Int64Counter - receiptErrorsTotal metric.Int64Counter + receiptBlockWriteDuration metric.Float64Histogram + receiptChannelDepth metric.Int64Gauge + receiptsWrittenTotal metric.Int64Counter + receiptErrorsTotal metric.Int64Counter + receiptReadDuration metric.Float64Histogram + receiptReadsTotal metric.Int64Counter + receiptCacheHitsTotal metric.Int64Counter + receiptCacheMissesTotal metric.Int64Counter + receiptReadsFoundTotal metric.Int64Counter + receiptReadsNotFoundTotal metric.Int64Counter + receiptLogFilterDuration metric.Float64Histogram + receiptLogFilterCacheHitsTotal metric.Int64Counter + receiptLogFilterCacheMissTotal metric.Int64Counter + receiptLogFilterLogsReturned metric.Int64Histogram + cacheFilterScanDuration metric.Float64Histogram + cacheGetDuration metric.Float64Histogram mainThreadPhase *metrics.PhaseTimer transactionPhaseTimerFactory *metrics.PhaseTimerFactory @@ -179,6 +205,72 @@ func NewCryptosimMetrics( metric.WithDescription("Total receipt processing errors (marshal or write failures)"), metric.WithUnit("{count}"), ) + receiptReadDuration, _ := meter.Float64Histogram( + "cryptosim_receipt_read_duration_seconds", + metric.WithDescription("End-to-end receipt read latency (includes cache layer)"), + metric.WithExplicitBucketBoundaries(receiptReadLatencyBuckets...), + metric.WithUnit("s"), + ) + receiptReadsTotal, _ := meter.Int64Counter( + "cryptosim_receipt_reads_total", + metric.WithDescription("Total receipt read attempts"), + metric.WithUnit("{count}"), + ) + receiptCacheHitsTotal, _ := meter.Int64Counter( + "cryptosim_receipt_cache_hits_total", + metric.WithDescription("Receipt reads served from the ledger cache"), + metric.WithUnit("{count}"), + ) + receiptCacheMissesTotal, _ := meter.Int64Counter( + 
"cryptosim_receipt_cache_misses_total", + metric.WithDescription("Receipt reads that missed the in-memory ledger cache and fell through to the backend"), + metric.WithUnit("{count}"), + ) + receiptReadsFoundTotal, _ := meter.Int64Counter( + "cryptosim_receipt_reads_found_total", + metric.WithDescription("Receipt reads that returned a receipt"), + metric.WithUnit("{count}"), + ) + receiptReadsNotFoundTotal, _ := meter.Int64Counter( + "cryptosim_receipt_reads_not_found_total", + metric.WithDescription("Receipt reads that returned no receipt because the hash was absent or pruned"), + metric.WithUnit("{count}"), + ) + receiptLogFilterDuration, _ := meter.Float64Histogram( + "cryptosim_receipt_log_filter_duration_seconds", + metric.WithDescription("DuckDB eth_getLogs filter query latency"), + metric.WithExplicitBucketBoundaries(receiptLogFilterLatencyBuckets...), + metric.WithUnit("s"), + ) + receiptLogFilterCacheHitsTotal, _ := meter.Int64Counter( + "cryptosim_receipt_log_filter_cache_hits_total", + metric.WithDescription("Log filter queries where the in-memory cache contributed results"), + metric.WithUnit("{count}"), + ) + receiptLogFilterCacheMissTotal, _ := meter.Int64Counter( + "cryptosim_receipt_log_filter_cache_miss_total", + metric.WithDescription("Log filter queries served entirely from the backend (cache contributed nothing)"), + metric.WithUnit("{count}"), + ) + receiptLogFilterLogsReturned, _ := meter.Int64Histogram( + "cryptosim_receipt_log_filter_logs_returned", + metric.WithDescription("Number of log entries returned per FilterLogs query"), + metric.WithExplicitBucketBoundaries(0, 1, 5, 10, 25, 50, 100, 250, 500, 1000, 5000), + metric.WithUnit("{count}"), + ) + + cacheFilterScanDuration, _ := meter.Float64Histogram( + "cryptosim_receipt_cache_filter_scan_duration_seconds", + metric.WithDescription("Time spent scanning the in-memory log cache during FilterLogs (excludes backend)"), + metric.WithExplicitBucketBoundaries(receiptReadLatencyBuckets...), + 
metric.WithUnit("s"), + ) + cacheGetDuration, _ := meter.Float64Histogram( + "cryptosim_receipt_cache_get_duration_seconds", + metric.WithDescription("Time spent in cache.GetReceipt (includes clone cost, excludes backend)"), + metric.WithExplicitBucketBoundaries(receiptReadLatencyBuckets...), + metric.WithUnit("s"), + ) mainThreadPhase := dbPhaseTimer if mainThreadPhase == nil { @@ -188,29 +280,41 @@ func NewCryptosimMetrics( transactionPhaseTimerFactory := metrics.NewPhaseTimerFactory(meter, "transaction") m := &CryptosimMetrics{ - ctx: ctx, - blocksFinalizedTotal: blocksFinalizedTotal, - transactionsProcessedTotal: transactionsProcessedTotal, - totalAccounts: totalAccounts, - hotAccounts: hotAccounts, - coldAccounts: coldAccounts, - dormantAccounts: dormantAccounts, - totalErc20Contracts: totalErc20Contracts, - dbCommitsTotal: dbCommitsTotal, - dataDirSizeBytes: dataDirSizeBytes, - dataDirAvailableBytes: dataDirAvailableBytes, - logDirSizeBytes: logDirSizeBytes, - processReadBytesTotal: processReadBytesTotal, - processWriteBytesTotal: processWriteBytesTotal, - processReadCountTotal: processReadCountTotal, - processWriteCountTotal: processWriteCountTotal, - uptimeSeconds: uptimeSeconds, - receiptBlockWriteDuration: receiptBlockWriteDuration, - receiptChannelDepth: receiptChannelDepth, - receiptsWrittenTotal: receiptsWrittenTotal, - receiptErrorsTotal: receiptErrorsTotal, - mainThreadPhase: mainThreadPhase, - transactionPhaseTimerFactory: transactionPhaseTimerFactory, + ctx: ctx, + blocksFinalizedTotal: blocksFinalizedTotal, + transactionsProcessedTotal: transactionsProcessedTotal, + totalAccounts: totalAccounts, + hotAccounts: hotAccounts, + coldAccounts: coldAccounts, + dormantAccounts: dormantAccounts, + totalErc20Contracts: totalErc20Contracts, + dbCommitsTotal: dbCommitsTotal, + dataDirSizeBytes: dataDirSizeBytes, + dataDirAvailableBytes: dataDirAvailableBytes, + logDirSizeBytes: logDirSizeBytes, + processReadBytesTotal: processReadBytesTotal, + 
processWriteBytesTotal: processWriteBytesTotal, + processReadCountTotal: processReadCountTotal, + processWriteCountTotal: processWriteCountTotal, + uptimeSeconds: uptimeSeconds, + receiptBlockWriteDuration: receiptBlockWriteDuration, + receiptChannelDepth: receiptChannelDepth, + receiptsWrittenTotal: receiptsWrittenTotal, + receiptErrorsTotal: receiptErrorsTotal, + receiptReadDuration: receiptReadDuration, + receiptReadsTotal: receiptReadsTotal, + receiptCacheHitsTotal: receiptCacheHitsTotal, + receiptCacheMissesTotal: receiptCacheMissesTotal, + receiptReadsFoundTotal: receiptReadsFoundTotal, + receiptReadsNotFoundTotal: receiptReadsNotFoundTotal, + receiptLogFilterDuration: receiptLogFilterDuration, + receiptLogFilterCacheHitsTotal: receiptLogFilterCacheHitsTotal, + receiptLogFilterCacheMissTotal: receiptLogFilterCacheMissTotal, + receiptLogFilterLogsReturned: receiptLogFilterLogsReturned, + cacheFilterScanDuration: cacheFilterScanDuration, + cacheGetDuration: cacheGetDuration, + mainThreadPhase: mainThreadPhase, + transactionPhaseTimerFactory: transactionPhaseTimerFactory, } if config.BackgroundMetricsScrapeInterval > 0 { @@ -484,6 +588,90 @@ func (m *CryptosimMetrics) ReportReceiptError() { m.receiptErrorsTotal.Add(context.Background(), 1) } +func (m *CryptosimMetrics) RecordReceiptReadDuration(seconds float64) { + if m == nil || m.receiptReadDuration == nil { + return + } + m.receiptReadDuration.Record(context.Background(), seconds) +} + +func (m *CryptosimMetrics) ReportReceiptRead() { + if m == nil || m.receiptReadsTotal == nil { + return + } + m.receiptReadsTotal.Add(context.Background(), 1) +} + +func (m *CryptosimMetrics) ReportReceiptCacheHit() { + if m == nil || m.receiptCacheHitsTotal == nil { + return + } + m.receiptCacheHitsTotal.Add(context.Background(), 1) +} + +func (m *CryptosimMetrics) ReportReceiptCacheMiss() { + if m == nil || m.receiptCacheMissesTotal == nil { + return + } + m.receiptCacheMissesTotal.Add(context.Background(), 1) +} + +func (m 
*CryptosimMetrics) ReportReceiptReadFound() { + if m == nil || m.receiptReadsFoundTotal == nil { + return + } + m.receiptReadsFoundTotal.Add(context.Background(), 1) +} + +func (m *CryptosimMetrics) ReportReceiptReadNotFound() { + if m == nil || m.receiptReadsNotFoundTotal == nil { + return + } + m.receiptReadsNotFoundTotal.Add(context.Background(), 1) +} + +func (m *CryptosimMetrics) RecordReceiptLogFilterDuration(seconds float64) { + if m == nil || m.receiptLogFilterDuration == nil { + return + } + m.receiptLogFilterDuration.Record(context.Background(), seconds) +} + +func (m *CryptosimMetrics) ReportLogFilterCacheHit() { + if m == nil || m.receiptLogFilterCacheHitsTotal == nil { + return + } + m.receiptLogFilterCacheHitsTotal.Add(context.Background(), 1) +} + +func (m *CryptosimMetrics) ReportLogFilterCacheMiss() { + if m == nil || m.receiptLogFilterCacheMissTotal == nil { + return + } + m.receiptLogFilterCacheMissTotal.Add(context.Background(), 1) +} + +func (m *CryptosimMetrics) RecordLogFilterLogsReturned(count int64) { + if m == nil || m.receiptLogFilterLogsReturned == nil { + return + } + m.receiptLogFilterLogsReturned.Record(context.Background(), count) +} + +func (m *CryptosimMetrics) RecordCacheFilterScanDuration(seconds float64) { + if m == nil || m.cacheFilterScanDuration == nil { + return + } + m.cacheFilterScanDuration.Record(context.Background(), seconds) +} + +func (m *CryptosimMetrics) RecordCacheGetDuration(seconds float64) { + if m == nil || m.cacheGetDuration == nil { + return + } + m.cacheGetDuration.Record(context.Background(), seconds) +} + // startReceiptChannelDepthSampling periodically records the depth of the receipt channel. 
func (m *CryptosimMetrics) startReceiptChannelDepthSampling(ch <-chan *block, intervalSeconds int) { if m == nil || m.receiptChannelDepth == nil || intervalSeconds <= 0 || ch == nil { diff --git a/sei-db/state_db/bench/cryptosim/receipt.go b/sei-db/state_db/bench/cryptosim/receipt.go index 4ab348aa2c..60394b1086 100644 --- a/sei-db/state_db/bench/cryptosim/receipt.go +++ b/sei-db/state_db/bench/cryptosim/receipt.go @@ -32,6 +32,11 @@ const ( syntheticReceiptGasPriceSpan uint64 = 9_000_000_000 syntheticReceiptTransferBase uint64 = 1_000_000 syntheticReceiptTransferSpan uint64 = 10_000_000_000 + + // Multiplied by blockNumber then added to txIndex to produce a unique seed per + // transaction. Supports up to 1M txs per block before collisions. With int64, + // block numbers up to ~9.2 trillion are safe before overflow (~290k years at 1 block/sec). + syntheticTxIDBlockStride int64 = 1_000_000 ) var erc20TransferEventSignatureBytes = [hashLen]byte{ @@ -41,6 +46,27 @@ var erc20TransferEventSignatureBytes = [hashLen]byte{ 0x28, 0xf5, 0x5a, 0x4d, 0xf5, 0x23, 0xb3, 0xef, } +// SyntheticTxHash returns a deterministic 32-byte tx hash for a given (blockNumber, txIndex) pair. +// +// It uses CannedRandom.SeededBytes, which is a pure read from the pre-generated buffer — no +// internal state is advanced, and the result depends only on the CannedRandom's seed/buffer +// and the inputs. This means any goroutine with a CannedRandom created from the same +// (seed, bufferSize) can reconstruct any tx hash from just the block number and tx index, +// without storing the hashes. 
Readers use this to compute query targets on the fly: +// +// validRange = [max(1, latestBlock - keepRecent + 1), latestBlock] +// randomBlock = pick from validRange +// randomTxIdx = pick from [0, txsPerBlock) +// txHash = SyntheticTxHash(crand, randomBlock, randomTxIdx) +// +// The hash automatically becomes invalid (returns no result) once the corresponding +// parquet file is pruned, so readers never need to track which hashes are live. +func SyntheticTxHash(crand *CannedRandom, blockNumber uint64, txIndex uint32) []byte { + //nolint:gosec // block numbers and tx indices won't exceed int64 in benchmarks + txID := int64(blockNumber)*syntheticTxIDBlockStride + int64(txIndex) + return crand.SeededBytes(hashLen, txID) +} + // BuildERC20TransferReceiptFromTxn produces a plausible successful ERC20 transfer receipt from a transaction. func BuildERC20TransferReceiptFromTxn( crand *CannedRandom, @@ -137,7 +163,7 @@ func BuildERC20TransferReceipt( TxType: txType, CumulativeGasUsed: cumulativeGasUsed, ContractAddress: contractAddressHex, - TxHashHex: BytesToHex(crand.Bytes(hashLen)), + TxHashHex: BytesToHex(SyntheticTxHash(crand, blockNumber, txIndex)), GasUsed: gasUsed, EffectiveGasPrice: effectiveGasPrice, BlockNumber: blockNumber, diff --git a/sei-db/state_db/bench/cryptosim/receipt_test.go b/sei-db/state_db/bench/cryptosim/receipt_test.go index f1e61109fb..367264c094 100644 --- a/sei-db/state_db/bench/cryptosim/receipt_test.go +++ b/sei-db/state_db/bench/cryptosim/receipt_test.go @@ -82,6 +82,47 @@ func TestBuildERC20TransferReceipt_InvalidInputs(t *testing.T) { } } +func TestSyntheticTxHashDeterminism(t *testing.T) { + crand1 := NewCannedRandom(1<<20, 42) + crand2 := NewCannedRandom(1<<20, 42) + + block := uint64(500_000) + txIdx := uint32(7) + + hash1 := SyntheticTxHash(crand1, block, txIdx) + hash2 := SyntheticTxHash(crand2, block, txIdx) + + if len(hash1) != 32 { + t.Fatalf("expected 32 bytes, got %d", len(hash1)) + } + for i := range hash1 { + if hash1[i] != 
hash2[i] { + t.Fatal("same (seed, bufferSize, block, txIdx) must produce identical hashes") + } + } + + // Same call again on the same instance must be stable (SeededBytes is stateless). + hash3 := SyntheticTxHash(crand1, block, txIdx) + for i := range hash1 { + if hash1[i] != hash3[i] { + t.Fatal("repeated calls with same inputs must return identical hashes") + } + } + + // Different (block, txIdx) must produce a different hash. + other := SyntheticTxHash(crand1, block, txIdx+1) + same := true + for i := range hash1 { + if hash1[i] != other[i] { + same = false + break + } + } + if same { + t.Fatal("different (block, txIdx) should produce different hashes") + } +} + // Regression test: account keys with EVMKeyCode prefix must still be accepted. func TestBuildERC20TransferReceipt_EVMKeyCodeAccounts(t *testing.T) { crand := NewCannedRandom(1<<20, 42) diff --git a/sei-db/state_db/bench/cryptosim/reciept_store_simulator.go b/sei-db/state_db/bench/cryptosim/reciept_store_simulator.go index 309c03667e..8e48ecb34a 100644 --- a/sei-db/state_db/bench/cryptosim/reciept_store_simulator.go +++ b/sei-db/state_db/bench/cryptosim/reciept_store_simulator.go @@ -4,10 +4,12 @@ import ( "context" "fmt" "path/filepath" + "sync" "time" "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/filters" sdk "github.com/sei-protocol/sei-chain/sei-cosmos/types" dbconfig "github.com/sei-protocol/sei-chain/sei-db/config" "github.com/sei-protocol/sei-chain/sei-db/ledger_db/receipt" @@ -15,8 +17,91 @@ import ( evmtypes "github.com/sei-protocol/sei-chain/x/evm/types" ) -// A simulated receipt store using the real production receipt.ReceiptStore -// (cached parquet backend with WAL, flush, rotation, and pruning). +const ( + // Must be larger than cacheWindow * TransactionsPerBlock so that the + // oldest ring entries have aged past the cache window, enabling duckdb- + // only reads to find targets. 
With the default cache window of ~1000 + // blocks and 1024 txns/block the minimum is ~1.02M; 3M gives comfortable + // headroom for both cache and duckdb read modes. + defaultTxHashRingSize = 3_000_000 +) + +// txHashEntry stores a written tx hash along with its block and contract address, +// used by reader goroutines to generate realistic log filter queries. +type txHashEntry struct { + txHash common.Hash + blockNumber uint64 + contractAddress common.Address +} + +// txHashRing is a fixed-size ring buffer of recently written tx hashes. +// Writers call Push from the main loop; readers call RandomEntry from goroutines. +type txHashRing struct { + mu sync.RWMutex + entries []txHashEntry + size int + head int + count int +} + +func newTxHashRing(size int) *txHashRing { + return &txHashRing{ + entries: make([]txHashEntry, size), + size: size, + } +} + +// Push appends a tx hash entry to the ring, overwriting the oldest entry when full. +func (r *txHashRing) Push(txHash common.Hash, blockNumber uint64, contractAddress common.Address) { + r.mu.Lock() + defer r.mu.Unlock() + r.entries[r.head] = txHashEntry{ + txHash: txHash, + blockNumber: blockNumber, + contractAddress: contractAddress, + } + r.head = (r.head + 1) % r.size + if r.count < r.size { + r.count++ + } +} + +// RandomEntry returns a random entry from the ring, using CannedRandom to +// avoid potential rand.Rand hotspots under high-concurrency benchmarks. +func (r *txHashRing) RandomEntry(crand *CannedRandom) *txHashEntry { + r.mu.RLock() + defer r.mu.RUnlock() + if r.count == 0 { + return nil + } + idx := int(crand.Int64Range(0, int64(r.count))) + entry := r.entries[idx] + return &entry +} + +const maxRingSampleAttempts = 100 + +// RandomEntryInBlockRange samples a random entry whose blockNumber falls within +// [minBlock, maxBlock]. Returns nil if no matching entry is found after a +// bounded number of attempts. 
+func (r *txHashRing) RandomEntryInBlockRange(crand *CannedRandom, minBlock, maxBlock uint64) *txHashEntry { + r.mu.RLock() + defer r.mu.RUnlock() + if r.count == 0 { + return nil + } + for range maxRingSampleAttempts { + idx := int(crand.Int64Range(0, int64(r.count))) + entry := r.entries[idx] + if entry.blockNumber >= minBlock && entry.blockNumber <= maxBlock { + return &entry + } + } + return nil +} + +// A simulated receipt store with concurrent reads, writes, and pruning +// backed by the production receipt.ReceiptStore (parquet + ledger cache). type RecieptStoreSimulator struct { ctx context.Context cancel context.CancelFunc @@ -25,17 +110,29 @@ config *CryptoSimConfig recieptsChan chan *block - store receipt.ReceiptStore - metrics *CryptosimMetrics + store receipt.ReceiptStore + crand *CannedRandom + txRing *txHashRing + metrics *CryptosimMetrics + receiptCacheWindowBlocks uint64 } // Creates a new receipt store simulator backed by the production ReceiptStore -// (parquet backend + ledger cache), matching the real node write path. +// (parquet backend + ledger cache), with optional concurrent reader goroutines. +// +// The caller must supply a CannedRandom instance (typically via Clone) that +// shares the same (seed, bufferSize) as the block builder so that +// SyntheticTxHash reproduces the hashes the write path stored. +// +// Receipt-by-hash reads sample tx hashes from the ring buffer, and log filter +// reads sample contract addresses from it; both are populated by the write path. +// See SyntheticTxHash in receipt.go for how hashes can be rebuilt without storage.
func NewRecieptStoreSimulator( ctx context.Context, config *CryptoSimConfig, recieptsChan chan *block, metrics *CryptosimMetrics, + crand *CannedRandom, ) (*RecieptStoreSimulator, error) { derivedCtx, cancel := context.WithCancel(ctx) @@ -44,24 +141,40 @@ func NewRecieptStoreSimulator( Backend: "parquet", KeepRecent: int(config.ReceiptKeepRecent), PruneIntervalSeconds: int(config.ReceiptPruneIntervalSeconds), + DisableTxIndexLookup: config.DisableReceiptTxIndexLookup, } // nil StoreKey is safe: the parquet write path never touches the legacy KV store. - store, err := receipt.NewReceiptStore(storeCfg, nil) + // Cryptosim passes its metrics into the cache wrapper so cache hits/misses are + // measured at the only layer that can distinguish them reliably. + store, err := receipt.NewReceiptStoreWithReadMetrics(storeCfg, nil, metrics) if err != nil { cancel() return nil, fmt.Errorf("failed to create receipt store: %w", err) } + txRing := newTxHashRing(defaultTxHashRingSize) + r := &RecieptStoreSimulator{ - ctx: derivedCtx, - cancel: cancel, - config: config, - recieptsChan: recieptsChan, - store: store, - metrics: metrics, + ctx: derivedCtx, + cancel: cancel, + config: config, + recieptsChan: recieptsChan, + store: store, + crand: crand, + txRing: txRing, + metrics: metrics, + receiptCacheWindowBlocks: receipt.StableReceiptCacheWindowBlocks(store), } go r.mainLoop() + + if config.ReceiptReadConcurrency > 0 && config.ReceiptReadsPerSecond > 0 { + r.startReceiptReaders() + } + if config.ReceiptLogFilterReadConcurrency > 0 && config.ReceiptLogFilterReadsPerSecond > 0 { + r.startLogFilterReaders() + } + return r, nil } @@ -82,13 +195,19 @@ func (r *RecieptStoreSimulator) mainLoop() { } // Processes a block of receipts using the production ReceiptStore.SetReceipts path, -// which writes to parquet (WAL + buffer + rotation) and populates the ledger cache. +// then populates the ring buffer with contract addresses for log filter reads. 
func (r *RecieptStoreSimulator) processBlock(blk *block) { blockNumber := uint64(blk.BlockNumber()) //nolint:gosec records := make([]receipt.ReceiptRecord, 0, len(blk.reciepts)) var marshalErrors int64 + type ringEntry struct { + txHash common.Hash + contractAddress common.Address + } + ringEntries := make([]ringEntry, 0, len(blk.reciepts)) + for _, rcpt := range blk.reciepts { if rcpt == nil { continue @@ -107,6 +226,11 @@ func (r *RecieptStoreSimulator) processBlock(blk *block) { Receipt: rcpt, ReceiptBytes: receiptBytes, }) + + ringEntries = append(ringEntries, ringEntry{ + txHash: txHash, + contractAddress: common.HexToAddress(rcpt.ContractAddress), + }) } for range marshalErrors { @@ -114,8 +238,6 @@ func (r *RecieptStoreSimulator) processBlock(blk *block) { } if len(records) > 0 { - // Build a minimal sdk.Context with the block height set. - // The parquet write path only uses ctx.BlockHeight() and ctx.Context(). sdkCtx := sdk.NewContext(nil, tmproto.Header{Height: int64(blockNumber)}, false) //nolint:gosec start := time.Now() @@ -128,11 +250,203 @@ func (r *RecieptStoreSimulator) processBlock(blk *block) { r.metrics.ReportReceiptsWritten(int64(len(records))) } + for _, entry := range ringEntries { + r.txRing.Push(entry.txHash, blockNumber, entry.contractAddress) + } + if err := r.store.SetLatestVersion(int64(blockNumber)); err != nil { //nolint:gosec fmt.Printf("failed to update latest version for block %d: %v\n", blockNumber, err) } } +// startReceiptReaders launches dedicated goroutines for receipt-by-hash lookups. 
+func (r *RecieptStoreSimulator) startReceiptReaders() { + readerCount := r.config.ReceiptReadConcurrency + totalReadsPerSec := r.config.ReceiptReadsPerSecond + if totalReadsPerSec <= 0 { + totalReadsPerSec = 1000 + } + + readsPerReader := totalReadsPerSec / readerCount + if readsPerReader < 1 { + readsPerReader = 1 + } + + for i := 0; i < readerCount; i++ { + readerCrand := r.crand.Clone(true) + go r.tickerLoop(readsPerReader, readerCrand, r.executeReceiptRead) + } + + fmt.Printf("Started %d receipt reader goroutines (%d reads/sec each)\n", + readerCount, readsPerReader) +} + +// startLogFilterReaders launches dedicated goroutines for log filter (eth_getLogs) queries. +func (r *RecieptStoreSimulator) startLogFilterReaders() { + readerCount := r.config.ReceiptLogFilterReadConcurrency + totalReadsPerSec := r.config.ReceiptLogFilterReadsPerSecond + if totalReadsPerSec <= 0 { + totalReadsPerSec = 100 + } + + readsPerReader := totalReadsPerSec / readerCount + if readsPerReader < 1 { + readsPerReader = 1 + } + + for i := 0; i < readerCount; i++ { + readerCrand := r.crand.Clone(true) + go r.tickerLoop(readsPerReader, readerCrand, r.executeLogFilterRead) + } + + fmt.Printf("Started %d log filter reader goroutines (%d reads/sec each)\n", + readerCount, readsPerReader) +} + +func (r *RecieptStoreSimulator) tickerLoop(readsPerSecond int, crand *CannedRandom, fn func(*CannedRandom)) { + interval := time.Second / time.Duration(readsPerSecond) + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-r.ctx.Done(): + return + case <-ticker.C: + fn(crand) + } + } +} + +// executeReceiptRead samples a tx hash from the ring and queries GetReceipt. +// +// ReceiptReadMode controls which blocks are targeted: +// - "cache": only blocks within the cache window (guaranteed cache hit). +// - "duckdb": only blocks older than the cache window (guaranteed cache miss). 
+func (r *RecieptStoreSimulator) executeReceiptRead(crand *CannedRandom) { + latestBlock := r.store.LatestVersion() + if latestBlock <= 0 { + return + } + latest := uint64(latestBlock) //nolint:gosec + cacheWindow := r.receiptCacheWindowBlocks + + buffer := cacheWindow / 10 + var entry *txHashEntry + switch r.config.ReceiptReadMode { + case receiptReadModeCache: + safeWindow := cacheWindow - buffer + minBlock := uint64(0) + if latest > safeWindow { + minBlock = latest - safeWindow + } + entry = r.txRing.RandomEntryInBlockRange(crand, minBlock, latest) + case receiptReadModeDuckDB: + maxBlock := uint64(0) + if latest > cacheWindow { + maxBlock = latest - cacheWindow - 1 + } + entry = r.txRing.RandomEntryInBlockRange(crand, 0, maxBlock) + } + if entry == nil { + return + } + + r.metrics.ReportReceiptRead() + + sdkCtx := sdk.NewContext(nil, tmproto.Header{}, false) + start := time.Now() + rcpt, err := r.store.GetReceipt(sdkCtx, entry.txHash) + r.metrics.RecordReceiptReadDuration(time.Since(start).Seconds()) + + if err != nil { + r.metrics.ReportReceiptError() + return + } + if rcpt != nil { + r.metrics.ReportReceiptReadFound() + return + } + r.metrics.ReportReceiptReadNotFound() +} + +// executeLogFilterRead simulates an eth_getLogs query filtering by contract address +// over a configurable block range. Contract addresses come from the ring buffer. +// +// ReceiptLogFilterReadMode controls which blocks are targeted: +// - "cache": block range falls entirely within the cache window (DuckDB skipped). +// - "duckdb": block range falls entirely before the cache window (cache miss). +func (r *RecieptStoreSimulator) executeLogFilterRead(crand *CannedRandom) { + entry := r.txRing.RandomEntry(crand) + if entry == nil { + return + } + + latestVersion := r.store.LatestVersion() + if latestVersion <= 0 { + return + } + + //nolint:gosec // Config validation guarantees a positive block range before this conversion. 
+ rangeSize := uint64(crand.Int64Range( + int64(r.config.ReceiptLogFilterMinBlockRange), + int64(r.config.ReceiptLogFilterMaxBlockRange)+1, + )) + latest := uint64(latestVersion) //nolint:gosec + cacheWindow := r.receiptCacheWindowBlocks + + var fromBlock, toBlock uint64 + + buffer := cacheWindow / 10 + switch r.config.ReceiptLogFilterReadMode { + case receiptReadModeCache: + safeWindow := cacheWindow - buffer + cacheMin := uint64(0) + if latest > safeWindow { + cacheMin = latest - safeWindow + } + if latest <= cacheMin { + return + } + fromBlock = uint64(crand.Int64Range(int64(cacheMin), int64(latest)+1)) //nolint:gosec + toBlock = fromBlock + rangeSize + if toBlock > latest { + toBlock = latest + } + case receiptReadModeDuckDB: + if latest <= cacheWindow { + return + } + coldMax := latest - cacheWindow - 1 + earliestBlock := uint64(1) + if r.config.ReceiptKeepRecent > 0 && latest > uint64(r.config.ReceiptKeepRecent) { //nolint:gosec + earliestBlock = latest - uint64(r.config.ReceiptKeepRecent) + 1 //nolint:gosec + } + if coldMax < earliestBlock { + return + } + fromBlock = uint64(crand.Int64Range(int64(earliestBlock), int64(coldMax)+1)) //nolint:gosec + toBlock = fromBlock + rangeSize + if toBlock > coldMax { + toBlock = coldMax + } + } + + crit := filters.FilterCriteria{ + Addresses: []common.Address{entry.contractAddress}, + } + + sdkCtx := sdk.NewContext(nil, tmproto.Header{}, false) + start := time.Now() + logs, err := r.store.FilterLogs(sdkCtx, fromBlock, toBlock, crit) + r.metrics.RecordReceiptLogFilterDuration(time.Since(start).Seconds()) + r.metrics.RecordLogFilterLogsReturned(int64(len(logs))) + + if err != nil { + r.metrics.ReportReceiptError() + } +} + // convertLogsForTx converts evmtypes.Log entries to ethtypes.Log entries. // Mirrors receipt.getLogsForTx. 
func convertLogsForTx(rcpt *evmtypes.Receipt, logStartIndex uint) []*ethtypes.Log { diff --git a/sei-db/state_db/bench/cryptosim/reciept_store_simulator_test.go b/sei-db/state_db/bench/cryptosim/reciept_store_simulator_test.go new file mode 100644 index 0000000000..3695c6a810 --- /dev/null +++ b/sei-db/state_db/bench/cryptosim/reciept_store_simulator_test.go @@ -0,0 +1,63 @@ +package cryptosim + +import ( + "testing" + + "github.com/ethereum/go-ethereum/common" +) + +func TestRandomEntryInBlockRange(t *testing.T) { + ring := newTxHashRing(100) + crand := NewCannedRandom(1024*1024, 42) + + for i := uint64(1); i <= 50; i++ { + ring.Push(common.BigToHash(common.Big0), i, common.Address{}) + } + + tests := []struct { + name string + minBlock uint64 + maxBlock uint64 + wantNil bool + }{ + { + name: "range covers all entries", + minBlock: 1, + maxBlock: 50, + wantNil: false, + }, + { + name: "range covers recent entries only", + minBlock: 40, + maxBlock: 50, + wantNil: false, + }, + { + name: "range covers old entries only", + minBlock: 1, + maxBlock: 10, + wantNil: false, + }, + { + name: "range outside all entries", + minBlock: 100, + maxBlock: 200, + wantNil: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + entry := ring.RandomEntryInBlockRange(crand, tt.minBlock, tt.maxBlock) + if tt.wantNil && entry != nil { + t.Fatalf("expected nil, got block %d", entry.blockNumber) + } + if !tt.wantNil && entry == nil { + t.Fatalf("expected entry in [%d,%d], got nil", tt.minBlock, tt.maxBlock) + } + if entry != nil && (entry.blockNumber < tt.minBlock || entry.blockNumber > tt.maxBlock) { + t.Fatalf("expected block in [%d,%d], got %d", tt.minBlock, tt.maxBlock, entry.blockNumber) + } + }) + } +}