Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
07b0cae
hit ledger cache first then duckdb
jewei1997 Mar 30, 2026
8820251
parquet: add pebble-backed tx_hash -> block_number index
jewei1997 Mar 31, 2026
f1ac18c
add receipt/log reads to cryptosim
jewei1997 Mar 30, 2026
43c8c09
switch cryptosim receipt reads to cache mode, fix log filter cache hi…
jewei1997 Mar 30, 2026
fc84e9d
add cache scan duration and size metrics to diagnose read throughput …
jewei1997 Mar 30, 2026
cb5a881
use StableReceiptCacheWindowBlocks for cache-mode reads
jewei1997 Mar 30, 2026
02b9ce8
add 10% safety buffer to cache-mode reads to avoid rotation boundary …
jewei1997 Mar 30, 2026
f1cbf27
allow WAL to be truncated
jewei1997 Mar 31, 2026
a8fa282
add unit tests for wal truncation
jewei1997 Mar 31, 2026
ed48ba5
switch cryptosim receipt reads to duckdb-only mode
jewei1997 Mar 31, 2026
a5bcd4f
parquet: prune files below FromBlock in GetLogs + add pebble tx_hash …
jewei1997 Mar 31, 2026
ca92630
parquet: add tests for GetLogs FromBlock pruning and tx_hash index
jewei1997 Mar 31, 2026
0db9a8e
add TODO
jewei1997 Mar 31, 2026
90505eb
Merge branch 'main' into cryptosim-receipt-gen-reads
jewei1997 Apr 7, 2026
f0e7d68
Merge branch 'main' into cryptosim-receipt-gen-reads
jewei1997 Apr 7, 2026
e081f21
option to disable tx index
jewei1997 Apr 8, 2026
af2cff2
adjust receipt store config
jewei1997 Apr 8, 2026
8737bc3
tune receipt read config
jewei1997 Apr 8, 2026
2501e94
lower keep recent
jewei1997 Apr 8, 2026
d68970d
update configs, prune interval seconds lowered
jewei1997 Apr 8, 2026
36ea37b
only logs now
jewei1997 Apr 8, 2026
df22799
keep recent back to 100k for log testing
jewei1997 Apr 8, 2026
cc74cbe
add buckets for latency and read from cache now
jewei1997 Apr 8, 2026
19d034f
increase log ceiling even more to 100k
jewei1997 Apr 8, 2026
e5e1b75
increase log ceiling even more to 500k
jewei1997 Apr 8, 2026
7912ba9
only do receipts in cache
jewei1997 Apr 8, 2026
c6d1b1e
with pebble tx index, maxed out receipts to duckdb
jewei1997 Apr 8, 2026
8ad586b
remove tx index for now
jewei1997 Apr 8, 2026
6275698
personal review fixes
jewei1997 Apr 8, 2026
fbccea8
Merge branch 'main' into cryptosim-receipt-gen-reads
jewei1997 Apr 8, 2026
7655181
fix lint
jewei1997 Apr 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
888 changes: 888 additions & 0 deletions docker/monitornode/dashboards/cryptosim-dashboard.json

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions sei-db/config/receipt_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ type ReceiptStoreConfig struct {
// PruneIntervalSeconds defines the interval in seconds to trigger pruning
// default to every 600 seconds
PruneIntervalSeconds int `mapstructure:"prune-interval-seconds"`

// DisableTxIndexLookup must remain true. The tx_hash -> block_number lookup
// implementation is intentionally unsupported; setting this to false will
// panic during parquet store initialization.
DisableTxIndexLookup bool `mapstructure:"disable-tx-index-lookup"`
}

// DefaultReceiptStoreConfig returns the default ReceiptStoreConfig
Expand All @@ -57,6 +62,7 @@ func DefaultReceiptStoreConfig() ReceiptStoreConfig {
AsyncWriteBuffer: DefaultSSAsyncBuffer,
KeepRecent: DefaultSSKeepRecent,
PruneIntervalSeconds: DefaultSSPruneInterval,
DisableTxIndexLookup: true,
}
}

