diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index dcd0431b..ee1d8ab7 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -25,8 +25,57 @@ jobs: name: "${{ github.sha }}-coverage" path: ./profile.out + tests-rocksdb: + runs-on: ubuntu-latest + steps: + - uses: actions/setup-go@v3 + with: + go-version: '1.19' + - uses: actions/checkout@v3 + + - name: Install RocksDB dependencies + run: | + sudo apt-get update + sudo apt-get install -y build-essential pkg-config cmake git zlib1g-dev libbz2-dev libsnappy-dev liblz4-dev libzstd-dev libjemalloc-dev libgflags-dev liburing-dev + + - name: Cache RocksDB + id: cache-rocksdb + uses: actions/cache@v3 + with: + path: | + /usr/local/lib/librocksdb.* + /usr/local/include/rocksdb + key: rocksdb-v8.9.1-${{ runner.os }} + + - name: Build and Install RocksDB from source + if: steps.cache-rocksdb.outputs.cache-hit != 'true' + run: | + git clone https://github.com/facebook/rocksdb.git + cd rocksdb + git checkout v8.9.1 + make clean + CXXFLAGS='-march=native -DNDEBUG' make -j"$(nproc)" shared_lib + sudo make install-shared + cd .. + + - name: Configure RocksDB library path + run: | + echo '/usr/local/lib' | sudo tee /etc/ld.so.conf.d/rocksdb.conf + sudo ldconfig + + - name: Run RocksDB Tests + run: | + export CGO_CFLAGS="-I/usr/local/include" + export CGO_LDFLAGS="-L/usr/local/lib -lrocksdb -lz -lbz2 -lsnappy -llz4 -lzstd -ljemalloc" + make test-rocksdb + + - uses: actions/upload-artifact@v4 + with: + name: "${{ github.sha }}-coverage-rocksdb" + path: ./profile.out + upload-coverage-report: - needs: tests + needs: [tests, tests-rocksdb] runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 diff --git a/Makefile b/Makefile index fc7e4302..36ee6698 100644 --- a/Makefile +++ b/Makefile @@ -59,5 +59,8 @@ BUILD_FLAGS := -tags "$(build_tags)" -ldflags '$(ldflags)' test-all: go test -v -mod=readonly ./... 
-covermode=atomic -coverprofile=./profile.out +test-rocksdb: + CGO_CFLAGS="-I/usr/local/include" CGO_LDFLAGS="-L/usr/local/lib -lrocksdb -lz -lbz2 -lsnappy -llz4 -lzstd -ljemalloc" go test -v -mod=readonly -tags=rocksdbBackend ./ss/rocksdb/... -covermode=atomic -coverprofile=./profile.out + lint-all: golangci-lint run --config=.golangci.yml diff --git a/go.mod b/go.mod index fddc5536..2de99b47 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/cosmos/iavl v0.21.0-alpha.1.0.20230904092046-df3db2d96583 github.com/gogo/protobuf v1.3.3 github.com/ledgerwatch/erigon-lib v0.0.0-20230210071639-db0e7ed11263 - github.com/linxGnu/grocksdb v1.8.4 + github.com/linxGnu/grocksdb v1.8.11 github.com/spf13/cobra v1.6.1 github.com/stretchr/testify v1.8.4 github.com/tendermint/tm-db v0.6.8-0.20220519162814-e24b96538a12 diff --git a/go.sum b/go.sum index 6b4e48b5..6b11eeac 100644 --- a/go.sum +++ b/go.sum @@ -702,8 +702,8 @@ github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.4/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.6/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= -github.com/linxGnu/grocksdb v1.8.4 h1:ZMsBpPpJNtRLHiKKp0mI7gW+NT4s7UgfD5xHxx1jVRo= -github.com/linxGnu/grocksdb v1.8.4/go.mod h1:xZCIb5Muw+nhbDK4Y5UJuOrin5MceOuiXkVUR7vp4WY= +github.com/linxGnu/grocksdb v1.8.11 h1:BGol9e5gB1BrsTvOxloC88pe70TCqgrfLNwkyWW0kD8= +github.com/linxGnu/grocksdb v1.8.11/go.mod h1:xZCIb5Muw+nhbDK4Y5UJuOrin5MceOuiXkVUR7vp4WY= github.com/lufeee/execinquery v1.2.1/go.mod h1:EC7DrEKView09ocscGHC+apXMIaorh4xqSxS/dy8SbM= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/lyft/protoc-gen-star v0.5.3/go.mod h1:V0xaHgaf5oCCqmcxYcWiDfTiKsZsRc87/1qhoTACD8w= diff --git a/ss/pebbledb/db_test.go b/ss/pebbledb/db_test.go index 
07b43466..075b80d3 100644 --- a/ss/pebbledb/db_test.go +++ b/ss/pebbledb/db_test.go @@ -10,11 +10,13 @@ import ( ) func TestStorageTestSuite(t *testing.T) { + pebbleConfig := config.DefaultStateStoreConfig() + pebbleConfig.Backend = "pebbledb" s := &sstest.StorageTestSuite{ NewDB: func(dir string, config config.StateStoreConfig) (types.StateStore, error) { return New(dir, config) }, - Config: config.DefaultStateStoreConfig(), + Config: pebbleConfig, EmptyBatchSize: 12, } diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index c047c6aa..6b7c4018 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -9,21 +9,26 @@ import ( "errors" "fmt" "sync" + "time" "github.com/linxGnu/grocksdb" errorutils "github.com/sei-protocol/sei-db/common/errors" + "github.com/sei-protocol/sei-db/common/logger" + "github.com/sei-protocol/sei-db/common/utils" "github.com/sei-protocol/sei-db/config" "github.com/sei-protocol/sei-db/proto" "github.com/sei-protocol/sei-db/ss/types" "github.com/sei-protocol/sei-db/ss/util" + "github.com/sei-protocol/sei-db/stream/changelog" "golang.org/x/exp/slices" ) const ( TimestampSize = 8 - StorePrefixTpl = "s/k:%s/" - latestVersionKey = "s/latest" + StorePrefixTpl = "s/k:%s/" + latestVersionKey = "s/latest" + earliestVersionKey = "s/earliest" // TODO: Make configurable ImportCommitBatchSize = 10000 @@ -36,6 +41,11 @@ var ( defaultReadOpts = grocksdb.NewDefaultReadOptions() ) +type VersionedChangesets struct { + Version int64 + Changesets []*proto.NamedChangeSet +} + type Database struct { storage *grocksdb.DB config config.StateStoreConfig @@ -45,6 +55,17 @@ type Database struct { // a lazy manner, we use this value to prevent reads for versions that will // be purged in the next compaction. 
tsLow int64 + + // Earliest version for db after pruning + earliestVersion int64 + + asyncWriteWG sync.WaitGroup + + // Changelog used to support async write + streamHandler *changelog.Stream + + // Pending changes to be written to the DB + pendingChanges chan VersionedChangesets } func New(dataDir string, config config.StateStoreConfig) (*Database, error) { @@ -64,12 +85,36 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { tsLow = int64(binary.LittleEndian.Uint64(tsLowBz)) } - return &Database{ - storage: storage, - config: config, - cfHandle: cfHandle, - tsLow: tsLow, - }, nil + earliestVersion, err := retrieveEarliestVersion(storage) + if err != nil { + return nil, fmt.Errorf("failed to retrieve earliest version: %w", err) + } + + database := &Database{ + storage: storage, + config: config, + cfHandle: cfHandle, + tsLow: tsLow, + earliestVersion: earliestVersion, + pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer), + } + + if config.DedicatedChangelog { + streamHandler, _ := changelog.NewStream( /* TODO(review): do not discard this error — a failed changelog open silently disables the async-write WAL while ApplyChangesetAsync keeps accepting writes */ + logger.NewNopLogger(), + utils.GetChangelogPath(dataDir), + changelog.Config{ + DisableFsync: true, + ZeroCopy: true, + KeepRecent: uint64(config.KeepRecent), + PruneInterval: 300 * time.Second, + }, + ) + database.streamHandler = streamHandler + go database.writeAsyncInBackground() + } + + return database, nil } func NewWithDB(storage *grocksdb.DB, cfHandle *grocksdb.ColumnFamilyHandle) (*Database, error) { @@ -83,14 +128,32 @@ func NewWithDB(storage *grocksdb.DB, cfHandle *grocksdb.ColumnFamilyHandle) (*Da if len(tsLowBz) > 0 { tsLow = int64(binary.LittleEndian.Uint64(tsLowBz)) } + + earliestVersion, err := retrieveEarliestVersion(storage) + if err != nil { + return nil, fmt.Errorf("failed to retrieve earliest version: %w", err) + } + return &Database{ - storage: storage, - cfHandle: cfHandle, - tsLow: tsLow, + storage: storage, + cfHandle: cfHandle, + tsLow: tsLow, + earliestVersion: earliestVersion, }, nil } func
(db *Database) Close() error { + if db.streamHandler != nil { + // Close the changelog stream first + db.streamHandler.Close() + // Close the pending changes channel to signal the background goroutine to stop + close(db.pendingChanges) + // Wait for the async writes to finish processing all buffered items + db.asyncWriteWG.Wait() + // Only set to nil after background goroutine has finished + db.streamHandler = nil + } + db.storage.Close() db.storage = nil @@ -129,15 +192,22 @@ func (db *Database) GetLatestVersion() (int64, error) { } func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error { - panic("not implemented") + if version > db.earliestVersion || ignoreVersion { + db.earliestVersion = version + + var ts [TimestampSize]byte + binary.LittleEndian.PutUint64(ts[:], uint64(version)) + return db.storage.Put(defaultWriteOpts, []byte(earliestVersionKey), ts[:]) + } + return nil } func (db *Database) GetEarliestVersion() (int64, error) { - panic("not implemented") + return db.earliestVersion, nil } func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error) { - if version < db.tsLow { + if version < db.earliestVersion { return false, nil } @@ -150,7 +220,7 @@ func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error } func (db *Database) Get(storeKey string, version int64, key []byte) ([]byte, error) { - if version < db.tsLow { + if version < db.earliestVersion { return nil, errorutils.ErrRecordNotFound } @@ -159,10 +229,23 @@ func (db *Database) Get(storeKey string, version int64, key []byte) ([]byte, err return nil, fmt.Errorf("failed to get RocksDB slice: %w", err) } + if !slice.Exists() { + slice.Free() + return nil, errorutils.ErrRecordNotFound + } + return copyAndFreeSlice(slice), nil } func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) error { + // Check if version is 0 and change it to 1 + // We do this specifically since keys written as part of genesis state come in 
as version 0 + But pebbledb treats version 0 as special, so apply the changeset at version 1 instead + Port this over to rocksdb for consistency + if version == 0 { + version = 1 + } + b := NewBatch(db, version) for _, kvPair := range cs.Changeset.Pairs { @@ -181,7 +264,45 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro } func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error { - return fmt.Errorf("not implemented") + // Write to WAL first + if db.streamHandler != nil { + entry := proto.ChangelogEntry{ + Version: version, + } + entry.Changesets = changesets + entry.Upgrades = nil + err := db.streamHandler.WriteNextEntry(entry) + if err != nil { + return err + } + } + // Then write to pending changes + db.pendingChanges <- VersionedChangesets{ + Version: version, + Changesets: changesets, + } + + return nil +} + +func (db *Database) writeAsyncInBackground() { + db.asyncWriteWG.Add(1) /* FIXME(review): racy — WaitGroup.Add must happen-before Wait can observe it; move this Add into New(), before the goroutine is spawned. As written, Close() can reach asyncWriteWG.Wait() before this line runs, then close and nil out db.storage while buffered changesets are still being applied */ + defer db.asyncWriteWG.Done() + for nextChange := range db.pendingChanges { + if db.streamHandler != nil { + version := nextChange.Version + for _, cs := range nextChange.Changesets { + err := db.ApplyChangeset(version, cs) + if err != nil { + panic(err) + } + } + err := db.SetLatestVersion(version) + if err != nil { + panic(err) + } + } + } }
@@ -203,7 +324,8 @@ func (db *Database) Prune(version int64) error { } db.tsLow = tsLow - return nil + + return db.SetEarliestVersion(tsLow, false) } func (db *Database) Iterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { @@ -219,7 +341,7 @@ func (db *Database) Iterator(storeKey string, version int64, start, end []byte) start, end = util.IterateWithPrefix(prefix, start, end) itr := db.storage.NewIteratorCF(newTSReadOptions(version), db.cfHandle) - return NewRocksDBIterator(itr, prefix, start, end, false), nil + return NewRocksDBIterator(itr, prefix, start, end, version, db.earliestVersion, false), nil } func (db *Database) ReverseIterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { @@ -235,7 +357,7 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [ start, end = util.IterateWithPrefix(prefix, start, end) itr := db.storage.NewIteratorCF(newTSReadOptions(version), db.cfHandle) - return NewRocksDBIterator(itr, prefix, start, end, true), nil + return NewRocksDBIterator(itr, prefix, start, end, version, db.earliestVersion, true), nil } // Import loads the initial version of the state in parallel with numWorkers goroutines @@ -309,7 +431,7 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte defer readOpts.Destroy() itr := db.storage.NewIteratorCF(readOpts, db.cfHandle) - rocksItr := NewRocksDBIterator(itr, prefix, start, end, false) + rocksItr := NewRocksDBIterator(itr, prefix, start, end, latestVersion, 1, false) defer rocksItr.Close() for rocksItr.Valid() { @@ -328,6 +450,14 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte return false, nil } +func (db *Database) GetLatestMigratedKey() ([]byte, error) { + panic("not implemented") +} + +func (db *Database) GetLatestMigratedModule() (string, error) { + panic("not implemented") +} + // newTSReadOptions returns ReadOptions used in the RocksDB column family 
read. func newTSReadOptions(version int64) *grocksdb.ReadOptions { var ts [TimestampSize]byte @@ -383,3 +513,19 @@ func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error { func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlockRange int64, hash []byte) error { panic("implement me") } + +// retrieveEarliestVersion retrieves the earliest version from the database +func retrieveEarliestVersion(storage *grocksdb.DB) (int64, error) { + bz, err := storage.GetBytes(defaultReadOpts, []byte(earliestVersionKey)) + if err != nil { + fmt.Printf("warning: rocksdb get for earliestVersionKey failed: %v\n", err) + return 0, nil /* NOTE(review): the read error is swallowed and a corrupt/unreadable key is treated as a fresh database — confirm this is intentional, or propagate the error */ + } + + if len(bz) == 0 { + // in case of a fresh database + return 0, nil + } + + return int64(binary.LittleEndian.Uint64(bz)), nil +} diff --git a/ss/rocksdb/db_test.go b/ss/rocksdb/db_test.go index afebe71a..605f377e 100644 --- a/ss/rocksdb/db_test.go +++ b/ss/rocksdb/db_test.go @@ -13,10 +13,13 @@ import ( ) func TestStorageTestSuite(t *testing.T) { + rocksConfig := config.DefaultStateStoreConfig() + rocksConfig.Backend = "rocksdb" s := &sstest.StorageTestSuite{ - NewDB: func(dir string) (types.StateStore, error) { - return New(dir, config.DefaultStateStoreConfig()) + NewDB: func(dir string, config config.StateStoreConfig) (types.StateStore, error) { + return New(dir, config) }, + Config: rocksConfig, EmptyBatchSize: 12, } diff --git a/ss/rocksdb/iterator.go b/ss/rocksdb/iterator.go index 215af648..077bb2ac 100644 --- a/ss/rocksdb/iterator.go +++ b/ss/rocksdb/iterator.go @@ -15,11 +15,25 @@ var _ types.DBIterator = (*iterator)(nil) type iterator struct { source *grocksdb.Iterator prefix, start, end []byte + version int64 reverse bool invalid bool } -func NewRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, reverse bool) *iterator { +func NewRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, version int64, earliestVersion int64, reverse bool) *iterator { + // Return invalid
iterator if requested iterator height is lower than earliest version after pruning + if version < earliestVersion { + return &iterator{ + source: source, + prefix: prefix, + start: start, + end: end, + version: version, + reverse: reverse, + invalid: true, + } + } + if reverse { if end == nil { source.SeekToLast() @@ -48,6 +62,7 @@ func NewRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, re prefix: prefix, start: start, end: end, + version: version, reverse: reverse, invalid: !source.Valid(), } diff --git a/ss/test/storage_test_suite.go b/ss/test/storage_test_suite.go index bda45da5..5b9f9f0b 100644 --- a/ss/test/storage_test_suite.go +++ b/ss/test/storage_test_suite.go @@ -718,38 +718,42 @@ func (s *StorageTestSuite) TestDatabasePruneKeepRecent() { } func (s *StorageTestSuite) TestDatabasePruneKeepLastVersion() { - // Update config to set KeepLastVersion to false - stateStoreConfig := s.Config - stateStoreConfig.KeepLastVersion = false - db, err := s.NewDB(s.T().TempDir(), stateStoreConfig) - s.Require().NoError(err) - defer db.Close() + // Only test KeepLastVersion = false for pebbledb backend + // NOTE: KeepLastVersion is always true and will be removed in future + if s.Config.Backend == "pebbledb" { + // Update config to set KeepLastVersion to false + stateStoreConfig := s.Config + stateStoreConfig.KeepLastVersion = false + db, err := s.NewDB(s.T().TempDir(), stateStoreConfig) + s.Require().NoError(err) + defer db.Close() - s.Require().NoError(DBApplyChangeset(db, 100, storeKey1, [][]byte{[]byte("key000")}, [][]byte{[]byte("value001")})) - s.Require().NoError(DBApplyChangeset(db, 100, storeKey1, [][]byte{[]byte("key001")}, [][]byte{[]byte("value002")})) - s.Require().NoError(DBApplyChangeset(db, 200, storeKey1, [][]byte{[]byte("key002")}, [][]byte{[]byte("value003")})) - s.Require().NoError(DBApplyChangeset(db, 200, storeKey1, [][]byte{[]byte("key003")}, [][]byte{[]byte("value004")})) + s.Require().NoError(DBApplyChangeset(db, 100, 
storeKey1, [][]byte{[]byte("key000")}, [][]byte{[]byte("value001")})) + s.Require().NoError(DBApplyChangeset(db, 100, storeKey1, [][]byte{[]byte("key001")}, [][]byte{[]byte("value002")})) + s.Require().NoError(DBApplyChangeset(db, 200, storeKey1, [][]byte{[]byte("key002")}, [][]byte{[]byte("value003")})) + s.Require().NoError(DBApplyChangeset(db, 200, storeKey1, [][]byte{[]byte("key003")}, [][]byte{[]byte("value004")})) - // prune version 150 - s.Require().NoError(db.Prune(150)) + // prune version 150 + s.Require().NoError(db.Prune(150)) - // Verify that all keys before prune height are deleted - bz, err := db.Get(storeKey1, 100, []byte("key000")) - s.Require().ErrorIs(err, errorutils.ErrRecordNotFound) - s.Require().Nil(bz) + // Verify that all keys before prune height are deleted + bz, err := db.Get(storeKey1, 100, []byte("key000")) + s.Require().ErrorIs(err, errorutils.ErrRecordNotFound) + s.Require().Nil(bz) - bz, err = db.Get(storeKey1, 160, []byte("key001")) - s.Require().ErrorIs(err, errorutils.ErrRecordNotFound) - s.Require().Nil(bz) + bz, err = db.Get(storeKey1, 160, []byte("key001")) + s.Require().ErrorIs(err, errorutils.ErrRecordNotFound) + s.Require().Nil(bz) - // Verify keys after prune height can be retrieved - bz, err = db.Get(storeKey1, 200, []byte("key002")) - s.Require().NoError(err) - s.Require().Equal([]byte("value003"), bz) + // Verify keys after prune height can be retrieved + bz, err = db.Get(storeKey1, 200, []byte("key002")) + s.Require().NoError(err) + s.Require().Equal([]byte("value003"), bz) - bz, err = db.Get(storeKey1, 220, []byte("key003")) - s.Require().NoError(err) - s.Require().Equal([]byte("value004"), bz) + bz, err = db.Get(storeKey1, 220, []byte("key003")) + s.Require().NoError(err) + s.Require().Equal([]byte("value004"), bz) + } // Now reset KeepLastVersion to true and verify latest version of key exists newDB, err := s.NewDB(s.T().TempDir(), s.Config) @@ -765,7 +769,7 @@ func (s *StorageTestSuite) 
TestDatabasePruneKeepLastVersion() { s.Require().NoError(newDB.Prune(150)) // Can still retrieve those keys because KeepLastVersion is true - bz, err = newDB.Get(storeKey1, 160, []byte("key000")) + bz, err := newDB.Get(storeKey1, 160, []byte("key000")) s.Require().NoError(err) s.Require().Equal([]byte("value001"), bz) @@ -1224,6 +1228,12 @@ func (s *StorageTestSuite) TestDatabaseImport() { } func (s *StorageTestSuite) TestDatabaseRawImport() { + // RawImport is only useful for PebbleDB backend + // NOTE: Will be removed from interface soon + if s.Config.Backend != "pebbledb" { + s.T().Skip("RawImport test only runs for pebbledb backend") + } + db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) defer db.Close() diff --git a/ss/types/store.go b/ss/types/store.go index 56844468..5fa574c5 100644 --- a/ss/types/store.go +++ b/ss/types/store.go @@ -19,11 +19,8 @@ type StateStore interface { GetEarliestVersion() (int64, error) SetEarliestVersion(version int64, ignoreVersion bool) error GetLatestMigratedKey() ([]byte, error) - SetLatestMigratedKey(key []byte) error GetLatestMigratedModule() (string, error) - SetLatestMigratedModule(module string) error WriteBlockRangeHash(storeKey string, beginBlockRange, endBlockRange int64, hash []byte) error - DeleteKeysAtVersion(module string, version int64) error // ApplyChangeset Persist the change set of a block, // the `changeSet` should be ordered by (storeKey, key),