From 9c793496039bd4ab922cd2c3d4499c79363bf8c0 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Thu, 4 Sep 2025 14:53:25 -0400 Subject: [PATCH 01/16] Bump grocksdb --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index fddc5536..2de99b47 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/cosmos/iavl v0.21.0-alpha.1.0.20230904092046-df3db2d96583 github.com/gogo/protobuf v1.3.3 github.com/ledgerwatch/erigon-lib v0.0.0-20230210071639-db0e7ed11263 - github.com/linxGnu/grocksdb v1.8.4 + github.com/linxGnu/grocksdb v1.8.11 github.com/spf13/cobra v1.6.1 github.com/stretchr/testify v1.8.4 github.com/tendermint/tm-db v0.6.8-0.20220519162814-e24b96538a12 diff --git a/go.sum b/go.sum index 6b4e48b5..6b11eeac 100644 --- a/go.sum +++ b/go.sum @@ -702,8 +702,8 @@ github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.4/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.6/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= -github.com/linxGnu/grocksdb v1.8.4 h1:ZMsBpPpJNtRLHiKKp0mI7gW+NT4s7UgfD5xHxx1jVRo= -github.com/linxGnu/grocksdb v1.8.4/go.mod h1:xZCIb5Muw+nhbDK4Y5UJuOrin5MceOuiXkVUR7vp4WY= +github.com/linxGnu/grocksdb v1.8.11 h1:BGol9e5gB1BrsTvOxloC88pe70TCqgrfLNwkyWW0kD8= +github.com/linxGnu/grocksdb v1.8.11/go.mod h1:xZCIb5Muw+nhbDK4Y5UJuOrin5MceOuiXkVUR7vp4WY= github.com/lufeee/execinquery v1.2.1/go.mod h1:EC7DrEKView09ocscGHC+apXMIaorh4xqSxS/dy8SbM= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/lyft/protoc-gen-star v0.5.3/go.mod h1:V0xaHgaf5oCCqmcxYcWiDfTiKsZsRc87/1qhoTACD8w= From 8cd14f55bfe0777c0aa0416c5f1ce8f92330bf0b Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Thu, 4 Sep 2025 16:20:14 -0400 Subject: [PATCH 02/16] Add interface for rocksdb --- ss/rocksdb/db.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index c047c6aa..c2f7991e 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -328,6 +328,10 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte return false, nil } +func (db *Database) DeleteKeysAtVersion(module string, version int64) error { + panic("not implemented") +} + // newTSReadOptions returns ReadOptions used in the RocksDB column family read. func newTSReadOptions(version int64) *grocksdb.ReadOptions { var ts [TimestampSize]byte From fd83ea76a4d626f6a971f10c2b17ee228dd20e0a Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Thu, 4 Sep 2025 16:22:53 -0400 Subject: [PATCH 03/16] Add interface fns --- ss/rocksdb/db.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index c2f7991e..35d5333e 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -332,6 +332,22 @@ func (db *Database) DeleteKeysAtVersion(module string, version int64) error { panic("not implemented") } +func (db *Database) GetLatestMigratedKey() ([]byte, error) { + panic("not implemented") +} + +func (db *Database) SetLatestMigratedKey(key []byte) error { + panic("not implemented") +} + +func (db *Database) GetLatestMigratedModule() (string, error) { + panic("not implemented") +} + +func (db *Database) SetLatestMigratedModule(module string) error { + panic("not implemented") +} + // newTSReadOptions returns ReadOptions used in the RocksDB column family read. func newTSReadOptions(version int64) *grocksdb.ReadOptions { var ts [TimestampSize]byte From 1a03f398585232fec62946e7ee5cab86cd20b4ff Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Fri, 5 Sep 2025 12:50:10 -0400 Subject: [PATCH 04/16] Earliest version rocksdb --- ss/rocksdb/db.go | 63 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 52 insertions(+), 11 deletions(-) diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index 35d5333e..16155b58 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -22,8 +22,9 @@ import ( const ( TimestampSize = 8 - StorePrefixTpl = "s/k:%s/" - latestVersionKey = "s/latest" + StorePrefixTpl = "s/k:%s/" + latestVersionKey = "s/latest" + earliestVersionKey = "s/earliest" // TODO: Make configurable ImportCommitBatchSize = 10000 @@ -45,6 +46,9 @@ type Database struct { // a lazy manner, we use this value to prevent reads for versions that will // be purged in the next compaction. tsLow int64 + + // Earliest version for db after pruning + earliestVersion int64 } func New(dataDir string, config config.StateStoreConfig) (*Database, error) { @@ -64,11 +68,17 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { tsLow = int64(binary.LittleEndian.Uint64(tsLowBz)) } + earliestVersion, err := retrieveEarliestVersion(storage) + if err != nil { + return nil, fmt.Errorf("failed to retrieve earliest version: %w", err) + } + return &Database{ - storage: storage, - config: config, - cfHandle: cfHandle, - tsLow: tsLow, + storage: storage, + config: config, + cfHandle: cfHandle, + tsLow: tsLow, + earliestVersion: earliestVersion, }, nil } @@ -83,10 +93,17 @@ func NewWithDB(storage *grocksdb.DB, cfHandle *grocksdb.ColumnFamilyHandle) (*Da if len(tsLowBz) > 0 { tsLow = int64(binary.LittleEndian.Uint64(tsLowBz)) } + + earliestVersion, err := retrieveEarliestVersion(storage) + if err != nil { + return nil, fmt.Errorf("failed to retrieve earliest version: %w", err) + } + return &Database{ - storage: storage, - cfHandle: cfHandle, - tsLow: tsLow, + storage: storage, + cfHandle: cfHandle, + tsLow: tsLow, + earliestVersion: earliestVersion, }, nil } @@ -129,11 +146,18 @@ func (db *Database) GetLatestVersion() (int64, error) { } func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error { - panic("not implemented") + if version > db.earliestVersion || ignoreVersion { + db.earliestVersion = version + + var ts [TimestampSize]byte + binary.LittleEndian.PutUint64(ts[:], uint64(version)) + return db.storage.Put(defaultWriteOpts, []byte(earliestVersionKey), ts[:]) + } + return nil } func (db *Database) GetEarliestVersion() (int64, error) { - panic("not implemented") + return db.earliestVersion, nil } func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error) { @@ -403,3 +427,20 @@ func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error { func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlockRange int64, hash []byte) error { panic("implement me") } + +// retrieveEarliestVersion retrieves the earliest version from the database +func retrieveEarliestVersion(storage *grocksdb.DB) (int64, error) { + bz, err := storage.GetBytes(defaultReadOpts, []byte(earliestVersionKey)) + if err != nil { + // Assuming RocksDB returns an error for key not found similar to how PebbleDB uses pebble.ErrNotFound + // We'll treat any error as "not found" for now, but this might need adjustment based on grocksdb behavior + return 0, nil + } + + if len(bz) == 0 { + // in case of a fresh database + return 0, nil + } + + return int64(binary.LittleEndian.Uint64(bz)), nil +} From 89b52ae7211de91d203dfba786ce7bf6a22270f9 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Mon, 29 Sep 2025 11:58:39 -0400 Subject: [PATCH 05/16] Remove interfaces --- ss/rocksdb/db.go | 12 ------------ ss/types/store.go | 3 --- 2 files changed, 15 deletions(-) diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index 16155b58..aed2e353 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -352,26 +352,14 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte return false, nil } -func (db *Database) DeleteKeysAtVersion(module string, version int64) error { - panic("not implemented") -} - func (db *Database) GetLatestMigratedKey() ([]byte, error) { panic("not implemented") } -func (db *Database) SetLatestMigratedKey(key []byte) error { - panic("not implemented") -} - func (db *Database) GetLatestMigratedModule() (string, error) { panic("not implemented") } -func (db *Database) SetLatestMigratedModule(module string) error { - panic("not implemented") -} - // newTSReadOptions returns ReadOptions used in the RocksDB column family read. func newTSReadOptions(version int64) *grocksdb.ReadOptions { var ts [TimestampSize]byte diff --git a/ss/types/store.go b/ss/types/store.go index 56844468..5fa574c5 100644 --- a/ss/types/store.go +++ b/ss/types/store.go @@ -19,11 +19,8 @@ type StateStore interface { GetEarliestVersion() (int64, error) SetEarliestVersion(version int64, ignoreVersion bool) error GetLatestMigratedKey() ([]byte, error) - SetLatestMigratedKey(key []byte) error GetLatestMigratedModule() (string, error) - SetLatestMigratedModule(module string) error WriteBlockRangeHash(storeKey string, beginBlockRange, endBlockRange int64, hash []byte) error - DeleteKeysAtVersion(module string, version int64) error // ApplyChangeset Persist the change set of a block, // the `changeSet` should be ordered by (storeKey, key), From fcd3229907e454a9ed00dddfb9284bf13327d3c6 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Mon, 29 Sep 2025 12:37:56 -0400 Subject: [PATCH 06/16] Add log warning --- ss/rocksdb/db.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index aed2e353..2783d51c 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -420,8 +420,7 @@ func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlo func retrieveEarliestVersion(storage *grocksdb.DB) (int64, error) { bz, err := storage.GetBytes(defaultReadOpts, []byte(earliestVersionKey)) if err != nil { - // Assuming RocksDB returns an error for key not found similar to how PebbleDB uses pebble.ErrNotFound - // We'll treat any error as "not found" for now, but this might need adjustment based on grocksdb behavior + fmt.Printf("warning: rocksdb get for earliestVersionKey failed: %v", err) return 0, nil } From 586de1745ba371f584dc49eb8fb2d8d21c3d52e6 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Tue, 30 Sep 2025 10:23:55 -0400 Subject: [PATCH 07/16] Update rocksdb CI --- .github/workflows/unit_tests.yml | 51 +++++++++++++++++++++++++++++++- Makefile | 3 ++ ss/rocksdb/db_test.go | 5 ++-- 3 files changed, 56 insertions(+), 3 deletions(-) diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index dcd0431b..ee1d8ab7 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -25,8 +25,57 @@ jobs: name: "${{ github.sha }}-coverage" path: ./profile.out + tests-rocksdb: + runs-on: ubuntu-latest + steps: + - uses: actions/setup-go@v3 + with: + go-version: '1.19' + - uses: actions/checkout@v3 + + - name: Install RocksDB dependencies + run: | + sudo apt-get update + sudo apt-get install -y build-essential pkg-config cmake git zlib1g-dev libbz2-dev libsnappy-dev liblz4-dev libzstd-dev libjemalloc-dev libgflags-dev liburing-dev + + - name: Cache RocksDB + id: cache-rocksdb + uses: actions/cache@v3 + with: + path: | + /usr/local/lib/librocksdb.* + /usr/local/include/rocksdb + key: rocksdb-v8.9.1-${{ runner.os }} + + - name: Build and Install RocksDB from source + if: steps.cache-rocksdb.outputs.cache-hit != 'true' + run: | + git clone https://github.com/facebook/rocksdb.git + cd rocksdb + git checkout v8.9.1 + make clean + CXXFLAGS='-march=native -DNDEBUG' make -j"$(nproc)" shared_lib + sudo make install-shared + cd .. + + - name: Configure RocksDB library path + run: | + echo '/usr/local/lib' | sudo tee /etc/ld.so.conf.d/rocksdb.conf + sudo ldconfig + + - name: Run RocksDB Tests + run: | + export CGO_CFLAGS="-I/usr/local/include" + export CGO_LDFLAGS="-L/usr/local/lib -lrocksdb -lz -lbz2 -lsnappy -llz4 -lzstd -ljemalloc" + make test-rocksdb + + - uses: actions/upload-artifact@v4 + with: + name: "${{ github.sha }}-coverage-rocksdb" + path: ./profile.out + upload-coverage-report: - needs: tests + needs: [tests, tests-rocksdb] runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 diff --git a/Makefile b/Makefile index fc7e4302..4f20435c 100644 --- a/Makefile +++ b/Makefile @@ -59,5 +59,8 @@ BUILD_FLAGS := -tags "$(build_tags)" -ldflags '$(ldflags)' test-all: go test -v -mod=readonly ./... -covermode=atomic -coverprofile=./profile.out +test-rocksdb: + CGO_CFLAGS="-I/usr/local/include" CGO_LDFLAGS="-L/usr/local/lib -lrocksdb -lz -lbz2 -lsnappy -llz4 -lzstd -ljemalloc" go test -v -mod=readonly -tags=rocksdbBackend ./... -covermode=atomic -coverprofile=./profile.out + lint-all: golangci-lint run --config=.golangci.yml diff --git a/ss/rocksdb/db_test.go b/ss/rocksdb/db_test.go index afebe71a..8e5ee216 100644 --- a/ss/rocksdb/db_test.go +++ b/ss/rocksdb/db_test.go @@ -14,9 +14,10 @@ import ( func TestStorageTestSuite(t *testing.T) { s := &sstest.StorageTestSuite{ - NewDB: func(dir string) (types.StateStore, error) { - return New(dir, config.DefaultStateStoreConfig()) + NewDB: func(dir string, config config.StateStoreConfig) (types.StateStore, error) { + return New(dir, config) }, + Config: config.DefaultStateStoreConfig(), EmptyBatchSize: 12, } From a0587fd1f8c06214db62f99fa153bd462e1a4c1b Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Tue, 30 Sep 2025 13:09:55 -0400 Subject: [PATCH 08/16] Only raw import test for pebble --- ss/test/storage_test_suite.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/ss/test/storage_test_suite.go b/ss/test/storage_test_suite.go index bda45da5..9c0313a1 100644 --- a/ss/test/storage_test_suite.go +++ b/ss/test/storage_test_suite.go @@ -7,6 +7,7 @@ import ( "github.com/cosmos/iavl" errorutils "github.com/sei-protocol/sei-db/common/errors" "github.com/sei-protocol/sei-db/config" + "github.com/sei-protocol/sei-db/ss" "github.com/sei-protocol/sei-db/ss/types" "github.com/stretchr/testify/suite" "golang.org/x/exp/slices" @@ -1224,6 +1225,12 @@ func (s *StorageTestSuite) TestDatabaseImport() { } func (s *StorageTestSuite) TestDatabaseRawImport() { + // RawImport is only useful for PebbleDB backend + // NOTE: Will be removed from interface soon + if ss.BackendType(s.Config.Backend) != ss.PebbleDBBackend { + s.T().Skip("RawImport test only runs for pebbledb backend") + } + db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) defer db.Close() From ae8b65d1f68dd12e577db341dfc3664ca997c080 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Tue, 30 Sep 2025 14:46:23 -0400 Subject: [PATCH 09/16] Apply changeset async --- ss/rocksdb/db.go | 102 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 98 insertions(+), 4 deletions(-) diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index 2783d51c..7ca9c40e 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -9,13 +9,17 @@ import ( "errors" "fmt" "sync" + "time" "github.com/linxGnu/grocksdb" errorutils "github.com/sei-protocol/sei-db/common/errors" + "github.com/sei-protocol/sei-db/common/logger" + "github.com/sei-protocol/sei-db/common/utils" "github.com/sei-protocol/sei-db/config" "github.com/sei-protocol/sei-db/proto" "github.com/sei-protocol/sei-db/ss/types" "github.com/sei-protocol/sei-db/ss/util" + "github.com/sei-protocol/sei-db/stream/changelog" "golang.org/x/exp/slices" ) @@ -37,6 +41,11 @@ var ( defaultReadOpts = grocksdb.NewDefaultReadOptions() ) +type VersionedChangesets struct { + Version int64 + Changesets []*proto.NamedChangeSet +} + type Database struct { storage *grocksdb.DB config config.StateStoreConfig @@ -49,6 +58,14 @@ type Database struct { // Earliest version for db after pruning earliestVersion int64 + + asyncWriteWG sync.WaitGroup + + // Changelog used to support async write + streamHandler *changelog.Stream + + // Pending changes to be written to the DB + pendingChanges chan VersionedChangesets } func New(dataDir string, config config.StateStoreConfig) (*Database, error) { @@ -73,13 +90,31 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { return nil, fmt.Errorf("failed to retrieve earliest version: %w", err) } - return &Database{ + database := &Database{ storage: storage, config: config, cfHandle: cfHandle, tsLow: tsLow, earliestVersion: earliestVersion, - }, nil + pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer), + } + + if config.DedicatedChangelog { + streamHandler, _ := changelog.NewStream( + logger.NewNopLogger(), + utils.GetChangelogPath(dataDir), + changelog.Config{ + DisableFsync: true, + ZeroCopy: true, + KeepRecent: uint64(config.KeepRecent), + PruneInterval: 300 * time.Second, + }, + ) + database.streamHandler = streamHandler + go database.writeAsyncInBackground() + } + + return database, nil } func NewWithDB(storage *grocksdb.DB, cfHandle *grocksdb.ColumnFamilyHandle) (*Database, error) { @@ -108,6 +143,17 @@ func NewWithDB(storage *grocksdb.DB, cfHandle *grocksdb.ColumnFamilyHandle) (*Da } func (db *Database) Close() error { + if db.streamHandler != nil { + // Close the changelog stream first + db.streamHandler.Close() + // Close the pending changes channel to signal the background goroutine to stop + close(db.pendingChanges) + // Wait for the async writes to finish processing all buffered items + db.asyncWriteWG.Wait() + // Only set to nil after background goroutine has finished + db.streamHandler = nil + } + db.storage.Close() db.storage = nil @@ -187,6 +233,14 @@ func (db *Database) Get(storeKey string, version int64, key []byte) ([]byte, err } func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) error { + // Check if version is 0 and change it to 1 + // We do this specifically since keys written as part of genesis state come in as version 0 + // But pebbledb treats version 0 as special, so apply the changeset at version 1 instead + // Port this over to rocksdb for consistency + if version == 0 { + version = 1 + } + b := NewBatch(db, version) for _, kvPair := range cs.Changeset.Pairs { @@ -205,7 +259,45 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro } func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error { - return fmt.Errorf("not implemented") + // Write to WAL first + if db.streamHandler != nil { + entry := proto.ChangelogEntry{ + Version: version, + } + entry.Changesets = changesets + entry.Upgrades = nil + err := db.streamHandler.WriteNextEntry(entry) + if err != nil { + return err + } + } + // Then write to pending changes + db.pendingChanges <- VersionedChangesets{ + Version: version, + Changesets: changesets, + } + + return nil +} + +func (db *Database) writeAsyncInBackground() { + db.asyncWriteWG.Add(1) + defer db.asyncWriteWG.Done() + for nextChange := range db.pendingChanges { + if db.streamHandler != nil { + version := nextChange.Version + for _, cs := range nextChange.Changesets { + err := db.ApplyChangeset(version, cs) + if err != nil { + panic(err) + } + } + err := db.SetLatestVersion(version) + if err != nil { + panic(err) + } + } + } } // Prune attempts to prune all versions up to and including the provided version. @@ -227,7 +319,9 @@ func (db *Database) Prune(version int64) error { } db.tsLow = tsLow - return nil + + // Update earliestVersion to match (for API consistency with PebbleDB) + return db.SetEarliestVersion(tsLow, false) } func (db *Database) Iterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { From 5364bd99b23e07b7657dc5a7d43ee838a9a3fca5 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Tue, 30 Sep 2025 14:54:36 -0400 Subject: [PATCH 10/16] Fix lint --- ss/test/storage_test_suite.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ss/test/storage_test_suite.go b/ss/test/storage_test_suite.go index 9c0313a1..92309c9c 100644 --- a/ss/test/storage_test_suite.go +++ b/ss/test/storage_test_suite.go @@ -7,7 +7,6 @@ import ( "github.com/cosmos/iavl" errorutils "github.com/sei-protocol/sei-db/common/errors" "github.com/sei-protocol/sei-db/config" - "github.com/sei-protocol/sei-db/ss" "github.com/sei-protocol/sei-db/ss/types" "github.com/stretchr/testify/suite" "golang.org/x/exp/slices" @@ -1227,7 +1226,7 @@ func (s *StorageTestSuite) TestDatabaseImport() { func (s *StorageTestSuite) TestDatabaseRawImport() { // RawImport is only useful for PebbleDB backend // NOTE: Will be removed from interface soon - if ss.BackendType(s.Config.Backend) != ss.PebbleDBBackend { + if s.Config.Backend != "pebbledb" { s.T().Skip("RawImport test only runs for pebbledb backend") } From a35d01c39a57f8ca2b289039db02a5800d63c4eb Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Tue, 30 Sep 2025 16:22:13 -0400 Subject: [PATCH 11/16] Update earliestVersion --- ss/pebbledb/db_test.go | 4 +++- ss/rocksdb/db.go | 9 ++++----- ss/rocksdb/db_test.go | 4 +++- ss/rocksdb/iterator.go | 17 ++++++++++++++++- 4 files changed, 26 insertions(+), 8 deletions(-) diff --git a/ss/pebbledb/db_test.go b/ss/pebbledb/db_test.go index 07b43466..075b80d3 100644 --- a/ss/pebbledb/db_test.go +++ b/ss/pebbledb/db_test.go @@ -10,11 +10,13 @@ import ( ) func TestStorageTestSuite(t *testing.T) { + pebbleConfig := config.DefaultStateStoreConfig() + pebbleConfig.Backend = "pebbledb" s := &sstest.StorageTestSuite{ NewDB: func(dir string, config config.StateStoreConfig) (types.StateStore, error) { return New(dir, config) }, - Config: config.DefaultStateStoreConfig(), + Config: pebbleConfig, EmptyBatchSize: 12, } diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index 7ca9c40e..1e9f1b0d 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -207,7 +207,7 @@ func (db *Database) GetEarliestVersion() (int64, error) { } func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error) { - if version < db.tsLow { + if version < db.earliestVersion { return false, nil } @@ -220,7 +220,7 @@ func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error } func (db *Database) Get(storeKey string, version int64, key []byte) ([]byte, error) { - if version < db.tsLow { + if version < db.earliestVersion { return nil, errorutils.ErrRecordNotFound } @@ -320,7 +320,6 @@ func (db *Database) Prune(version int64) error { db.tsLow = tsLow - // Update earliestVersion to match (for API consistency with PebbleDB) return db.SetEarliestVersion(tsLow, false) } @@ -337,7 +336,7 @@ func (db *Database) Iterator(storeKey string, version int64, start, end []byte) start, end = util.IterateWithPrefix(prefix, start, end) itr := db.storage.NewIteratorCF(newTSReadOptions(version), db.cfHandle) - return NewRocksDBIterator(itr, prefix, start, end, false), nil + return NewRocksDBIterator(itr, prefix, start, end, version, db.earliestVersion, false), nil } func (db *Database) ReverseIterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { @@ -353,7 +352,7 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [ start, end = util.IterateWithPrefix(prefix, start, end) itr := db.storage.NewIteratorCF(newTSReadOptions(version), db.cfHandle) - return NewRocksDBIterator(itr, prefix, start, end, true), nil + return NewRocksDBIterator(itr, prefix, start, end, version, db.earliestVersion, true), nil } // Import loads the initial version of the state in parallel with numWorkers goroutines diff --git a/ss/rocksdb/db_test.go b/ss/rocksdb/db_test.go index 8e5ee216..605f377e 100644 --- a/ss/rocksdb/db_test.go +++ b/ss/rocksdb/db_test.go @@ -13,11 +13,13 @@ import ( ) func TestStorageTestSuite(t *testing.T) { + rocksConfig := config.DefaultStateStoreConfig() + rocksConfig.Backend = "rocksdb" s := &sstest.StorageTestSuite{ NewDB: func(dir string, config config.StateStoreConfig) (types.StateStore, error) { return New(dir, config) }, - Config: config.DefaultStateStoreConfig(), + Config: rocksConfig, EmptyBatchSize: 12, } diff --git a/ss/rocksdb/iterator.go b/ss/rocksdb/iterator.go index 215af648..077bb2ac 100644 --- a/ss/rocksdb/iterator.go +++ b/ss/rocksdb/iterator.go @@ -15,11 +15,25 @@ var _ types.DBIterator = (*iterator)(nil) type iterator struct { source *grocksdb.Iterator prefix, start, end []byte + version int64 reverse bool invalid bool } -func NewRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, reverse bool) *iterator { +func NewRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, version int64, earliestVersion int64, reverse bool) *iterator { + // Return invalid iterator if requested iterator height is lower than earliest version after pruning + if version < earliestVersion { + return &iterator{ + source: source, + prefix: prefix, + start: start, + end: end, + version: version, + reverse: reverse, + invalid: true, + } + } + if reverse { if end == nil { source.SeekToLast() @@ -48,6 +62,7 @@ func NewRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, re prefix: prefix, start: start, end: end, + version: version, reverse: reverse, invalid: !source.Valid(), } From f307ad5c901216355682856cb864164e472b6500 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Tue, 30 Sep 2025 16:42:54 -0400 Subject: [PATCH 12/16] Fix iterator call --- ss/rocksdb/db.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index 1e9f1b0d..4db6e716 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -426,7 +426,7 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte defer readOpts.Destroy() itr := db.storage.NewIteratorCF(readOpts, db.cfHandle) - rocksItr := NewRocksDBIterator(itr, prefix, start, end, false) + rocksItr := NewRocksDBIterator(itr, prefix, start, end, latestVersion, 1, false) defer rocksItr.Close() for rocksItr.Valid() { From 2ed275a2d287c3b150736fc7fb45e03eaadc17ce Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Tue, 30 Sep 2025 16:56:40 -0400 Subject: [PATCH 13/16] record not found --- ss/rocksdb/db.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index 4db6e716..6b7c4018 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -229,6 +229,11 @@ func (db *Database) Get(storeKey string, version int64, key []byte) ([]byte, err return nil, fmt.Errorf("failed to get RocksDB slice: %w", err) } + if !slice.Exists() { + slice.Free() + return nil, errorutils.ErrRecordNotFound + } + return copyAndFreeSlice(slice), nil } From 3fbfa3b77905a076a90a127095440a921cedf3cc Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Tue, 30 Sep 2025 17:13:43 -0400 Subject: [PATCH 14/16] Rocksdb tests --- ss/test/storage_test_suite.go | 56 +++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 26 deletions(-) diff --git a/ss/test/storage_test_suite.go b/ss/test/storage_test_suite.go index 92309c9c..17f397b7 100644 --- a/ss/test/storage_test_suite.go +++ b/ss/test/storage_test_suite.go @@ -718,38 +718,42 @@ func (s *StorageTestSuite) TestDatabasePruneKeepRecent() { } func (s *StorageTestSuite) TestDatabasePruneKeepLastVersion() { - // Update config to set KeepLastVersion to false - stateStoreConfig := s.Config - stateStoreConfig.KeepLastVersion = false - db, err := s.NewDB(s.T().TempDir(), stateStoreConfig) - s.Require().NoError(err) - defer db.Close() + // Only test KeepLastVersion = false for pebbledb backend + // NOTE: KeepLastVersion is always true and will be removed in future + if s.Config.Backend == "pebbledb" { + // Update config to set KeepLastVersion to false + stateStoreConfig := s.Config + stateStoreConfig.KeepLastVersion = false + db, err := s.NewDB(s.T().TempDir(), stateStoreConfig) + s.Require().NoError(err) + defer db.Close() - s.Require().NoError(DBApplyChangeset(db, 100, storeKey1, [][]byte{[]byte("key000")}, [][]byte{[]byte("value001")})) - s.Require().NoError(DBApplyChangeset(db, 100, storeKey1, [][]byte{[]byte("key001")}, [][]byte{[]byte("value002")})) - s.Require().NoError(DBApplyChangeset(db, 200, storeKey1, [][]byte{[]byte("key002")}, [][]byte{[]byte("value003")})) - s.Require().NoError(DBApplyChangeset(db, 200, storeKey1, [][]byte{[]byte("key003")}, [][]byte{[]byte("value004")})) + s.Require().NoError(DBApplyChangeset(db, 100, storeKey1, [][]byte{[]byte("key000")}, [][]byte{[]byte("value001")})) + s.Require().NoError(DBApplyChangeset(db, 100, storeKey1, [][]byte{[]byte("key001")}, [][]byte{[]byte("value002")})) + s.Require().NoError(DBApplyChangeset(db, 200, storeKey1, [][]byte{[]byte("key002")}, [][]byte{[]byte("value003")})) + s.Require().NoError(DBApplyChangeset(db, 200, storeKey1, [][]byte{[]byte("key003")}, [][]byte{[]byte("value004")})) - // prune version 150 - s.Require().NoError(db.Prune(150)) + // prune version 150 + s.Require().NoError(db.Prune(150)) - // Verify that all keys before prune height are deleted - bz, err := db.Get(storeKey1, 100, []byte("key000")) - s.Require().ErrorIs(err, errorutils.ErrRecordNotFound) - s.Require().Nil(bz) + // Verify that all keys before prune height are deleted + bz, err := db.Get(storeKey1, 100, []byte("key000")) + s.Require().ErrorIs(err, errorutils.ErrRecordNotFound) + s.Require().Nil(bz) - bz, err = db.Get(storeKey1, 160, []byte("key001")) - s.Require().ErrorIs(err, errorutils.ErrRecordNotFound) - s.Require().Nil(bz) + bz, err = db.Get(storeKey1, 160, []byte("key001")) + s.Require().ErrorIs(err, errorutils.ErrRecordNotFound) + s.Require().Nil(bz) - // Verify keys after prune height can be retrieved - bz, err = db.Get(storeKey1, 200, []byte("key002")) - s.Require().NoError(err) - s.Require().Equal([]byte("value003"), bz) + // Verify keys after prune height can be retrieved + bz, err = db.Get(storeKey1, 200, []byte("key002")) + s.Require().NoError(err) + s.Require().Equal([]byte("value003"), bz) - bz, err = db.Get(storeKey1, 220, []byte("key003")) - s.Require().NoError(err) - s.Require().Equal([]byte("value004"), bz) + bz, err = db.Get(storeKey1, 220, []byte("key003")) + s.Require().NoError(err) + s.Require().Equal([]byte("value004"), bz) + } // Now reset KeepLastVersion to true and verify latest version of key exists newDB, err := s.NewDB(s.T().TempDir(), s.Config) From 8a0e2538672f8d5228b20942c66d0002588611d8 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Tue, 30 Sep 2025 17:16:32 -0400 Subject: [PATCH 15/16] Update tests --- ss/test/storage_test_suite.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ss/test/storage_test_suite.go b/ss/test/storage_test_suite.go index 17f397b7..5b9f9f0b 100644 --- a/ss/test/storage_test_suite.go +++ b/ss/test/storage_test_suite.go @@ -769,7 +769,7 @@ func (s *StorageTestSuite) TestDatabasePruneKeepLastVersion() { s.Require().NoError(newDB.Prune(150)) // Can still retrieve those keys because KeepLastVersion is true - bz, err = newDB.Get(storeKey1, 160, []byte("key000")) + bz, err := newDB.Get(storeKey1, 160, []byte("key000")) s.Require().NoError(err) s.Require().Equal([]byte("value001"), bz) From 1c8189cb3cdf6f801c2d1732710c45090396d905 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Tue, 30 Sep 2025 17:31:49 -0400 Subject: [PATCH 16/16] targeted rocksdb tests --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 4f20435c..36ee6698 100644 --- a/Makefile +++ b/Makefile @@ -60,7 +60,7 @@ test-all: go test -v -mod=readonly ./... -covermode=atomic -coverprofile=./profile.out test-rocksdb: - CGO_CFLAGS="-I/usr/local/include" CGO_LDFLAGS="-L/usr/local/lib -lrocksdb -lz -lbz2 -lsnappy -llz4 -lzstd -ljemalloc" go test -v -mod=readonly -tags=rocksdbBackend ./... -covermode=atomic -coverprofile=./profile.out + CGO_CFLAGS="-I/usr/local/include" CGO_LDFLAGS="-L/usr/local/lib -lrocksdb -lz -lbz2 -lsnappy -llz4 -lzstd -ljemalloc" go test -v -mod=readonly -tags=rocksdbBackend ./ss/rocksdb/... -covermode=atomic -coverprofile=./profile.out lint-all: golangci-lint run --config=.golangci.yml