Expand Down
3 changes: 3 additions & 0 deletions sei-db/ledger_db/parquet/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,9 @@ func (r *Reader) GetLogs(ctx context.Context, filter LogFilter) ([]LogResult, er
if filter.ToBlock != nil && startBlock > *filter.ToBlock {
continue
}
if filter.FromBlock != nil && startBlock+r.maxBlocksPerFile <= *filter.FromBlock {
continue
}
files = append(files, f)
}
if len(files) == 0 {
Expand Down
119 changes: 119 additions & 0 deletions sei-db/ledger_db/parquet/reader_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package parquet

import (
"context"
"fmt"
"math/big"
"os"
"testing"

"github.com/ethereum/go-ethereum/common"
pqgo "github.com/parquet-go/parquet-go"
"github.com/stretchr/testify/require"
)

func createTestLogFile(dir string, startBlock, count uint64) error {
path := fmt.Sprintf("%s/logs_%d.parquet", dir, startBlock)
f, err := os.Create(path)
if err != nil {
return err
}
w := pqgo.NewGenericWriter[LogRecord](f)
for i := uint64(0); i < count; i++ {
block := startBlock + i
txHash := common.BigToHash(new(big.Int).SetUint64(block))
if _, err := w.Write([]LogRecord{{
BlockNumber: block,
TxHash: txHash[:],
Address: common.HexToAddress("0xdead").Bytes(),
}}); err != nil {
return err
}
}
if err := w.Close(); err != nil {
return err
}
return f.Close()
}

func uint64Ptr(v uint64) *uint64 { return &v }

// TestGetLogsPrunesFilesBelowFromBlock checks that GetLogs skips parquet
// files whose entire block range lies below FromBlock, while still returning
// every block inside [FromBlock, ToBlock].
func TestGetLogsPrunesFilesBelowFromBlock(t *testing.T) {
	dir := t.TempDir()

	// Four files of 500 blocks each, covering blocks 0-1999.
	starts := []uint64{0, 500, 1000, 1500}
	for _, startBlock := range starts {
		require.NoError(t, createTestReceiptFile(dir, startBlock, 500))
		require.NoError(t, createTestLogFile(dir, startBlock, 500))
	}

	reader, err := NewReaderWithMaxBlocksPerFile(dir, 500)
	require.NoError(t, err)
	defer func() { _ = reader.Close() }()

	filter := LogFilter{
		FromBlock: uint64Ptr(1200),
		ToBlock:   uint64Ptr(1300),
	}
	results, err := reader.GetLogs(context.Background(), filter)
	require.NoError(t, err)

	// Every returned log must land inside the query window.
	for i := range results {
		require.GreaterOrEqual(t, results[i].BlockNumber, uint64(1200))
		require.LessOrEqual(t, results[i].BlockNumber, uint64(1300))
	}
	require.Equal(t, 101, len(results), "should have blocks 1200-1300 inclusive")
}

// TestGetLogsPrunesBothEnds checks that GetLogs prunes files on both sides
// of the query window — those entirely below FromBlock and entirely above
// ToBlock — while still reading every file that overlaps the window.
func TestGetLogsPrunesBothEnds(t *testing.T) {
	dir := t.TempDir()

	// Five files of 500 blocks each, covering blocks 0-2499.
	for _, startBlock := range []uint64{0, 500, 1000, 1500, 2000} {
		require.NoError(t, createTestReceiptFile(dir, startBlock, 500))
		require.NoError(t, createTestLogFile(dir, startBlock, 500))
	}

	reader, err := NewReaderWithMaxBlocksPerFile(dir, 500)
	require.NoError(t, err)
	defer func() { _ = reader.Close() }()

	// Query blocks 1400-1600: should need overlapping files 1000 and 1500,
	// but still prune non-overlapping files 0, 500, and 2000.
	filter := LogFilter{
		FromBlock: uint64Ptr(1400),
		ToBlock:   uint64Ptr(1600),
	}
	results, err := reader.GetLogs(context.Background(), filter)
	require.NoError(t, err)

	for i := range results {
		require.GreaterOrEqual(t, results[i].BlockNumber, uint64(1400))
		require.LessOrEqual(t, results[i].BlockNumber, uint64(1600))
	}
	require.Equal(t, 201, len(results), "should have blocks 1400-1600 inclusive")
}

// TestStoreGetReceiptByTxHashWithoutIndex verifies that a receipt can still
// be found by transaction hash when the tx_hash index is disabled (the store
// falls back to scanning the parquet files).
func TestStoreGetReceiptByTxHashWithoutIndex(t *testing.T) {
	dir := t.TempDir()

	// Three receipt files of 500 blocks each, covering blocks 0-1499.
	for _, startBlock := range []uint64{0, 500, 1000} {
		require.NoError(t, createTestReceiptFile(dir, startBlock, 500))
	}

	store, err := NewStore(StoreConfig{
		DBDirectory:          dir,
		MaxBlocksPerFile:     500,
		DisableTxIndexLookup: true,
	})
	require.NoError(t, err)
	defer func() { _ = store.Close() }()

	// The fixture derives each tx hash from its block number, so the hash
	// for block 750 is reconstructible without reading the file.
	wantBlock := uint64(750)
	txHash := common.BigToHash(new(big.Int).SetUint64(wantBlock))

	result, err := store.GetReceiptByTxHash(context.Background(), txHash)
	require.NoError(t, err)
	require.NotNil(t, result)
	require.Equal(t, wantBlock, result.BlockNumber)
}
19 changes: 11 additions & 8 deletions sei-db/ledger_db/parquet/reader_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ func TestConcurrentReadsAndPrune(t *testing.T) {
}

store, err := NewStore(StoreConfig{
DBDirectory: dir,
MaxBlocksPerFile: 500,
KeepRecent: 600,
DBDirectory: dir,
MaxBlocksPerFile: 500,
KeepRecent: 600,
DisableTxIndexLookup: true,
})
require.NoError(t, err)
t.Cleanup(func() { _ = store.Close() })
Expand Down Expand Up @@ -134,8 +135,9 @@ func TestOnFileRotationNotBlockedByPruneMu(t *testing.T) {
require.NoError(t, createTestReceiptFile(dir, 0, 1))

store, err := NewStore(StoreConfig{
DBDirectory: dir,
MaxBlocksPerFile: 500,
DBDirectory: dir,
MaxBlocksPerFile: 500,
DisableTxIndexLookup: true,
})
require.NoError(t, err)
t.Cleanup(func() { _ = store.Close() })
Expand Down Expand Up @@ -168,9 +170,10 @@ func TestConcurrentReadsPruneAndRotation(t *testing.T) {
}

store, err := NewStore(StoreConfig{
DBDirectory: dir,
MaxBlocksPerFile: 500,
KeepRecent: 1000,
DBDirectory: dir,
MaxBlocksPerFile: 500,
KeepRecent: 1000,
DisableTxIndexLookup: true,
})
require.NoError(t, err)
t.Cleanup(func() { _ = store.Close() })
Expand Down
14 changes: 11 additions & 3 deletions sei-db/ledger_db/parquet/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ type StoreConfig struct {
PruneIntervalSeconds int64
BlockFlushInterval uint64
MaxBlocksPerFile uint64
DisableTxIndexLookup bool
}

// DefaultStoreConfig returns the default store configuration.
func DefaultStoreConfig() StoreConfig {
return StoreConfig{
BlockFlushInterval: defaultBlockFlushInterval,
MaxBlocksPerFile: defaultMaxBlocksPerFile,
BlockFlushInterval: defaultBlockFlushInterval,
MaxBlocksPerFile: defaultMaxBlocksPerFile,
DisableTxIndexLookup: true,
}
}

Expand Down Expand Up @@ -102,6 +104,9 @@ type Store struct {
// NewStore creates a new parquet store.
func NewStore(cfg StoreConfig) (*Store, error) {
storeCfg := resolveStoreConfig(cfg)
if !storeCfg.DisableTxIndexLookup {
panic("not implemented")
}

if err := os.MkdirAll(cfg.DBDirectory, 0o750); err != nil {
return nil, fmt.Errorf("failed to create parquet base directory: %w", err)
Expand Down Expand Up @@ -151,6 +156,7 @@ func resolveStoreConfig(cfg StoreConfig) StoreConfig {
resolved.DBDirectory = cfg.DBDirectory
resolved.KeepRecent = cfg.KeepRecent
resolved.PruneIntervalSeconds = cfg.PruneIntervalSeconds
resolved.DisableTxIndexLookup = cfg.DisableTxIndexLookup
if cfg.BlockFlushInterval > 0 {
resolved.BlockFlushInterval = cfg.BlockFlushInterval
}
Expand Down Expand Up @@ -188,7 +194,8 @@ func (s *Store) SetBlockFlushInterval(interval uint64) {
s.config.BlockFlushInterval = interval
}

// GetReceiptByTxHash retrieves a receipt by transaction hash.
// GetReceiptByTxHash retrieves a receipt by transaction hash via a full scan of
// the closed parquet files tracked by the reader.
func (s *Store) GetReceiptByTxHash(ctx context.Context, txHash common.Hash) (*ReceiptResult, error) {
return s.Reader.GetReceiptByTxHash(ctx, txHash)
}
Expand Down Expand Up @@ -320,6 +327,7 @@ func (s *Store) Close() error {
}
if closeErr := s.Reader.Close(); closeErr != nil {
err = closeErr
return
}
})

Expand Down
34 changes: 26 additions & 8 deletions sei-db/ledger_db/parquet/store_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ func (m *mockParquetWAL) Close() error { return nil }

func TestNewStoreAppliesConfiguredIntervals(t *testing.T) {
store, err := NewStore(StoreConfig{
DBDirectory: t.TempDir(),
BlockFlushInterval: 7,
MaxBlocksPerFile: 11,
DBDirectory: t.TempDir(),
BlockFlushInterval: 7,
MaxBlocksPerFile: 11,
DisableTxIndexLookup: true,
})
require.NoError(t, err)
t.Cleanup(func() { _ = store.Close() })
Expand All @@ -54,32 +55,46 @@ func TestNewStoreAppliesConfiguredIntervals(t *testing.T) {

func TestNewStoreUsesDefaultIntervalsWhenUnset(t *testing.T) {
store, err := NewStore(StoreConfig{
DBDirectory: t.TempDir(),
DBDirectory: t.TempDir(),
DisableTxIndexLookup: true,
})
require.NoError(t, err)
t.Cleanup(func() { _ = store.Close() })

require.Equal(t, defaultBlockFlushInterval, store.config.BlockFlushInterval)
require.Equal(t, defaultMaxBlocksPerFile, store.config.MaxBlocksPerFile)
require.Equal(t, defaultMaxBlocksPerFile, store.CacheRotateInterval())
require.True(t, store.config.DisableTxIndexLookup)
}

func TestNewStorePreservesKeepRecentAndPruneIntervalSettings(t *testing.T) {
store, err := NewStore(StoreConfig{
DBDirectory: t.TempDir(),
KeepRecent: 123,
PruneIntervalSeconds: 9,
DisableTxIndexLookup: true,
})
require.NoError(t, err)
t.Cleanup(func() { _ = store.Close() })

require.Equal(t, int64(123), store.config.KeepRecent)
require.Equal(t, int64(9), store.config.PruneIntervalSeconds)
require.True(t, store.config.DisableTxIndexLookup)
}

func TestNewStorePanicsWhenTxIndexLookupEnabled(t *testing.T) {
require.PanicsWithValue(t, "not implemented", func() {
_, _ = NewStore(StoreConfig{
DBDirectory: t.TempDir(),
DisableTxIndexLookup: false,
})
})
}

func TestPruneOldFilesKeepsTrackingOnDeleteFailure(t *testing.T) {
store, err := NewStore(StoreConfig{
DBDirectory: t.TempDir(),
DBDirectory: t.TempDir(),
DisableTxIndexLookup: true,
})
require.NoError(t, err)
t.Cleanup(func() { _ = store.Close() })
Expand Down Expand Up @@ -133,7 +148,8 @@ func TestCorruptLastFileDeletedOnStartup(t *testing.T) {
require.NoError(t, os.WriteFile(corruptLog, []byte("not a parquet file"), 0o644))

store, err := NewStore(StoreConfig{
DBDirectory: dir,
DBDirectory: dir,
DisableTxIndexLookup: true,
})
require.NoError(t, err)
t.Cleanup(func() { _ = store.Close() })
Expand Down Expand Up @@ -165,7 +181,8 @@ func TestCorruptLogFileUntracksReceiptCounterpart(t *testing.T) {
require.NoError(t, os.WriteFile(corruptLog, []byte("not a parquet file"), 0o644))

store, err := NewStore(StoreConfig{
DBDirectory: dir,
DBDirectory: dir,
DisableTxIndexLookup: true,
})
require.NoError(t, err)
t.Cleanup(func() { _ = store.Close() })
Expand All @@ -184,7 +201,8 @@ func TestLazyInitCreatesFileOnFirstWrite(t *testing.T) {
dir := t.TempDir()

store, err := NewStore(StoreConfig{
DBDirectory: dir,
DBDirectory: dir,
DisableTxIndexLookup: true,
})
require.NoError(t, err)
t.Cleanup(func() { _ = store.Close() })
Expand Down
5 changes: 4 additions & 1 deletion sei-db/ledger_db/parquet/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ func NewWAL(dir string) (dbwal.GenericWAL[WALEntry], error) {
encodeWALEntry,
decodeWALEntry,
dir,
dbwal.Config{},
dbwal.Config{
// Allow the WAL to be fully emptied after rotation/truncation.
AllowEmpty: true,
},
)
}
Loading
Loading