From a00303e85a962f72aaeed41316226f52c8232c0d Mon Sep 17 00:00:00 2001 From: jwasinger Date: Wed, 29 Nov 2023 15:33:50 +0800 Subject: [PATCH 01/10] all: replace log15 with slog (#28187) This PR replaces Geth's logger package (a fork of [log15](https://github.com/inconshreveable/log15)) with an implementation using slog, a logging library included as part of the Go standard library as of Go1.21. Main changes are as follows: * removes any log handlers that were unused in the Geth codebase. * Json, logfmt, and terminal formatters are now slog handlers. * Verbosity level constants are changed to match slog constant values. Internal translation is done to make this opaque to the user and backwards compatible with existing `--verbosity` and `--vmodule` options. * `--log.backtraceat` and `--log.debug` are removed. The external-facing API is largely the same as the existing Geth logger. Logger method signatures remain unchanged. A small semantic difference is that a `Handler` can only be set once per `Logger` and not changed dynamically. This just means that a new logger must be instantiated every time the handler of the root logger is changed. ---- For users of the `go-ethereum/log` module. If you were using this module for your own project, you will need to change the initialization. If you previously did ```golang log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) ``` You now instead need to do ```golang log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelInfo, true))) ``` See more about reasoning here: https://github.com/ethereum/go-ethereum/issues/28558#issuecomment-1820606613 --- log/logger_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/log/logger_test.go b/log/logger_test.go index a633f5ad7a4c..8dae62f48faa 100644 --- a/log/logger_test.go +++ b/log/logger_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/holiman/uint256" + "golang.org/x/exp/slog" ) From acbae3f77e4183f0bbbe61325af72274419f65a7 Mon Sep 17 00:00:00 2001 From: fearlessfe <505380967@qq.com> Date: Sun, 3 Dec 2023 23:13:29 +0800 Subject: [PATCH 02/10] tmp --- go.mod | 2 + go.sum | 4 + p2p/discover/portal_storage.go | 165 ++++++++++++++++++++++++++++++++- 3 files changed, 169 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index b72d5ed2e9ee..f4a7cbebc0d0 100644 --- a/go.mod +++ b/go.mod @@ -122,7 +122,9 @@ require ( github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect + github.com/linxGnu/grocksdb v1.8.6 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect + github.com/mattn/go-sqlite3 v1.14.18 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect github.com/minio/sha256-simd v1.0.0 // indirect github.com/mitchellh/mapstructure v1.4.1 // indirect diff --git a/go.sum b/go.sum index 2847b12594fd..8ceb123621ea 100644 --- a/go.sum +++ b/go.sum @@ -421,6 +421,8 @@ github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvf github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= +github.com/linxGnu/grocksdb v1.8.6 h1:O7I6SIGPrypf3f/gmrrLUBQDKfO8uOoYdWf4gLS06tc= +github.com/linxGnu/grocksdb v1.8.6/go.mod h1:xZCIb5Muw+nhbDK4Y5UJuOrin5MceOuiXkVUR7vp4WY= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= @@ -443,6 +445,8 @@ github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+5aI= +github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI= diff --git a/p2p/discover/portal_storage.go b/p2p/discover/portal_storage.go index 3b9023be6acd..d7a7ec995c61 100644 --- a/p2p/discover/portal_storage.go +++ b/p2p/discover/portal_storage.go @@ -1,8 +1,27 @@ package discover -import "fmt" +import ( + "crypto/sha256" + "database/sql" + "fmt" + "path" -var ContentNotFound = fmt.Errorf("content not found") + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/holiman/uint256" + "github.com/linxGnu/grocksdb" + _ "github.com/mattn/go-sqlite3" +) + +var ( + ContentNotFound = fmt.Errorf("content not found") + + maxDistance = uint256.MustFromHex("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") +) + +const ( + kvStoreName = "rocksdb" + sqliteName = "shisui.sqlite" +) type Storage interface { ContentId(contentKey []byte) []byte @@ -11,3 +30,145 @@ type Storage interface { Put(contentKey []byte, content []byte) error } + +func getDistance(a *uint256.Int, b *uint256.Int) *uint256.Int { + return new(uint256.Int).Xor(a, b) +} + +// opts := grocksdb.NewDefaultOptions() + +// opts.SetCreateIfMissing(true) + +// db, err := grocksdb.OpenDb(opts, "/path/to/db") + +type PortalStorage struct { + nodeId enode.ID + nodeDataDir string + storageCapacityInBytes uint64 + radius *uint256.Int + kv *grocksdb.DB + sqliteDB *sql.DB +} + +func NewPortalStorage(storageCapacityInMB uint64, nodeId enode.ID, nodeDataDir string) (*PortalStorage, error) { + + opts := grocksdb.NewDefaultOptions() + opts.SetCreateIfMissing(true) + + kv, err := grocksdb.OpenDb(opts, path.Join(nodeDataDir, kvStoreName)) + + if err != nil { + return nil, err + } + + sqlDb, err := sql.Open("sqlite3", path.Join(nodeDataDir, sqliteName)) + + if err != nil { + return nil, err + } + + portalStorage := &PortalStorage{ + nodeId: nodeId, + nodeDataDir: nodeDataDir, + storageCapacityInBytes: storageCapacityInMB * 1000 * 1000, + radius: maxDistance, + kv: kv, + sqliteDB: sqlDb, + } + + err = portalStorage.setupSql() + + if err != nil { + return nil, err + } + + // Check whether we already have data, and use it to set radius + + return portalStorage, nil +} + +func (p *PortalStorage) ContentId(contentKey []byte) []byte { + digest := sha256.Sum256(contentKey) + return digest[:] +} + +func (p *PortalStorage) Get(contentKey []byte, contentId []byte) ([]byte, error) { + return p.kv.GetBytes(&grocksdb.ReadOptions{}, contentId) +} + +func (p *PortalStorage) setupSql() error { + stat, err := p.sqliteDB.Prepare(CREATE_QUERY) + if err != nil { + return err + } + defer stat.Close() + _, err = stat.Exec() + return err +} + +func (p *PortalStorage) total_entry_count() (uint64, error) { + stat, err := p.sqliteDB.Prepare(TOTAL_ENTRY_COUNT_QUERY) + if err != nil { + return 0, err + } + defer stat.Close() + var total uint64 + err = stat.QueryRow().Scan(&total) + return total, err +} + +func (p *PortalStorage) capacityReached() (bool, error) { + storageUsage, err := p.getTotalStorageUsageInBytesFromNetwork() + return storageUsage > p.storageCapacityInBytes, err +} + +// Internal method for measuring the total amount of requestable data that the node is storing. +func (p *PortalStorage) getTotalStorageUsageInBytesFromNetwork() (uint64, error) { + stat, err := p.sqliteDB.Prepare(TOTAL_DATA_SIZE_QUERY) + if err != nil { + return 0, err + } + defer stat.Close() + var total uint64 + err = stat.QueryRow().Scan(&total) + return total, err +} + +// func (p *PortalStorage) findFarthestContentId() ([]byte, error) { +// stat, err := p.sqliteDB.Prepare(XOR_FIND_FARTHEST_QUERY) +// if err != nil { +// return nil, err +// } +// } + +// SQLite Statements +const CREATE_QUERY = `CREATE TABLE IF NOT EXISTS content_metadata ( + content_id_long TEXT PRIMARY KEY, + content_id_short INTEGER NOT NULL, + content_key TEXT NOT NULL, + content_size INTEGER +); +CREATE INDEX content_size_idx ON content_metadata(content_size); +CREATE INDEX content_id_short_idx ON content_metadata(content_id_short); +CREATE INDEX content_id_long_idx ON content_metadata(content_id_long);` + +const INSERT_QUERY = `INSERT OR IGNORE INTO content_metadata (content_id_long, content_id_short, content_key, content_size) +VALUES (?1, ?2, ?3, ?4)` + +const DELETE_QUERY = `DELETE FROM content_metadata +WHERE content_id_long = (?1)` + +const XOR_FIND_FARTHEST_QUERY = `SELECT + content_id_long + FROM content_metadata + ORDER BY ((?1 | content_id_short) - (?1 & content_id_short)) DESC` + +const CONTENT_KEY_LOOKUP_QUERY = "SELECT content_key FROM content_metadata WHERE content_id_long = (?1)" + +const TOTAL_DATA_SIZE_QUERY = "SELECT TOTAL(content_size) FROM content_metadata" + +const TOTAL_ENTRY_COUNT_QUERY = "SELECT COUNT(content_id_long) FROM content_metadata" + +const PAGINATE_QUERY = "SELECT content_key FROM content_metadata ORDER BY content_key LIMIT :limit OFFSET :offset" + +const CONTENT_SIZE_LOOKUP_QUERY = "SELECT content_size FROM content_metadata WHERE content_id_long = (?1)" From 65b50cde79a767cd8fbd34ebf29d03d7b06d9fa3 Mon Sep 17 00:00:00 2001 From: fearlessfe <505380967@qq.com> Date: Tue, 5 Dec 2023 23:30:28 +0800 Subject: [PATCH 03/10] faet: init sqlite table --- p2p/discover/portal_storage.go | 139 +++++++++++++++------------------ 1 file changed, 65 insertions(+), 74 deletions(-) diff --git a/p2p/discover/portal_storage.go b/p2p/discover/portal_storage.go index d7a7ec995c61..b11358066483 100644 --- a/p2p/discover/portal_storage.go +++ b/p2p/discover/portal_storage.go @@ -8,7 +8,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/holiman/uint256" - "github.com/linxGnu/grocksdb" _ "github.com/mattn/go-sqlite3" ) @@ -19,8 +18,7 @@ var ( ) const ( - kvStoreName = "rocksdb" - sqliteName = "shisui.sqlite" + sqliteName = "shisui.sqlite" ) type Storage interface { @@ -35,56 +33,43 @@ func getDistance(a *uint256.Int, b *uint256.Int) *uint256.Int { return new(uint256.Int).Xor(a, b) } -// opts := grocksdb.NewDefaultOptions() - -// opts.SetCreateIfMissing(true) - -// db, err := grocksdb.OpenDb(opts, "/path/to/db") - type PortalStorage struct { nodeId enode.ID nodeDataDir string storageCapacityInBytes uint64 radius *uint256.Int - kv *grocksdb.DB sqliteDB *sql.DB + getStmt *sql.Stmt + putStmt *sql.Stmt + delStmt *sql.Stmt + containStmt *sql.Stmt } -func NewPortalStorage(storageCapacityInMB uint64, nodeId enode.ID, nodeDataDir string) (*PortalStorage, error) { - - opts := grocksdb.NewDefaultOptions() - opts.SetCreateIfMissing(true) - - kv, err := grocksdb.OpenDb(opts, path.Join(nodeDataDir, kvStoreName)) - - if err != nil { - return nil, err - } +func NewPortalStorage(storageCapacityInBytes uint64, nodeId enode.ID, nodeDataDir string) (*PortalStorage, error) { sqlDb, err := sql.Open("sqlite3", path.Join(nodeDataDir, sqliteName)) if err != nil { return nil, err } - portalStorage := &PortalStorage{ nodeId: nodeId, nodeDataDir: nodeDataDir, - storageCapacityInBytes: storageCapacityInMB * 1000 * 1000, + storageCapacityInBytes: storageCapacityInBytes, radius: maxDistance, - kv: kv, sqliteDB: sqlDb, } - err = portalStorage.setupSql() - + err = portalStorage.createTable() if err != nil { return nil, err } + err = portalStorage.initStmts() + // Check whether we already have data, and use it to set radius - return portalStorage, nil + return portalStorage, err } func (p *PortalStorage) ContentId(contentKey []byte) []byte { @@ -93,11 +78,30 @@ func (p *PortalStorage) ContentId(contentKey []byte) []byte { } func (p *PortalStorage) Get(contentKey []byte, contentId []byte) ([]byte, error) { - return p.kv.GetBytes(&grocksdb.ReadOptions{}, contentId) + var res []byte + err := p.getStmt.QueryRow().Scan(&res) + if err == sql.ErrNoRows { + return nil, ContentNotFound + } + return res, err +} + +func (p *PortalStorage) Put(contentKey []byte, content []byte) error { + contentId := p.ContentId(contentKey) + _, err := p.putStmt.Exec(contentId, content) + return err +} + +func (p *PortalStorage) Close() error { + p.getStmt.Close() + p.putStmt.Close() + p.delStmt.Close() + p.containStmt.Close() + return p.sqliteDB.Close() } -func (p *PortalStorage) setupSql() error { - stat, err := p.sqliteDB.Prepare(CREATE_QUERY) +func (p *PortalStorage) createTable() error { + stat, err := p.sqliteDB.Prepare(createSql) if err != nil { return err } @@ -106,57 +110,44 @@ func (p *PortalStorage) setupSql() error { return err } -func (p *PortalStorage) total_entry_count() (uint64, error) { - stat, err := p.sqliteDB.Prepare(TOTAL_ENTRY_COUNT_QUERY) - if err != nil { - return 0, err +func (p *PortalStorage) initStmts() error { + var stat *sql.Stmt + var err error + if stat, err = p.sqliteDB.Prepare(getSql); err != nil { + return nil } - defer stat.Close() - var total uint64 - err = stat.QueryRow().Scan(&total) - return total, err -} - -func (p *PortalStorage) capacityReached() (bool, error) { - storageUsage, err := p.getTotalStorageUsageInBytesFromNetwork() - return storageUsage > p.storageCapacityInBytes, err -} - -// Internal method for measuring the total amount of requestable data that the node is storing. -func (p *PortalStorage) getTotalStorageUsageInBytesFromNetwork() (uint64, error) { - stat, err := p.sqliteDB.Prepare(TOTAL_DATA_SIZE_QUERY) - if err != nil { - return 0, err + p.getStmt = stat + if stat, err = p.sqliteDB.Prepare(putSql); err != nil { + return nil } - defer stat.Close() - var total uint64 - err = stat.QueryRow().Scan(&total) - return total, err + p.putStmt = stat + if stat, err = p.sqliteDB.Prepare(deleteSql); err != nil { + return nil + } + p.delStmt = stat + if stat, err = p.sqliteDB.Prepare(containSql); err != nil { + return nil + } + p.containStmt = stat + return nil } -// func (p *PortalStorage) findFarthestContentId() ([]byte, error) { -// stat, err := p.sqliteDB.Prepare(XOR_FIND_FARTHEST_QUERY) -// if err != nil { -// return nil, err -// } -// } +// get database size, content size and similar +func (p *PortalStorage) Size() int64 { + return 0 +} // SQLite Statements -const CREATE_QUERY = `CREATE TABLE IF NOT EXISTS content_metadata ( - content_id_long TEXT PRIMARY KEY, - content_id_short INTEGER NOT NULL, - content_key TEXT NOT NULL, - content_size INTEGER -); -CREATE INDEX content_size_idx ON content_metadata(content_size); -CREATE INDEX content_id_short_idx ON content_metadata(content_id_short); -CREATE INDEX content_id_long_idx ON content_metadata(content_id_long);` - -const INSERT_QUERY = `INSERT OR IGNORE INTO content_metadata (content_id_long, content_id_short, content_key, content_size) -VALUES (?1, ?2, ?3, ?4)` - -const DELETE_QUERY = `DELETE FROM content_metadata -WHERE content_id_long = (?1)` + +const createSql = `CREATE TABLE IF NOT EXISTS kvstore ( + key BLOB PRIMARY KEY, + value BLOB +);` +const getSql = "SELECT value FROM kvstore WHERE key = (?1);" +const putSql = "INSERT OR REPLACE INTO kvstore (key, value) VALUES (?1, ?2);" +const deleteSql = "DELETE FROM kvstore WHERE key = (?1);" +const clearSql = "DELETE FROM kvstore" +const containSql = "SELECT 1 FROM kvstore WHERE key = (?1);" const XOR_FIND_FARTHEST_QUERY = `SELECT content_id_long From 2a3b2a9c503f750e7875d40af4444e1267330a8f Mon Sep 17 00:00:00 2001 From: fearlessfe <505380967@qq.com> Date: Wed, 6 Dec 2023 18:22:42 +0800 Subject: [PATCH 04/10] feat: storage --- p2p/discover/portal_storage.go | 171 +++++++++++++++++++++++++++- p2p/discover/portal_storage_test.go | 37 ++++++ 2 files changed, 202 insertions(+), 6 deletions(-) create mode 100644 p2p/discover/portal_storage_test.go diff --git a/p2p/discover/portal_storage.go b/p2p/discover/portal_storage.go index b11358066483..2e9b561ef266 100644 --- a/p2p/discover/portal_storage.go +++ b/p2p/discover/portal_storage.go @@ -3,9 +3,13 @@ package discover import ( "crypto/sha256" "database/sql" + "errors" "fmt" + "math/big" "path" + "strings" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/holiman/uint256" _ "github.com/mattn/go-sqlite3" @@ -19,6 +23,7 @@ var ( const ( sqliteName = "shisui.sqlite" + contentDeletionFraction = 0.05 // 5% of the content will be deleted when the storage capacity is hit and radius gets adjusted. ) type Storage interface { @@ -43,6 +48,7 @@ type PortalStorage struct { putStmt *sql.Stmt delStmt *sql.Stmt containStmt *sql.Stmt + log log.Logger } func NewPortalStorage(storageCapacityInBytes uint64, nodeId enode.ID, nodeDataDir string) (*PortalStorage, error) { @@ -58,6 +64,7 @@ func NewPortalStorage(storageCapacityInBytes uint64, nodeId enode.ID, nodeDataDi storageCapacityInBytes: storageCapacityInBytes, radius: maxDistance, sqliteDB: sqlDb, + log: log.New("protocol_storage", ""), } err = portalStorage.createTable() @@ -88,8 +95,25 @@ func (p *PortalStorage) Get(contentKey []byte, contentId []byte) ([]byte, error) func (p *PortalStorage) Put(contentKey []byte, content []byte) error { contentId := p.ContentId(contentKey) + _, err := p.putStmt.Exec(contentId, content) - return err + if err != nil { + return err + } + + dbSize, err := p.UsedSize() + if err != nil { + return err + } + if dbSize > p.storageCapacityInBytes { + err = p.deleteContentFraction(contentDeletionFraction) + // + if err != nil { + log.Warn("failed to delete") + } + } + + return nil } func (p *PortalStorage) Close() error { @@ -133,8 +157,141 @@ func (p *PortalStorage) initStmts() error { } // get database size, content size and similar -func (p *PortalStorage) Size() int64 { - return 0 +func (p *PortalStorage) Size() (uint64, error) { + sql := "SELECT page_count * page_size as size FROM pragma_page_count(), pragma_page_size();" + stmt, err := p.sqliteDB.Prepare(sql) + if err != nil { + return 0, err + } + var res uint64 + err = stmt.QueryRow().Scan(&res) + return res, err +} + +func (p *PortalStorage) UnusedSize() (uint64, error) { + sql := "SELECT freelist_count * page_size as size FROM pragma_freelist_count(), pragma_page_size();" + return p.queryRowUint64(sql) +} + +func (p *PortalStorage) UsedSize() (uint64, error) { + size, err := p.Size() + if err != nil { + return 0, err + } + unusedSize, err := p.UnusedSize() + if err != nil { + return 0, err + } + return size - unusedSize, err +} + +func (p *PortalStorage) ContentSize() (uint64, error) { + sql := "SELECT COUNT(key) FROM kvstore;" + return p.queryRowUint64(sql) +} + +func (p *PortalStorage) ContentCount() (uint64, error) { + sql := "SELECT SUM(length(value)) FROM kvstore" + return p.queryRowUint64(sql) +} + +func (p *PortalStorage) queryRowUint64(sql string) (uint64, error) { + // sql := "SELECT SUM(length(value)) FROM kvstore" + stmt, err := p.sqliteDB.Prepare(sql) + if err != nil { + return 0, err + } + var res uint64 + err = stmt.QueryRow().Scan(&res) + return res, err +} + +func (p *PortalStorage) GetLargestDistance() (*uint256.Int, error) { + stmt, err := p.sqliteDB.Prepare(XOR_FIND_FARTHEST_QUERY) + if err != nil { + return nil, err + } + var contentId []byte + + err = stmt.QueryRow(p.nodeId).Scan(&contentId) + if err != nil { + return nil, err + } + bugNum := new(big.Int).SetBytes(contentId) + res, _ := uint256.FromBig(bugNum) + return res, err +} + +func (p *PortalStorage) EstimateNewRadius() (*uint256.Int, error) { + currrentSize, err := p.UsedSize() + if err != nil { + return nil, err + } + sizeRatio := currrentSize / p.storageCapacityInBytes + if sizeRatio > 0 { + bigFormat := new(big.Int).SetUint64(sizeRatio) + return new(uint256.Int).Div(p.radius, uint256.MustFromBig(bigFormat)), nil + } + return p.radius, nil +} + +func (p *PortalStorage) deleteContentFraction(fraction float64) error { + if fraction <=0 || fraction >=1 { + return errors.New("fraction should be between 0 and 1") + } + totalContentSize, err := p.ContentSize() + if err != nil { + return err + } + bytesToDelete := uint64(fraction * float64(totalContentSize)) + // deleteElements := 0 + deleteBytes := 0 + + rows, err := p.sqliteDB.Query(getAllOrderedByDistanceSql, p.nodeId) + if err != nil { + return nil + } + defer rows.Close() + idsToDelete := make([][]byte, 0) + for deleteBytes < int(bytesToDelete) { + var contentId []byte + var payloadLen int + err = rows.Scan(&contentId, &payloadLen) + if err != nil { + return err + } + idsToDelete = append(idsToDelete, contentId) + deleteBytes += payloadLen + } + err = p.deleteBatchById(idsToDelete...) + + return err +} + +func (p *PortalStorage) deleteBatchById(ids ...[]byte) error { + // 构建带有占位符的 SQL 查询语句 + query := "DELETE FROM kvstore WHERE id IN (?" + strings.Repeat(", ?", len(ids)-1) + ")" + + args := make([]any, len(ids)) + for i, id := range ids { + args[i] = id + } + // 执行删除操作 + _, err := p.sqliteDB.Exec(query, args...) + if err != nil { + return err + } + return nil +} + +func (p *PortalStorage) ReclaimSpace() error { + _, err := p.sqliteDB.Exec("VACUUM;") + return err +} + +func (p *PortalStorage) DeleteContentOutOfRadius(radius uint256.Int) error { + _, err := p.sqliteDB.Exec(deleteOutOfRadiusStmt, p.nodeId, radius) + return err } // SQLite Statements @@ -148,11 +305,13 @@ const putSql = "INSERT OR REPLACE INTO kvstore (key, value) VALUES (?1, ?2);" const deleteSql = "DELETE FROM kvstore WHERE key = (?1);" const clearSql = "DELETE FROM kvstore" const containSql = "SELECT 1 FROM kvstore WHERE key = (?1);" +const getAllOrderedByDistanceSql = "SELECT key, length(value), ((?1 | key) - (?1 & key)) as distance FROM kvstore ORDER BY distance DESC;" +const deleteOutOfRadiusStmt = "DELETE FROM kvstore WHERE ((?1 | key) - (?1 & key)) < ?2" const XOR_FIND_FARTHEST_QUERY = `SELECT - content_id_long - FROM content_metadata - ORDER BY ((?1 | content_id_short) - (?1 & content_id_short)) DESC` + ((?1 | key) - (?1 & key)) as distance + FROM kvstore + ORDER BY distance DESC LIMIT 1` const CONTENT_KEY_LOOKUP_QUERY = "SELECT content_key FROM content_metadata WHERE content_id_long = (?1)" diff --git a/p2p/discover/portal_storage_test.go b/p2p/discover/portal_storage_test.go new file mode 100644 index 000000000000..b27580940e26 --- /dev/null +++ b/p2p/discover/portal_storage_test.go @@ -0,0 +1,37 @@ +package discover + +import ( + "database/sql" + "testing" +) + +func getDB() *sql.DB { + const createSql = `CREATE TABLE IF NOT EXISTS kvstore ( + key BLOB PRIMARY KEY, + value BLOB + );` + sqlDb, _ := sql.Open("sqlite3", "test.db") + stat, _ :=sqlDb.Prepare(createSql) + defer stat.Close() + _, _ = stat.Exec() + return sqlDb +} + +func BenchmarkPrepare(b *testing.B) { + const sql = "SELECT value FROM kvstore" + db := getDB() + b.ResetTimer() + for i:=0; i < b.N; i++ { + db.Exec(sql) + } +} + +func BenchmarkNoPrepare(b *testing.B) { + const sql = "SELECT value FROM kvstore" + db := getDB() + stmp, _ := db.Prepare(sql) + b.ResetTimer() + for i:=0; i < b.N; i++ { + stmp.Exec() + } +} \ No newline at end of file From 00e7a40bd4bf1620abaa2914eb94e28338b765a8 Mon Sep 17 00:00:00 2001 From: fearlessfe <505380967@qq.com> Date: Wed, 6 Dec 2023 23:12:07 +0800 Subject: [PATCH 05/10] feat: add test --- p2p/discover/portal_storage.go | 22 ++++++-------- p2p/discover/portal_storage_test.go | 47 +++++++++++------------------ 2 files changed, 27 insertions(+), 42 deletions(-) diff --git a/p2p/discover/portal_storage.go b/p2p/discover/portal_storage.go index 2e9b561ef266..dca98564c71a 100644 --- a/p2p/discover/portal_storage.go +++ b/p2p/discover/portal_storage.go @@ -22,7 +22,7 @@ var ( ) const ( - sqliteName = "shisui.sqlite" + sqliteName = "shisui.sqlite" contentDeletionFraction = 0.05 // 5% of the content will be deleted when the storage capacity is hit and radius gets adjusted. ) @@ -34,10 +34,6 @@ type Storage interface { Put(contentKey []byte, content []byte) error } -func getDistance(a *uint256.Int, b *uint256.Int) *uint256.Int { - return new(uint256.Int).Xor(a, b) -} - type PortalStorage struct { nodeId enode.ID nodeDataDir string @@ -48,7 +44,7 @@ type PortalStorage struct { putStmt *sql.Stmt delStmt *sql.Stmt containStmt *sql.Stmt - log log.Logger + log log.Logger } func NewPortalStorage(storageCapacityInBytes uint64, nodeId enode.ID, nodeDataDir string) (*PortalStorage, error) { @@ -64,7 +60,7 @@ func NewPortalStorage(storageCapacityInBytes uint64, nodeId enode.ID, nodeDataDi storageCapacityInBytes: storageCapacityInBytes, radius: maxDistance, sqliteDB: sqlDb, - log: log.New("protocol_storage", ""), + log: log.New("protocol_storage"), } err = portalStorage.createTable() @@ -86,7 +82,7 @@ func (p *PortalStorage) ContentId(contentKey []byte) []byte { func (p *PortalStorage) Get(contentKey []byte, contentId []byte) ([]byte, error) { var res []byte - err := p.getStmt.QueryRow().Scan(&res) + err := p.getStmt.QueryRow(contentId).Scan(&res) if err == sql.ErrNoRows { return nil, ContentNotFound } @@ -107,12 +103,12 @@ func (p *PortalStorage) Put(contentKey []byte, content []byte) error { } if dbSize > p.storageCapacityInBytes { err = p.deleteContentFraction(contentDeletionFraction) - // + // if err != nil { - log.Warn("failed to delete") + log.Warn("failed to delete oversize item") } } - + return nil } @@ -236,7 +232,7 @@ func (p *PortalStorage) EstimateNewRadius() (*uint256.Int, error) { } func (p *PortalStorage) deleteContentFraction(fraction float64) error { - if fraction <=0 || fraction >=1 { + if fraction <= 0 || fraction >= 1 { return errors.New("fraction should be between 0 and 1") } totalContentSize, err := p.ContentSize() @@ -264,7 +260,7 @@ func (p *PortalStorage) deleteContentFraction(fraction float64) error { deleteBytes += payloadLen } err = p.deleteBatchById(idsToDelete...) - + return err } diff --git a/p2p/discover/portal_storage_test.go b/p2p/discover/portal_storage_test.go index b27580940e26..0156eb1302e1 100644 --- a/p2p/discover/portal_storage_test.go +++ b/p2p/discover/portal_storage_test.go @@ -1,37 +1,26 @@ package discover import ( - "database/sql" + "math" "testing" + + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/holiman/uint256" + "github.com/stretchr/testify/assert" ) -func getDB() *sql.DB { - const createSql = `CREATE TABLE IF NOT EXISTS kvstore ( - key BLOB PRIMARY KEY, - value BLOB - );` - sqlDb, _ := sql.Open("sqlite3", "test.db") - stat, _ :=sqlDb.Prepare(createSql) - defer stat.Close() - _, _ = stat.Exec() - return sqlDb -} +func TestBasicStorage(t *testing.T) { + zeroNodeId := uint256.NewInt(0).Bytes32() + storage, err := NewPortalStorage(math.MaxUint64, enode.ID(zeroNodeId), "./") + defer storage.Close() + assert.NoError(t, err) -func BenchmarkPrepare(b *testing.B) { - const sql = "SELECT value FROM kvstore" - db := getDB() - b.ResetTimer() - for i:=0; i < b.N; i++ { - db.Exec(sql) - } -} + contentKey := []byte("test") + content := []byte("value") -func BenchmarkNoPrepare(b *testing.B) { - const sql = "SELECT value FROM kvstore" - db := getDB() - stmp, _ := db.Prepare(sql) - b.ResetTimer() - for i:=0; i < b.N; i++ { - stmp.Exec() - } -} \ No newline at end of file + _, err = storage.Get(contentKey, storage.ContentId(contentKey)) + assert.Equal(t, ContentNotFound, err) + + err = storage.Put(contentKey, content) + assert.NoError(t, err) +} From 460556fdb4a08e86df0f9db5ca6ad0cbce05e52b Mon Sep 17 00:00:00 2001 From: fearlessfe <505380967@qq.com> Date: Thu, 7 Dec 2023 23:00:43 +0800 Subject: [PATCH 06/10] feat: tmp test --- go.mod | 3 +- go.sum | 2 - p2p/discover/portal_storage.go | 100 +++++++++------ p2p/discover/portal_storage_test.go | 182 +++++++++++++++++++++++++++- p2p/discover/shisui.sqlite | Bin 0 -> 102400 bytes 5 files changed, 241 insertions(+), 46 deletions(-) create mode 100644 p2p/discover/shisui.sqlite diff --git a/go.mod b/go.mod index f4a7cbebc0d0..7c48a3be8dde 100644 --- a/go.mod +++ b/go.mod @@ -50,6 +50,7 @@ require ( github.com/kylelemons/godebug v1.1.0 github.com/mattn/go-colorable v0.1.13 github.com/mattn/go-isatty v0.0.17 + github.com/mattn/go-sqlite3 v1.14.18 github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416 github.com/olekukonko/tablewriter v0.0.5 github.com/optimism-java/utp-go v0.0.0-20231203033001-5a531e1e11a0 @@ -122,9 +123,7 @@ require ( github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect - github.com/linxGnu/grocksdb v1.8.6 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect - github.com/mattn/go-sqlite3 v1.14.18 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect github.com/minio/sha256-simd v1.0.0 // indirect github.com/mitchellh/mapstructure v1.4.1 // indirect diff --git a/go.sum b/go.sum index 8ceb123621ea..029298b46bd2 100644 --- a/go.sum +++ b/go.sum @@ -421,8 +421,6 @@ github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvf github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= -github.com/linxGnu/grocksdb v1.8.6 h1:O7I6SIGPrypf3f/gmrrLUBQDKfO8uOoYdWf4gLS06tc= -github.com/linxGnu/grocksdb v1.8.6/go.mod h1:xZCIb5Muw+nhbDK4Y5UJuOrin5MceOuiXkVUR7vp4WY= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= diff --git a/p2p/discover/portal_storage.go b/p2p/discover/portal_storage.go index dca98564c71a..d4945cd1f229 100644 --- a/p2p/discover/portal_storage.go +++ b/p2p/discover/portal_storage.go @@ -7,7 +7,6 @@ import ( "fmt" "math/big" "path" - "strings" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" @@ -89,27 +88,53 @@ func (p *PortalStorage) Get(contentKey []byte, contentId []byte) ([]byte, error) return res, err } -func (p *PortalStorage) Put(contentKey []byte, content []byte) error { +type PutResult struct { + err error + pruned bool + count int +} + +func (p *PutResult) Err() error { + return p.err +} + +func (p *PutResult) Pruned() bool { + return p.pruned +} + +func (p *PutResult) PrunedCount() int { + return p.count +} + +func newPutResultWithErr(err error) PutResult { + return PutResult{ + err: err, + } +} + +func (p *PortalStorage) Put(contentKey []byte, content []byte) PutResult { contentId := p.ContentId(contentKey) _, err := p.putStmt.Exec(contentId, content) if err != nil { - return err + return newPutResultWithErr(err) } dbSize, err := p.UsedSize() if err != nil { - return err + return newPutResultWithErr(err) } if dbSize > p.storageCapacityInBytes { - err = p.deleteContentFraction(contentDeletionFraction) + count, err := p.deleteContentFraction(contentDeletionFraction) // if err != nil { log.Warn("failed to delete oversize item") + return newPutResultWithErr(err) } + return PutResult{pruned: true, count: count} } - return nil + return PutResult{} } func (p *PortalStorage) Close() error { @@ -181,12 +206,12 @@ func (p *PortalStorage) UsedSize() (uint64, error) { return size - unusedSize, err } -func (p *PortalStorage) ContentSize() (uint64, error) { +func (p *PortalStorage) ContentCount() (uint64, error) { sql := "SELECT COUNT(key) FROM kvstore;" return p.queryRowUint64(sql) } -func (p *PortalStorage) ContentCount() (uint64, error) { +func (p *PortalStorage) ContentSize() (uint64, error) { sql := "SELECT SUM(length(value)) FROM kvstore" return p.queryRowUint64(sql) } @@ -218,7 +243,7 @@ func (p *PortalStorage) GetLargestDistance() (*uint256.Int, error) { return res, err } -func (p *PortalStorage) EstimateNewRadius() (*uint256.Int, error) { +func (p *PortalStorage) EstimateNewRadius(currentRadius *uint256.Int) (*uint256.Int, error) { currrentSize, err := p.UsedSize() if err != nil { return nil, err @@ -226,58 +251,50 @@ func (p *PortalStorage) EstimateNewRadius() (*uint256.Int, error) { sizeRatio := currrentSize / p.storageCapacityInBytes if sizeRatio > 0 { bigFormat := new(big.Int).SetUint64(sizeRatio) - return new(uint256.Int).Div(p.radius, uint256.MustFromBig(bigFormat)), nil + return new(uint256.Int).Div(currentRadius, uint256.MustFromBig(bigFormat)), nil } - return p.radius, nil + return currentRadius, nil } -func (p *PortalStorage) deleteContentFraction(fraction float64) error { +func (p *PortalStorage) deleteContentFraction(fraction float64) (deleteCount int, err error) { if fraction <= 0 || fraction >= 1 { - return errors.New("fraction should be between 0 and 1") + return deleteCount, errors.New("fraction should be between 0 and 1") } totalContentSize, err := p.ContentSize() if err != nil { - return err + return deleteCount, err } bytesToDelete := uint64(fraction * float64(totalContentSize)) // deleteElements := 0 deleteBytes := 0 - rows, err := p.sqliteDB.Query(getAllOrderedByDistanceSql, p.nodeId) + rows, err := p.sqliteDB.Query(getAllOrderedByDistanceSql, p.nodeId[:]) if err != nil { - return nil + return deleteCount, err } defer rows.Close() - idsToDelete := make([][]byte, 0) - for deleteBytes < int(bytesToDelete) { + for deleteBytes < int(bytesToDelete) && rows.Next() { var contentId []byte var payloadLen int - err = rows.Scan(&contentId, &payloadLen) + var distance []byte + err = rows.Scan(&contentId, &payloadLen, &distance) + if err != nil { + return deleteCount, err + } + err = p.del(contentId) if err != nil { - return err + return deleteCount, err } - idsToDelete = append(idsToDelete, contentId) deleteBytes += payloadLen + deleteCount++ } - err = p.deleteBatchById(idsToDelete...) - return err + return } -func (p *PortalStorage) deleteBatchById(ids ...[]byte) error { - // 构建带有占位符的 SQL 查询语句 - query := "DELETE FROM kvstore WHERE id IN (?" + strings.Repeat(", ?", len(ids)-1) + ")" - - args := make([]any, len(ids)) - for i, id := range ids { - args[i] = id - } - // 执行删除操作 - _, err := p.sqliteDB.Exec(query, args...) - if err != nil { - return err - } - return nil +func (p *PortalStorage) del(contentId []byte) error { + _, err := p.delStmt.Exec(contentId) + return err } func (p *PortalStorage) ReclaimSpace() error { @@ -285,11 +302,15 @@ func (p *PortalStorage) ReclaimSpace() error { return err } -func (p *PortalStorage) DeleteContentOutOfRadius(radius uint256.Int) error { +func (p *PortalStorage) DeleteContentOutOfRadius(radius *uint256.Int) error { _, err := p.sqliteDB.Exec(deleteOutOfRadiusStmt, p.nodeId, radius) return err } +func (p *PortalStorage) ForcePrune(radius *uint256.Int) { + p.DeleteContentOutOfRadius(radius) +} + // SQLite Statements const createSql = `CREATE TABLE IF NOT EXISTS kvstore ( @@ -299,7 +320,8 @@ const createSql = `CREATE TABLE IF NOT EXISTS kvstore ( const getSql = "SELECT value FROM kvstore WHERE key = (?1);" const putSql = "INSERT OR REPLACE INTO kvstore (key, value) VALUES (?1, ?2);" const deleteSql = "DELETE FROM kvstore WHERE key = (?1);" -const clearSql = "DELETE FROM kvstore" + +// const clearSql = "DELETE FROM kvstore" const containSql = "SELECT 1 FROM kvstore WHERE key = (?1);" const getAllOrderedByDistanceSql = "SELECT key, length(value), ((?1 | key) - (?1 & key)) as distance FROM kvstore ORDER BY distance DESC;" const deleteOutOfRadiusStmt = "DELETE FROM kvstore WHERE ((?1 | key) - (?1 & key)) < ?2" diff --git a/p2p/discover/portal_storage_test.go b/p2p/discover/portal_storage_test.go index 0156eb1302e1..37e4b1262a2b 100644 --- a/p2p/discover/portal_storage_test.go +++ b/p2p/discover/portal_storage_test.go @@ -1,7 +1,9 @@ package discover import ( + "fmt" "math" + "os" "testing" "github.com/ethereum/go-ethereum/p2p/enode" @@ -9,11 +11,26 @@ import ( "github.com/stretchr/testify/assert" ) +const nodeDataDir = "./" + +func clear() { + os.Remove(fmt.Sprintf("%s%s", nodeDataDir, sqliteName)) +} + +func genBytes(length int) []byte { + res := make([]byte, length) + for i := 0; i < length; i++ { + res[i] = byte(i) + } + return res +} + func TestBasicStorage(t *testing.T) { zeroNodeId := uint256.NewInt(0).Bytes32() - storage, err := NewPortalStorage(math.MaxUint64, enode.ID(zeroNodeId), "./") - defer storage.Close() + storage, err := NewPortalStorage(math.MaxUint32, enode.ID(zeroNodeId), nodeDataDir) assert.NoError(t, err) + defer clear() + defer storage.Close() contentKey := []byte("test") content := []byte("value") @@ -21,6 +38,165 @@ func TestBasicStorage(t *testing.T) { _, err = storage.Get(contentKey, storage.ContentId(contentKey)) assert.Equal(t, ContentNotFound, err) - err = storage.Put(contentKey, content) + pt := storage.Put(contentKey, content) + assert.NoError(t, pt.Err()) + + val, err := storage.Get(contentKey, storage.ContentId(contentKey)) + assert.NoError(t, err) + assert.Equal(t, content, val) + + count, err := storage.ContentCount() + assert.NoError(t, err) + assert.Equal(t, count, uint64(1)) + + size, err := storage.Size() + assert.NoError(t, err) + assert.True(t, size > 0) + + unusedSize, err := storage.UnusedSize() assert.NoError(t, err) + + usedSize, err := storage.UsedSize() + assert.NoError(t, err) + assert.True(t, usedSize == size-unusedSize) } + +func TestDBSize(t *testing.T) { + zeroNodeId := uint256.NewInt(0).Bytes32() + storage, err := NewPortalStorage(math.MaxUint32, enode.ID(zeroNodeId), nodeDataDir) + assert.NoError(t, err) + defer clear() + defer storage.Close() + + numBytes := 10000 + + size1, err := storage.Size() + assert.NoError(t, err) + putResult := storage.Put(uint256.NewInt(1).Bytes(), genBytes(numBytes)) + assert.Nil(t, putResult.Err()) + + size2, err := storage.Size() + assert.NoError(t, err) + putResult = storage.Put(uint256.NewInt(2).Bytes(), genBytes(numBytes)) + assert.NoError(t, putResult.Err()) + + size3, err := storage.Size() + assert.NoError(t, err) + putResult = storage.Put(uint256.NewInt(2).Bytes(), genBytes(numBytes)) + assert.NoError(t, putResult.Err()) + + size4, err := storage.Size() + assert.NoError(t, err) + usedSize, err := storage.UsedSize() + assert.NoError(t, err) + + assert.True(t, size2 > size1) + assert.True(t, size3 > size2) + assert.True(t, size4 == size3) + assert.True(t, usedSize == size4) + + err = storage.del(storage.ContentId(uint256.NewInt(2).Bytes())) + assert.NoError(t, err) + err = storage.del(storage.ContentId(uint256.NewInt(1).Bytes())) + assert.NoError(t, err) + + usedSize1, err := storage.UsedSize() + assert.NoError(t, err) + size5, err := storage.Size() + assert.NoError(t, err) + + assert.True(t, size4 == size5) + assert.True(t, usedSize1 < size5) + + err = storage.ReclaimSpace() + assert.NoError(t, err) + + usedSize2, err := storage.UsedSize() + assert.NoError(t, err) + size6, err := storage.Size() + assert.NoError(t, err) + + assert.Equal(t, size1, size6) + assert.Equal(t, usedSize2, size6) + +} + +func TestDBPruning(t *testing.T) { + storageCapacity := uint64(100_000) + + zeroNodeId := uint256.NewInt(0).Bytes32() + storage, err := NewPortalStorage(storageCapacity, enode.ID(zeroNodeId), nodeDataDir) + assert.NoError(t, err) + defer clear() + defer storage.Close() + + furthestElement := uint256.NewInt(40) + secondFurthest := uint256.NewInt(30) + thirdFurthest := uint256.NewInt(20) + + numBytes := 10_000 + + pt1 := storage.Put(uint256.NewInt(1).Bytes(), genBytes(numBytes)) + assert.NoError(t, pt1.Err()) + pt2 := storage.Put(thirdFurthest.Bytes(), genBytes(numBytes)) + assert.NoError(t, pt2.Err()) + pt3 := storage.Put(uint256.NewInt(3).Bytes(), genBytes(numBytes)) + assert.NoError(t, pt3.Err()) + pt4 := storage.Put(uint256.NewInt(10).Bytes(), genBytes(numBytes)) + assert.NoError(t, pt4.Err()) + pt5 := storage.Put(uint256.NewInt(5).Bytes(), genBytes(numBytes)) + assert.NoError(t, pt5.Err()) + pt6 := storage.Put(uint256.NewInt(11).Bytes(), genBytes(numBytes)) + assert.NoError(t, pt6.Err()) + pt7 := storage.Put(furthestElement.Bytes(), genBytes(2000)) + assert.NoError(t, pt7.Err()) + pt8 := storage.Put(secondFurthest.Bytes(), genBytes(2000)) + assert.NoError(t, pt8.Err()) + pt9 := storage.Put(uint256.NewInt(2).Bytes(), genBytes(numBytes)) + assert.NoError(t, pt9.Err()) + pt10 := storage.Put(uint256.NewInt(4).Bytes(), genBytes(12000)) + assert.NoError(t, pt10.Err()) + + assert.False(t, pt1.Pruned()) + assert.False(t, pt2.Pruned()) + assert.False(t, pt3.Pruned()) + assert.False(t, pt4.Pruned()) + assert.False(t, pt5.Pruned()) + assert.False(t, pt6.Pruned()) + assert.False(t, pt7.Pruned()) + assert.False(t, pt8.Pruned()) + assert.False(t, pt9.Pruned()) + assert.True(t, pt10.Pruned()) + + assert.Equal(t, pt10.PrunedCount(), 2) + usedSize, err := storage.UsedSize() + assert.NoError(t, err) + assert.True(t, usedSize < storage.storageCapacityInBytes) + + _, err = storage.Get(furthestElement.Bytes(), storage.ContentId(furthestElement.Bytes())) + assert.Equal(t, ContentNotFound, err) + + _, err = storage.Get(secondFurthest.Bytes(), storage.ContentId(secondFurthest.Bytes())) + assert.Equal(t, ContentNotFound, err) + + val, err := storage.Get(thirdFurthest.Bytes(), storage.ContentId(thirdFurthest.Bytes())) + assert.NoError(t, err) + assert.NotNil(t, val) + +} + +// func TestForcePruning(t *testing.T) { +// const startCap = uint64(14_159_872) +// const endCapacity = uint64(500_000) +// const amountOfItems = 10_000 + +// nodeId := uint256.MustFromHex("0x30994892f3e4889d99deb5340050510d1842778acc7a7948adffa475fed51d6e").Bytes() + +// storage, err := NewPortalStorage(startCap, enode.ID(nodeId), nodeDataDir) +// assert.NoError(t, err) + +// high := uint256. + +// increment := uint256. + +// } diff --git a/p2p/discover/shisui.sqlite b/p2p/discover/shisui.sqlite new file mode 100644 index 0000000000000000000000000000000000000000..309e8fde73495087333e3f49e80c51a7f7dda055 GIT binary patch literal 102400 zcmeI53s6+&8HUd;EbM}CkPrT4Qa{0$T1!gGdk|pfzYH2n4}2rB++X zS_*9~p{t=#jFm#PrChwUTGWb{^@2eWqlpNJh-easL4};N(J{_+hMCU9shsBwdv=#S z!0_$<|NDOL_kRDsAS`fsTB3ewa_Y*sH2qY;Nl>YT=kU&om?Aa|^#i<41c(3;AOb{y2oM1xKm>>Y5g-CYU|2-p1(ogS zF=JFYR%vnZD-x43R;4AUKDblQ4fpkl@YP57%n9_>-+xX2teq|?akYL1oN9=dt&HSFHE_ z2Ub!nwQ@W)@=ffJAG(Tn#6Izsc>VLI^0zp?tO%G5YE58{$!qNf_t&q!IJM2|jo^hY z4U?-&>-=t+BDy>g?=CI5>(iIA{Rz2iheO>aqN<~_?HqAmoEG9@|TlVtb8T;Cn>3`($X_t&0L+8 zz2>!_zW%eEpRfJJy7j;O)rQkk|}^q<2GM~)slZa#7H)ak}E z?>3!1*L=R^!h07lz5l`G)+=r89alTAbzSei(R1_GhaY|1+xN-sPy6rOy*Dt(|Iho+ z=b!P<{Kx$NMf{U+*~V;8Suix9*Lm_Z--v_HU7WPrc)F(b)DpWU@A`FRXOCT;_3Ap` zA}|CD0YktLFa!+wB8Dg^hmzoOlukN5B`5t(HqyynF6G;)?c}p#o*=Fi3mgZ;3u299 zvtyNGo;XkRkY3^-0z`la5CI}U1c(3;AOb{y2oM1x@MT26(<)HaHqjJn4E6Az7G$)o z$UoX-lY2LPFuvese6guC=kS&kJNcrP{Z$RZ;FXP$(J^I*W>~An^~LNMnC&?~AmdtL zLS*xLoqW-z@Tl17-6xGi^5P?9wb6CPkg-dh4~W~tbMqT)<%>E__!Vakoaw)3zj)@L%RF`Mg4>n8MIj?x zDjK~5*3@scv0Rj0RMgpO@36PJ*Qq3-XLCnRsC#x&LFI-njcHNM+!l>|k!qK}>tXxk z&XGIv-`*GISsi@CyeU|>Z(&bfq2~ONh$L(IB5g>wwl4FXpx0bqPV?KAv{2acQc|4z z&iIkli;5}+CmghrFB+F^Tb zrNm2$mliLvBE8hIh=ZRTKRp@{{S@?*&`(1@5&cy3lhIE{KOy~;^pnz0OF!}O?x&X0 ze_L`tazAoEazAoEazAoEazAoEazAoEazApv;hp=jmKbZPQUB*Nfd2#k2O1z6AQ~VV zAQ~VVAQ~VVAQ~VVAR6HC*8rvbUkCSx`@{X={&0V|KinVg5BG=r!~KUp_ox0#?T^}@ zvTJ|T|Cs}r1C%`n@c#4uqXDAJ19p%E0JSPp(C(2=9FHvm zW`o+Yckxi0IIwpydl$2JF?$!YcQJbxD|GMT|GH<71k^TWgUW)TL+){;0z<$MFa!(% zL%E#-gY519?t7Va|Cc97)%a5uOc+zsvqcZ0h<7k;U^HMfU^HMfU^HN5XuwkXFT(xc zesDjyAKVY_2ls>f!TprM{iy#_|3~{r`zQZXZvIF89}EBkzyL5nxfsCsXZ$n%8UNG( zr~yy|pawtAz>cc;=wXJaz4Y+m*gWAtPKW z8odM7)Nf_&BGxWq?IPALV(lW%ts z-LsPlDmQd#Op9vfwt%5vC>RQcf}vn282Si?I!c+ZGul7eKiWUqKiWUqKia=?w152H zc>nPJ;QfLB!~fy`%H;oYTyo!))Y77aasHEjQ0NVy85U?12A_JMt1|H+4aDdm6qNu?|&Pn~W&^KR4GbIs>lF1&Z~()%A=ZoSgh z-f^|_TG#dN8$CB~efZJGy?vkD{+y(9ecY(XWU7mdI0{+8);D7L6 z_&@m%`QQJR|H>tQ{Ga$g(SY!O;{U||iT@M-C;m_6_&-?#f;AxE|L}kKzcTs1?EhD3 z_XyG#2N56wM1Tko0U|&I{_i7THs~zt|F`U~Y7hpmY>bSKDLXX7S~adOX2-y6&-nov z*9sFNo7dy}!}o{p58ofYKYV}q{vP`NB!G93ZqMMCr(b}63Hn9om!V&XekuCJ=$E5k zkbX(}MbXXB&Ctz;w{9kj{PfJTW&j!x8W0)~8W0)~8W0)~8W0)~8W0)~8W0-L@YaAV z{(s@RAb$<_Ck literal 0 HcmV?d00001 From 94b244539707b90d7071dd185b4d3e73b8d453b1 Mon Sep 17 00:00:00 2001 From: fearlessfe <505380967@qq.com> Date: Fri, 8 Dec 2023 18:27:57 +0800 Subject: [PATCH 07/10] feat: db test --- p2p/discover/portal_storage.go | 103 +++++++++++++++++++-------- p2p/discover/portal_storage_test.go | 104 ++++++++++++++++++++++------ p2p/discover/shisui.sqlite | Bin 102400 -> 94208 bytes 3 files changed, 157 insertions(+), 50 deletions(-) diff --git a/p2p/discover/portal_storage.go b/p2p/discover/portal_storage.go index d4945cd1f229..4fbb947299b5 100644 --- a/p2p/discover/portal_storage.go +++ b/p2p/discover/portal_storage.go @@ -7,11 +7,12 @@ import ( "fmt" "math/big" "path" + "strings" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/holiman/uint256" - _ "github.com/mattn/go-sqlite3" + "github.com/mattn/go-sqlite3" ) var ( @@ -46,9 +47,33 @@ type PortalStorage struct { log log.Logger } +func xor(contentId, nodeId []byte) []byte { + // length of contentId maybe not 32bytes + padding := make([]byte, 32) + if len(contentId) != len(nodeId) { + copy(padding, contentId) + } else { + padding = contentId + } + res := make([]byte, len(padding)) + for i, _ := range padding { + res[i] = padding[i] ^ nodeId[i] + } + return res +} + func NewPortalStorage(storageCapacityInBytes uint64, nodeId enode.ID, nodeDataDir string) (*PortalStorage, error) { - sqlDb, err := sql.Open("sqlite3", path.Join(nodeDataDir, sqliteName)) + sql.Register("sqlite3_custom", &sqlite3.SQLiteDriver{ + ConnectHook: func(conn *sqlite3.SQLiteConn) error { + if err := conn.RegisterFunc("xor", xor, false); err != nil { + return err + } + return nil + }, + }) + + sqlDb, err := sql.Open("sqlite3_custom", path.Join(nodeDataDir, sqliteName)) if err != nil { return nil, err @@ -87,7 +112,6 @@ func (p *PortalStorage) Get(contentKey []byte, contentId []byte) ([]byte, error) } return res, err } - type PutResult struct { err error pruned bool @@ -113,8 +137,10 @@ func newPutResultWithErr(err error) PutResult { } func (p *PortalStorage) Put(contentKey []byte, content []byte) PutResult { - contentId := p.ContentId(contentKey) + return p.put(p.ContentId(contentKey), content) +} +func (p *PortalStorage) put(contentId []byte, content []byte) PutResult { _, err := p.putStmt.Exec(contentId, content) if err != nil { return newPutResultWithErr(err) @@ -232,15 +258,20 @@ func (p *PortalStorage) GetLargestDistance() (*uint256.Int, error) { if err != nil { return nil, err } - var contentId []byte + var distance []byte - err = stmt.QueryRow(p.nodeId).Scan(&contentId) + err = stmt.QueryRow(p.nodeId[:]).Scan(&distance) if err != nil { return nil, err } - bugNum := new(big.Int).SetBytes(contentId) - res, _ := uint256.FromBig(bugNum) - return res, err + // reverse the distance, because big.SetBytes is big-endian + reverseBytes(distance) + // for i:= 0; i < len(distance) / 2; i++ { + // distance[i], distance[len(distance) - i - 1] = distance[len(distance) - i - 1], distance[i] + // } + bigNum := new(big.Int).SetBytes(distance) + res := uint256.MustFromBig(bigNum) + return res, nil } func (p *PortalStorage) EstimateNewRadius(currentRadius *uint256.Int) (*uint256.Int, error) { @@ -273,6 +304,7 @@ func (p *PortalStorage) deleteContentFraction(fraction float64) (deleteCount int return deleteCount, err } defer rows.Close() + idsToDelete := make([][]byte, 0) for deleteBytes < int(bytesToDelete) && rows.Next() { var contentId []byte var payloadLen int @@ -281,14 +313,18 @@ func (p *PortalStorage) deleteContentFraction(fraction float64) (deleteCount int if err != nil { return deleteCount, err } - err = p.del(contentId) + idsToDelete = append(idsToDelete, contentId) + // err = p.del(contentId) if err != nil { return deleteCount, err } deleteBytes += payloadLen deleteCount++ } - + // row must close first, or database is locked + // rows.Close() can call multi times + rows.Close() + err = p.batchDel(idsToDelete) return } @@ -297,18 +333,38 @@ func (p *PortalStorage) del(contentId []byte) error { return err } +func (p *PortalStorage) batchDel(ids [][]byte) error { + query := "DELETE FROM kvstore WHERE key IN (?" + strings.Repeat(", ?", len(ids)-1) + ")" + args := make([]interface{}, len(ids)) + for i, id := range ids { + args[i] = id + } + + // 执行删除操作 + _, err := p.sqliteDB.Exec(query, args...) + return err +} + func (p *PortalStorage) ReclaimSpace() error { _, err := p.sqliteDB.Exec("VACUUM;") return err } -func (p *PortalStorage) DeleteContentOutOfRadius(radius *uint256.Int) error { - _, err := p.sqliteDB.Exec(deleteOutOfRadiusStmt, p.nodeId, radius) +func (p *PortalStorage) deleteContentOutOfRadius(radius *uint256.Int) error { + res, err := p.sqliteDB.Exec(deleteOutOfRadiusStmt, p.nodeId[:], radius) + count, _ := res.RowsAffected() + p.log.Trace("delete %d items",count) return err } -func (p *PortalStorage) ForcePrune(radius *uint256.Int) { - p.DeleteContentOutOfRadius(radius) +func (p *PortalStorage) ForcePrune(radius *uint256.Int) error { + return p.deleteContentOutOfRadius(radius) +} + +func reverseBytes(src []byte) { + for i:= 0; i < len(src) / 2; i++ { + src[i], src[len(src) - i - 1] = src[len(src) - i - 1], src[i] + } } // SQLite Statements @@ -323,20 +379,11 @@ const deleteSql = "DELETE FROM kvstore WHERE key = (?1);" // const clearSql = "DELETE FROM kvstore" const containSql = "SELECT 1 FROM kvstore WHERE key = (?1);" -const getAllOrderedByDistanceSql = "SELECT key, length(value), ((?1 | key) - (?1 & key)) as distance FROM kvstore ORDER BY distance DESC;" -const deleteOutOfRadiusStmt = "DELETE FROM kvstore WHERE ((?1 | key) - (?1 & key)) < ?2" +const getAllOrderedByDistanceSql = "SELECT key, length(value), xor(key, (?1)) as distance FROM kvstore ORDER BY distance DESC;" +const deleteOutOfRadiusStmt = "DELETE FROM kvstore WHERE xor(key, (?1)) < ?2" const XOR_FIND_FARTHEST_QUERY = `SELECT - ((?1 | key) - (?1 & key)) as distance + xor(key, (?1)) as distance FROM kvstore - ORDER BY distance DESC LIMIT 1` - -const CONTENT_KEY_LOOKUP_QUERY = "SELECT content_key FROM content_metadata WHERE content_id_long = (?1)" - -const TOTAL_DATA_SIZE_QUERY = "SELECT TOTAL(content_size) FROM content_metadata" - -const TOTAL_ENTRY_COUNT_QUERY = "SELECT COUNT(content_id_long) FROM content_metadata" - -const PAGINATE_QUERY = "SELECT content_key FROM content_metadata ORDER BY content_key LIMIT :limit OFFSET :offset" + ORDER BY distance DESC` -const CONTENT_SIZE_LOOKUP_QUERY = "SELECT content_size FROM content_metadata WHERE content_id_long = (?1)" diff --git a/p2p/discover/portal_storage_test.go b/p2p/discover/portal_storage_test.go index 37e4b1262a2b..99ee0411f7b4 100644 --- a/p2p/discover/portal_storage_test.go +++ b/p2p/discover/portal_storage_test.go @@ -135,26 +135,30 @@ func TestDBPruning(t *testing.T) { thirdFurthest := uint256.NewInt(20) numBytes := 10_000 - - pt1 := storage.Put(uint256.NewInt(1).Bytes(), genBytes(numBytes)) + // test with private put method + pt1 := storage.put(uint256.NewInt(1).Bytes(), genBytes(numBytes)) assert.NoError(t, pt1.Err()) - pt2 := storage.Put(thirdFurthest.Bytes(), genBytes(numBytes)) + pt2 := storage.put(thirdFurthest.Bytes(), genBytes(numBytes)) assert.NoError(t, pt2.Err()) - pt3 := storage.Put(uint256.NewInt(3).Bytes(), genBytes(numBytes)) + pt3 := storage.put(uint256.NewInt(3).Bytes(), genBytes(numBytes)) assert.NoError(t, pt3.Err()) - pt4 := storage.Put(uint256.NewInt(10).Bytes(), genBytes(numBytes)) + pt4 := storage.put(uint256.NewInt(10).Bytes(), genBytes(numBytes)) assert.NoError(t, pt4.Err()) - pt5 := storage.Put(uint256.NewInt(5).Bytes(), genBytes(numBytes)) + pt5 := storage.put(uint256.NewInt(5).Bytes(), genBytes(numBytes)) assert.NoError(t, pt5.Err()) - pt6 := storage.Put(uint256.NewInt(11).Bytes(), genBytes(numBytes)) + pt6 := storage.put(uint256.NewInt(11).Bytes(), genBytes(numBytes)) assert.NoError(t, pt6.Err()) - pt7 := storage.Put(furthestElement.Bytes(), genBytes(2000)) + pt7 := storage.put(furthestElement.Bytes(), genBytes(4000)) assert.NoError(t, pt7.Err()) - pt8 := storage.Put(secondFurthest.Bytes(), genBytes(2000)) + pt8 := storage.put(secondFurthest.Bytes(), genBytes(3000)) assert.NoError(t, pt8.Err()) - pt9 := storage.Put(uint256.NewInt(2).Bytes(), genBytes(numBytes)) + pt9 := storage.put(uint256.NewInt(2).Bytes(), genBytes(numBytes)) assert.NoError(t, pt9.Err()) - pt10 := storage.Put(uint256.NewInt(4).Bytes(), genBytes(12000)) + + res, _ := storage.GetLargestDistance() + + assert.Equal(t, res, uint256.NewInt(40)) + pt10 := storage.put(uint256.NewInt(4).Bytes(), genBytes(12000)) assert.NoError(t, pt10.Err()) assert.False(t, pt1.Pruned()) @@ -179,24 +183,80 @@ func TestDBPruning(t *testing.T) { _, err = storage.Get(secondFurthest.Bytes(), storage.ContentId(secondFurthest.Bytes())) assert.Equal(t, ContentNotFound, err) - val, err := storage.Get(thirdFurthest.Bytes(), storage.ContentId(thirdFurthest.Bytes())) + val, err := storage.Get(thirdFurthest.Bytes(), thirdFurthest.Bytes()) assert.NoError(t, err) assert.NotNil(t, val) } -// func TestForcePruning(t *testing.T) { -// const startCap = uint64(14_159_872) -// const endCapacity = uint64(500_000) -// const amountOfItems = 10_000 +func TestGetLargestDistance(t *testing.T) { + storageCapacity := uint64(100_000) + + zeroNodeId := uint256.NewInt(0).Bytes32() + storage, err := NewPortalStorage(storageCapacity, enode.ID(zeroNodeId), nodeDataDir) + assert.NoError(t, err) + defer clear() + defer storage.Close() + + furthestElement := uint256.NewInt(40) + secondFurthest := uint256.NewInt(30) + + pt7 := storage.put(furthestElement.Bytes(), genBytes(2000)) + assert.NoError(t, pt7.Err()) + + val, err := storage.Get(furthestElement.Bytes(), furthestElement.Bytes()) + assert.NoError(t, err) + assert.NotNil(t, val) + pt8 := storage.put(secondFurthest.Bytes(), genBytes(2000)) + assert.NoError(t, pt8.Err()) + res, err := storage.GetLargestDistance() + assert.NoError(t, err) + assert.Equal(t, furthestElement, res) +} + +func TestForcePruning(t *testing.T) { + const startCap = uint64(14_159_872) // 100KB + const endCapacity = uint64(500_000) // 40KB + const amountOfItems = 10_000 + + maxUint256:= uint256.MustFromHex("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") + + nodeId := uint256.MustFromHex("0x30994892f3e4889d99deb5340050510d1842778acc7a7948adffa475fed51d6e").Bytes() + content := genBytes(1000) + + storage, err := NewPortalStorage(startCap, enode.ID(nodeId), nodeDataDir) + assert.NoError(t, err) + + increment := uint256.NewInt(0).Div(maxUint256, uint256.NewInt(amountOfItems)) + remainder := uint256.NewInt(0).Mod(maxUint256, uint256.NewInt(amountOfItems)) + + id := uint256.NewInt(0) + putCount := 0 + // id < maxUint256 - remainder + for id.Cmp(uint256.NewInt(0).Sub(maxUint256, remainder)) == -1 { + res := storage.put(id.Bytes(), content) + assert.NoError(t, res.Err()) + id = id.Add(id, increment) + putCount++ + } -// nodeId := uint256.MustFromHex("0x30994892f3e4889d99deb5340050510d1842778acc7a7948adffa475fed51d6e").Bytes() + storage.storageCapacityInBytes = endCapacity -// storage, err := NewPortalStorage(startCap, enode.ID(nodeId), nodeDataDir) -// assert.NoError(t, err) + oldDistance, err := storage.GetLargestDistance() + assert.NoError(t, err) + newDistance, err := storage.EstimateNewRadius(oldDistance) + assert.NoError(t, err) + assert.NotEqual(t, oldDistance.Cmp(newDistance), -1) + err = storage.ForcePrune(newDistance) + assert.NoError(t, err) -// high := uint256. + err = storage.ReclaimSpace() + assert.NoError(t, err) + size, err := storage.Size() + assert.NoError(t, err) -// increment := uint256. + diff := math.Abs(float64(size - storage.storageCapacityInBytes)) + // uint64(float64(storage.storageCapacityInBytes) * 0.2) + assert.True(t, diff < float64(storage.storageCapacityInBytes) * 0.3) -// } +} diff --git a/p2p/discover/shisui.sqlite b/p2p/discover/shisui.sqlite index 309e8fde73495087333e3f49e80c51a7f7dda055..26b1125b3c933f678d7f861bb17e8824a1e1050b 100644 GIT binary patch literal 94208 zcmeI5e_T~(9mg;C0{7Q*@8PE)AYKF&6%f3$8A$?$C|+_Xp)8WnVHixV@*~D_W=(xI zGx}jGSC)oZ=H|>FMVe8xklQwzrz7I00-az9DoCG01m(bH~e#12>+ZvfrlpJPRx$3_9j+WI;dR1Rr*VfuR{B!Nd z@2brkd12E%b&#(S`d6&BlU^ohZ)-QStJ>?@pmsrfS$k1Cqn*Pbk#(p_EcQj5W2C1S#9AFk#>jE|(0vA(o%=^5iEOw61#c}iAx&eYtzY5u#Wf8g#JAI$&I%z_WkDl96V{gIM6rE}+% z%@34+biq9p3l~*>tZMO+>U%$4^NFRkpR8N9{J#1XpK4h7>Bgq}S2YJ$uW4y*YyV8g z1Dy|kcI`v!KDYk!UwC-K#!d2(M>l_Q%a^*o{FSeMZR^(``^Go7J^rn4Z~xABckKLL z_pa~n{=pCT{AlkJPwsna|BnwGJaqUcKkfP1k)uC9_KV(M{;KcSPe1dUUouy)mCUhfOtU^rkns4yH(avcHv@KeLHd{;ezU6(v`;_-lZ@stBo9wya zdCv2MXOm}{r@)gau8C7(k9b(rihL38zT!UN-tAuRu5sV(j&Z%=de+tLddOAny33_G zUvoa=+~IuCS>?=g3df+M&#~Rn;aKFzaX9U-*n91d>;Ew}H~@BYU6!6FqK3ttWEy(91`*S9o(8tqE*bNtM7V57Ks{}^S%0q-#&#z4cuvj%u{_q$K*}RlUq8Ys9WeFsj&JWI`qe#S$ zO@CHAiE?>}ZI2^S4nH`(u_T(p56*QlBx29e)?{9ZCeZ|baMneUD4idi&ub(~@E`VhR1a{Nfgap|LUoD)j=YSADs8v zNhH{@dGbUgiTM3?&5R(Cor~##(kH`76k%YwjYL*{aCTX3HsSQ>e{o@WTwh_rMIl|MUGVQbZ6Aj4nO^2fzVvpaKrU2?s`( z8G!@f05~u|4p9FqP!XbwDmfs9lM>!&1{4w=36F#~f5J=QARHLYbp#H81K>af9M}m5 zMzflM1Kv7%U)m2si)^fCKa60QJ90r6KyNfCEy*P{JE6CyRtf!Xx3$pYT#d6At+0mou*b z2fzVvpaKq}2nR+>!~+Mw0dQb`9H9PJpdv&URdPUzcuIJqr`I6ik?=@(^C!F%afAb- z=Trd)zyWZe0uEvc2S!h%0}g-#;K2MiK>e>$X^6fm;D8i~l<-Cm%SFN?;gRs>Pk1R3 z2nR-w#Rd+51K>af9E>6yOz&f-1RVtqfCJ#b{5U}UuRukJE~?~!6v>qE#caU$ZA8K& z;gRs>Pk1Sk2nVx<*zG2802}}ZD&Sx=;h-SceU5n~XajHn8~_LA#{uepl}bbORRIU2 zNTq}?<0IaWgh#?7;mx1$Qj8%SlyM3I4I#40E`C?fCJ#b{5U}U zuRukJE~?~!6ls+36_++}8GjxLkAz3Un?K>D@DUCc@GAgT00+PUaG(MX#u5$!C)uqJ zH~z{{0T2bI^ke3Gut=VefBnR02}}Z zD&Sxo;h?f}fSVE23>*Llz=8R3fcjs7iV$5?$pI-QQo`49`QAB@kAz3UBjL@T@KQ`5 z94zf&Cj{*U4uAvTKm{C(Cmiszfof8K1Kxx32**{mm-UBu!>#&Z(js(02}}ZD&SxW;h>RS9OzOzZ~zVK6=L-bVv2c*cQgzsSI`}Pze;gRr2c=IQ`6jKQYZR~8I>j!`X-~c#K0S7sRgBCx# zr2_}R0dQb`9H9PJpdv&URdPTIKPCJ+_Ick!myz&DcqF{}6JCmGgoCyGe4tw305|{+ zRKP(V;ecQMuk$o;02}}Z=Eni*f0ar@^i=@|q$s6?Kggco_a`CYk?=@(^C!F%a|j3f zyV*N_HvtDneKh^2>ix4@fzishOx1yN}tQO05?OL%V=tkvcDUlbyu&G2yR((7j)*_5}s&O+#uHU;Vv z;{+1I96&;V4M=da00}N8E)dDr&%mF{KZ)-j{~iAAd^h>F@b&Y1@GEaDEat0MVfJBs zsyip3CqUWV#J7ii?Zng9*t-6d{!g0pA!)&!#f>NC6>@c5Bw(~jUD>s)8~^N6gyZ0KEcco`d5QT41@FCTOB?%noPU{T8Zxz8H|WUI3$ zt?PWvIwxwo^KDj;BF0_rk|((HUkYxRxcoq%$~M1u=cf5_9*B6~KZCXY)~S$e7LY0q z|F;}FE7tqgN@kU~t;mjGn3tTLD7!I9a9h-@wf{AbGJ_Nmic2PCW*<&M5eJMCRtZTd zX&G5L`OTgZCqR)8N0VfAKrzR#Sx@8&LKK`lG(ZyUn;j)Cph@7@!AX)19$ug&OpFx_ z9D5imHWud4GaVZU0PSYM;s6$KhQJq(plC#72yjN?IcLfY)<-C2k@SJG4+!&;6lS1! z;U`H4woqmRJC;zgM{+Evv=BUJ$_mj(ASWQ{19^}Sh=oXUvL!fP7=#!&IDrHh^p|k3 z_{`TKl<5&xzfkWlVFzhJE&{NX07zOufh$UqOSwQUW#rfcOu`!rH*o+9Voq=n5h}qD zmKuP9NCb!_77I$EWwt+%?D Date: Fri, 8 Dec 2023 20:56:51 +0800 Subject: [PATCH 08/10] feat: impl portal storage --- p2p/discover/portal_storage.go | 25 ++++++++---- p2p/discover/portal_storage_test.go | 57 ++++++++++++++++++++++------ p2p/discover/shisui.sqlite | Bin 94208 -> 0 bytes 3 files changed, 63 insertions(+), 19 deletions(-) delete mode 100644 p2p/discover/shisui.sqlite diff --git a/p2p/discover/portal_storage.go b/p2p/discover/portal_storage.go index 4fbb947299b5..77404a556b3b 100644 --- a/p2p/discover/portal_storage.go +++ b/p2p/discover/portal_storage.go @@ -1,6 +1,7 @@ package discover import ( + "bytes" "crypto/sha256" "database/sql" "errors" @@ -49,7 +50,7 @@ type PortalStorage struct { func xor(contentId, nodeId []byte) []byte { // length of contentId maybe not 32bytes - padding := make([]byte, 32) + padding := make([]byte, 32) if len(contentId) != len(nodeId) { copy(padding, contentId) } else { @@ -62,6 +63,11 @@ func xor(contentId, nodeId []byte) []byte { return res } +// a > b return 1; else return 0 +func greater(a, b []byte) int { + return bytes.Compare(a, b) +} + func NewPortalStorage(storageCapacityInBytes uint64, nodeId enode.ID, nodeDataDir string) (*PortalStorage, error) { sql.Register("sqlite3_custom", &sqlite3.SQLiteDriver{ @@ -69,6 +75,9 @@ func NewPortalStorage(storageCapacityInBytes uint64, nodeId enode.ID, nodeDataDi if err := conn.RegisterFunc("xor", xor, false); err != nil { return err } + if err := conn.RegisterFunc("greater", greater, false); err != nil { + return err + } return nil }, }) @@ -112,6 +121,7 @@ func (p *PortalStorage) Get(contentKey []byte, contentId []byte) ([]byte, error) } return res, err } + type PutResult struct { err error pruned bool @@ -351,9 +361,9 @@ func (p *PortalStorage) ReclaimSpace() error { } func (p *PortalStorage) deleteContentOutOfRadius(radius *uint256.Int) error { - res, err := p.sqliteDB.Exec(deleteOutOfRadiusStmt, p.nodeId[:], radius) - count, _ := res.RowsAffected() - p.log.Trace("delete %d items",count) + res, err := p.sqliteDB.Exec(deleteOutOfRadiusStmt, p.nodeId[:], radius.Bytes()) + count, _ := res.RowsAffected() + p.log.Trace("delete %d items", count) return err } @@ -362,8 +372,8 @@ func (p *PortalStorage) ForcePrune(radius *uint256.Int) error { } func reverseBytes(src []byte) { - for i:= 0; i < len(src) / 2; i++ { - src[i], src[len(src) - i - 1] = src[len(src) - i - 1], src[i] + for i := 0; i < len(src)/2; i++ { + src[i], src[len(src)-i-1] = src[len(src)-i-1], src[i] } } @@ -380,10 +390,9 @@ const deleteSql = "DELETE FROM kvstore WHERE key = (?1);" // const clearSql = "DELETE FROM kvstore" const containSql = "SELECT 1 FROM kvstore WHERE key = (?1);" const getAllOrderedByDistanceSql = "SELECT key, length(value), xor(key, (?1)) as distance FROM kvstore ORDER BY distance DESC;" -const deleteOutOfRadiusStmt = "DELETE FROM kvstore WHERE xor(key, (?1)) < ?2" +const deleteOutOfRadiusStmt = "DELETE FROM kvstore WHERE greater(xor(key, (?1)), (?2)) = 1" const XOR_FIND_FARTHEST_QUERY = `SELECT xor(key, (?1)) as distance FROM kvstore ORDER BY distance DESC` - diff --git a/p2p/discover/portal_storage_test.go b/p2p/discover/portal_storage_test.go index 99ee0411f7b4..66cb76044bc0 100644 --- a/p2p/discover/portal_storage_test.go +++ b/p2p/discover/portal_storage_test.go @@ -214,18 +214,58 @@ func TestGetLargestDistance(t *testing.T) { assert.Equal(t, furthestElement, res) } +func TestSimpleForcePruning(t *testing.T) { + storageCapacity := uint64(100_000) + + zeroNodeId := uint256.NewInt(0).Bytes32() + storage, err := NewPortalStorage(storageCapacity, enode.ID(zeroNodeId), nodeDataDir) + assert.NoError(t, err) + defer clear() + defer storage.Close() + + furthestElement := uint256.NewInt(40) + secondFurthest := uint256.NewInt(30) + third := uint256.NewInt(10) + + pt1 := storage.put(furthestElement.Bytes(), genBytes(2000)) + assert.NoError(t, pt1.Err()) + + pt2 := storage.put(secondFurthest.Bytes(), genBytes(2000)) + assert.NoError(t, pt2.Err()) + + pt3 := storage.put(third.Bytes(), genBytes(2000)) + assert.NoError(t, pt3.Err()) + res, err := storage.GetLargestDistance() + assert.NoError(t, err) + assert.Equal(t, furthestElement, res) + + err = storage.ForcePrune(uint256.NewInt(20)) + assert.NoError(t, err) + + _, err = storage.Get(furthestElement.Bytes(), furthestElement.Bytes()) + assert.Equal(t, ContentNotFound, err) + + _, err = storage.Get(secondFurthest.Bytes(), secondFurthest.Bytes()) + assert.Equal(t, ContentNotFound, err) + + _, err = storage.Get(third.Bytes(), third.Bytes()) + assert.NoError(t, err) +} + func TestForcePruning(t *testing.T) { - const startCap = uint64(14_159_872) // 100KB - const endCapacity = uint64(500_000) // 40KB + const startCap = uint64(14_159_872) + const endCapacity = uint64(5000_000) const amountOfItems = 10_000 - maxUint256:= uint256.MustFromHex("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") + maxUint256 := uint256.MustFromHex("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") nodeId := uint256.MustFromHex("0x30994892f3e4889d99deb5340050510d1842778acc7a7948adffa475fed51d6e").Bytes() content := genBytes(1000) storage, err := NewPortalStorage(startCap, enode.ID(nodeId), nodeDataDir) assert.NoError(t, err) + defer clear() + defer storage.Close() increment := uint256.NewInt(0).Div(maxUint256, uint256.NewInt(amountOfItems)) remainder := uint256.NewInt(0).Mod(maxUint256, uint256.NewInt(amountOfItems)) @@ -250,13 +290,8 @@ func TestForcePruning(t *testing.T) { err = storage.ForcePrune(newDistance) assert.NoError(t, err) - err = storage.ReclaimSpace() + var total int64 + err = storage.sqliteDB.QueryRow("SELECT count(*) FROM kvstore where greater(xor(key, (?1)), (?2)) = 1", storage.nodeId[:], newDistance.Bytes()).Scan(&total) assert.NoError(t, err) - size, err := storage.Size() - assert.NoError(t, err) - - diff := math.Abs(float64(size - storage.storageCapacityInBytes)) - // uint64(float64(storage.storageCapacityInBytes) * 0.2) - assert.True(t, diff < float64(storage.storageCapacityInBytes) * 0.3) - + assert.Equal(t, int64(0), total) } diff --git a/p2p/discover/shisui.sqlite b/p2p/discover/shisui.sqlite deleted file mode 100644 index 26b1125b3c933f678d7f861bb17e8824a1e1050b..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 94208 zcmeI5e_T~(9mg;C0{7Q*@8PE)AYKF&6%f3$8A$?$C|+_Xp)8WnVHixV@*~D_W=(xI zGx}jGSC)oZ=H|>FMVe8xklQwzrz7I00-az9DoCG01m(bH~e#12>+ZvfrlpJPRx$3_9j+WI;dR1Rr*VfuR{B!Nd z@2brkd12E%b&#(S`d6&BlU^ohZ)-QStJ>?@pmsrfS$k1Cqn*Pbk#(p_EcQj5W2C1S#9AFk#>jE|(0vA(o%=^5iEOw61#c}iAx&eYtzY5u#Wf8g#JAI$&I%z_WkDl96V{gIM6rE}+% z%@34+biq9p3l~*>tZMO+>U%$4^NFRkpR8N9{J#1XpK4h7>Bgq}S2YJ$uW4y*YyV8g z1Dy|kcI`v!KDYk!UwC-K#!d2(M>l_Q%a^*o{FSeMZR^(``^Go7J^rn4Z~xABckKLL z_pa~n{=pCT{AlkJPwsna|BnwGJaqUcKkfP1k)uC9_KV(M{;KcSPe1dUUouy)mCUhfOtU^rkns4yH(avcHv@KeLHd{;ezU6(v`;_-lZ@stBo9wya zdCv2MXOm}{r@)gau8C7(k9b(rihL38zT!UN-tAuRu5sV(j&Z%=de+tLddOAny33_G zUvoa=+~IuCS>?=g3df+M&#~Rn;aKFzaX9U-*n91d>;Ew}H~@BYU6!6FqK3ttWEy(91`*S9o(8tqE*bNtM7V57Ks{}^S%0q-#&#z4cuvj%u{_q$K*}RlUq8Ys9WeFsj&JWI`qe#S$ zO@CHAiE?>}ZI2^S4nH`(u_T(p56*QlBx29e)?{9ZCeZ|baMneUD4idi&ub(~@E`VhR1a{Nfgap|LUoD)j=YSADs8v zNhH{@dGbUgiTM3?&5R(Cor~##(kH`76k%YwjYL*{aCTX3HsSQ>e{o@WTwh_rMIl|MUGVQbZ6Aj4nO^2fzVvpaKrU2?s`( z8G!@f05~u|4p9FqP!XbwDmfs9lM>!&1{4w=36F#~f5J=QARHLYbp#H81K>af9M}m5 zMzflM1Kv7%U)m2si)^fCKa60QJ90r6KyNfCEy*P{JE6CyRtf!Xx3$pYT#d6At+0mou*b z2fzVvpaKq}2nR+>!~+Mw0dQb`9H9PJpdv&URdPUzcuIJqr`I6ik?=@(^C!F%afAb- z=Trd)zyWZe0uEvc2S!h%0}g-#;K2MiK>e>$X^6fm;D8i~l<-Cm%SFN?;gRs>Pk1R3 z2nR-w#Rd+51K>af9E>6yOz&f-1RVtqfCJ#b{5U}UuRukJE~?~!6v>qE#caU$ZA8K& z;gRs>Pk1Sk2nVx<*zG2802}}ZD&Sx=;h-SceU5n~XajHn8~_LA#{uepl}bbORRIU2 zNTq}?<0IaWgh#?7;mx1$Qj8%SlyM3I4I#40E`C?fCJ#b{5U}U zuRukJE~?~!6ls+36_++}8GjxLkAz3Un?K>D@DUCc@GAgT00+PUaG(MX#u5$!C)uqJ zH~z{{0T2bI^ke3Gut=VefBnR02}}Z zD&Sxo;h?f}fSVE23>*Llz=8R3fcjs7iV$5?$pI-QQo`49`QAB@kAz3UBjL@T@KQ`5 z94zf&Cj{*U4uAvTKm{C(Cmiszfof8K1Kxx32**{mm-UBu!>#&Z(js(02}}ZD&SxW;h>RS9OzOzZ~zVK6=L-bVv2c*cQgzsSI`}Pze;gRr2c=IQ`6jKQYZR~8I>j!`X-~c#K0S7sRgBCx# zr2_}R0dQb`9H9PJpdv&URdPTIKPCJ+_Ick!myz&DcqF{}6JCmGgoCyGe4tw305|{+ zRKP(V;ecQMuk$o;02}}Z=Eni*f0ar@^i=@|q$s6?Kggco_a`CYk?=@(^C!F%a|j3f zyV*N_Hv Date: Sat, 9 Dec 2023 00:13:58 +0800 Subject: [PATCH 09/10] ci: fix ci problems --- p2p/discover/portal_storage.go | 35 ++++++++++++++++++----------- p2p/discover/portal_storage_test.go | 2 -- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/p2p/discover/portal_storage.go b/p2p/discover/portal_storage.go index 77404a556b3b..bbb3ab71a8d9 100644 --- a/p2p/discover/portal_storage.go +++ b/p2p/discover/portal_storage.go @@ -57,7 +57,7 @@ func xor(contentId, nodeId []byte) []byte { padding = contentId } res := make([]byte, len(padding)) - for i, _ := range padding { + for i := range padding { res[i] = padding[i] ^ nodeId[i] } return res @@ -69,18 +69,27 @@ func greater(a, b []byte) int { } func NewPortalStorage(storageCapacityInBytes uint64, nodeId enode.ID, nodeDataDir string) (*PortalStorage, error) { - - sql.Register("sqlite3_custom", &sqlite3.SQLiteDriver{ - ConnectHook: func(conn *sqlite3.SQLiteConn) error { - if err := conn.RegisterFunc("xor", xor, false); err != nil { - return err - } - if err := conn.RegisterFunc("greater", greater, false); err != nil { - return err - } - return nil - }, - }) + // avoid repeated register in tests + registered := false + drives := sql.Drivers() + for _, v := range drives { + if v == "sqlite3_custom" { + registered = true + } + } + if !registered { + sql.Register("sqlite3_custom", &sqlite3.SQLiteDriver{ + ConnectHook: func(conn *sqlite3.SQLiteConn) error { + if err := conn.RegisterFunc("xor", xor, false); err != nil { + return err + } + if err := conn.RegisterFunc("greater", greater, false); err != nil { + return err + } + return nil + }, + }) + } sqlDb, err := sql.Open("sqlite3_custom", path.Join(nodeDataDir, sqliteName)) diff --git a/p2p/discover/portal_storage_test.go b/p2p/discover/portal_storage_test.go index 66cb76044bc0..75f436249060 100644 --- a/p2p/discover/portal_storage_test.go +++ b/p2p/discover/portal_storage_test.go @@ -118,7 +118,6 @@ func TestDBSize(t *testing.T) { assert.Equal(t, size1, size6) assert.Equal(t, usedSize2, size6) - } func TestDBPruning(t *testing.T) { @@ -186,7 +185,6 @@ func TestDBPruning(t *testing.T) { val, err := storage.Get(thirdFurthest.Bytes(), thirdFurthest.Bytes()) assert.NoError(t, err) assert.NotNil(t, val) - } func TestGetLargestDistance(t *testing.T) { From 493a17ddf95ae65b4e03771493ab453b53a33f09 Mon Sep 17 00:00:00 2001 From: fearlessfe <505380967@qq.com> Date: Mon, 11 Dec 2023 14:40:37 +0800 Subject: [PATCH 10/10] refactor: content_storage Storage interface --- log/logger_test.go | 1 - .../{portal_storage.go => content_storage.go} | 58 ++++++------------- ...torage_test.go => content_storage_test.go} | 57 +++++++++--------- p2p/discover/portal_protocol.go | 28 ++++++--- p2p/discover/portal_protocol_test.go | 17 +++--- 5 files changed, 77 insertions(+), 84 deletions(-) rename p2p/discover/{portal_storage.go => content_storage.go} (86%) rename p2p/discover/{portal_storage_test.go => content_storage_test.go} (79%) diff --git a/log/logger_test.go b/log/logger_test.go index 8dae62f48faa..a633f5ad7a4c 100644 --- a/log/logger_test.go +++ b/log/logger_test.go @@ -11,7 +11,6 @@ import ( "time" "github.com/holiman/uint256" - "golang.org/x/exp/slog" ) diff --git a/p2p/discover/portal_storage.go b/p2p/discover/content_storage.go similarity index 86% rename from p2p/discover/portal_storage.go rename to p2p/discover/content_storage.go index bbb3ab71a8d9..5cd0f7e4f4c9 100644 --- a/p2p/discover/portal_storage.go +++ b/p2p/discover/content_storage.go @@ -2,7 +2,6 @@ package discover import ( "bytes" - "crypto/sha256" "database/sql" "errors" "fmt" @@ -25,14 +24,27 @@ var ( const ( sqliteName = "shisui.sqlite" contentDeletionFraction = 0.05 // 5% of the content will be deleted when the storage capacity is hit and radius gets adjusted. + // SQLite Statements + createSql = `CREATE TABLE IF NOT EXISTS kvstore ( + key BLOB PRIMARY KEY, + value BLOB + );` + getSql = "SELECT value FROM kvstore WHERE key = (?1);" + putSql = "INSERT OR REPLACE INTO kvstore (key, value) VALUES (?1, ?2);" + deleteSql = "DELETE FROM kvstore WHERE key = (?1);" + containSql = "SELECT 1 FROM kvstore WHERE key = (?1);" + getAllOrderedByDistanceSql = "SELECT key, length(value), xor(key, (?1)) as distance FROM kvstore ORDER BY distance DESC;" + deleteOutOfRadiusStmt = "DELETE FROM kvstore WHERE greater(xor(key, (?1)), (?2)) = 1" + XOR_FIND_FARTHEST_QUERY = `SELECT + xor(key, (?1)) as distance + FROM kvstore + ORDER BY distance DESC` ) type Storage interface { - ContentId(contentKey []byte) []byte - - Get(contentKey []byte, contentId []byte) ([]byte, error) + Get(contentId []byte) ([]byte, error) - Put(contentKey []byte, content []byte) error + Put(contentId []byte, content []byte) error } type PortalStorage struct { @@ -117,12 +129,7 @@ func NewPortalStorage(storageCapacityInBytes uint64, nodeId enode.ID, nodeDataDi return portalStorage, err } -func (p *PortalStorage) ContentId(contentKey []byte) []byte { - digest := sha256.Sum256(contentKey) - return digest[:] -} - -func (p *PortalStorage) Get(contentKey []byte, contentId []byte) ([]byte, error) { +func (p *PortalStorage) Get(contentId []byte) ([]byte, error) { var res []byte err := p.getStmt.QueryRow(contentId).Scan(&res) if err == sql.ErrNoRows { @@ -155,11 +162,7 @@ func newPutResultWithErr(err error) PutResult { } } -func (p *PortalStorage) Put(contentKey []byte, content []byte) PutResult { - return p.put(p.ContentId(contentKey), content) -} - -func (p *PortalStorage) put(contentId []byte, content []byte) PutResult { +func (p *PortalStorage) Put(contentId []byte, content []byte) PutResult { _, err := p.putStmt.Exec(contentId, content) if err != nil { return newPutResultWithErr(err) @@ -285,9 +288,6 @@ func (p *PortalStorage) GetLargestDistance() (*uint256.Int, error) { } // reverse the distance, because big.SetBytes is big-endian reverseBytes(distance) - // for i:= 0; i < len(distance) / 2; i++ { - // distance[i], distance[len(distance) - i - 1] = distance[len(distance) - i - 1], distance[i] - // } bigNum := new(big.Int).SetBytes(distance) res := uint256.MustFromBig(bigNum) return res, nil @@ -385,23 +385,3 @@ func reverseBytes(src []byte) { src[i], src[len(src)-i-1] = src[len(src)-i-1], src[i] } } - -// SQLite Statements - -const createSql = `CREATE TABLE IF NOT EXISTS kvstore ( - key BLOB PRIMARY KEY, - value BLOB -);` -const getSql = "SELECT value FROM kvstore WHERE key = (?1);" -const putSql = "INSERT OR REPLACE INTO kvstore (key, value) VALUES (?1, ?2);" -const deleteSql = "DELETE FROM kvstore WHERE key = (?1);" - -// const clearSql = "DELETE FROM kvstore" -const containSql = "SELECT 1 FROM kvstore WHERE key = (?1);" -const getAllOrderedByDistanceSql = "SELECT key, length(value), xor(key, (?1)) as distance FROM kvstore ORDER BY distance DESC;" -const deleteOutOfRadiusStmt = "DELETE FROM kvstore WHERE greater(xor(key, (?1)), (?2)) = 1" - -const XOR_FIND_FARTHEST_QUERY = `SELECT - xor(key, (?1)) as distance - FROM kvstore - ORDER BY distance DESC` diff --git a/p2p/discover/portal_storage_test.go b/p2p/discover/content_storage_test.go similarity index 79% rename from p2p/discover/portal_storage_test.go rename to p2p/discover/content_storage_test.go index 75f436249060..7d5442080ab6 100644 --- a/p2p/discover/portal_storage_test.go +++ b/p2p/discover/content_storage_test.go @@ -33,15 +33,16 @@ func TestBasicStorage(t *testing.T) { defer storage.Close() contentKey := []byte("test") + contentId := defaultContentIdFunc(contentKey) content := []byte("value") - _, err = storage.Get(contentKey, storage.ContentId(contentKey)) + _, err = storage.Get(contentId) assert.Equal(t, ContentNotFound, err) - pt := storage.Put(contentKey, content) + pt := storage.Put(contentId, content) assert.NoError(t, pt.Err()) - val, err := storage.Get(contentKey, storage.ContentId(contentKey)) + val, err := storage.Get(contentId) assert.NoError(t, err) assert.Equal(t, content, val) @@ -95,9 +96,9 @@ func TestDBSize(t *testing.T) { assert.True(t, size4 == size3) assert.True(t, usedSize == size4) - err = storage.del(storage.ContentId(uint256.NewInt(2).Bytes())) + err = storage.del(uint256.NewInt(2).Bytes()) assert.NoError(t, err) - err = storage.del(storage.ContentId(uint256.NewInt(1).Bytes())) + err = storage.del(uint256.NewInt(1).Bytes()) assert.NoError(t, err) usedSize1, err := storage.UsedSize() @@ -135,29 +136,29 @@ func TestDBPruning(t *testing.T) { numBytes := 10_000 // test with private put method - pt1 := storage.put(uint256.NewInt(1).Bytes(), genBytes(numBytes)) + pt1 := storage.Put(uint256.NewInt(1).Bytes(), genBytes(numBytes)) assert.NoError(t, pt1.Err()) - pt2 := storage.put(thirdFurthest.Bytes(), genBytes(numBytes)) + pt2 := storage.Put(thirdFurthest.Bytes(), genBytes(numBytes)) assert.NoError(t, pt2.Err()) - pt3 := storage.put(uint256.NewInt(3).Bytes(), genBytes(numBytes)) + pt3 := storage.Put(uint256.NewInt(3).Bytes(), genBytes(numBytes)) assert.NoError(t, pt3.Err()) - pt4 := storage.put(uint256.NewInt(10).Bytes(), genBytes(numBytes)) + pt4 := storage.Put(uint256.NewInt(10).Bytes(), genBytes(numBytes)) assert.NoError(t, pt4.Err()) - pt5 := storage.put(uint256.NewInt(5).Bytes(), genBytes(numBytes)) + pt5 := storage.Put(uint256.NewInt(5).Bytes(), genBytes(numBytes)) assert.NoError(t, pt5.Err()) - pt6 := storage.put(uint256.NewInt(11).Bytes(), genBytes(numBytes)) + pt6 := storage.Put(uint256.NewInt(11).Bytes(), genBytes(numBytes)) assert.NoError(t, pt6.Err()) - pt7 := storage.put(furthestElement.Bytes(), genBytes(4000)) + pt7 := storage.Put(furthestElement.Bytes(), genBytes(4000)) assert.NoError(t, pt7.Err()) - pt8 := storage.put(secondFurthest.Bytes(), genBytes(3000)) + pt8 := storage.Put(secondFurthest.Bytes(), genBytes(3000)) assert.NoError(t, pt8.Err()) - pt9 := storage.put(uint256.NewInt(2).Bytes(), genBytes(numBytes)) + pt9 := storage.Put(uint256.NewInt(2).Bytes(), genBytes(numBytes)) assert.NoError(t, pt9.Err()) res, _ := storage.GetLargestDistance() assert.Equal(t, res, uint256.NewInt(40)) - pt10 := storage.put(uint256.NewInt(4).Bytes(), genBytes(12000)) + pt10 := storage.Put(uint256.NewInt(4).Bytes(), genBytes(12000)) assert.NoError(t, pt10.Err()) assert.False(t, pt1.Pruned()) @@ -176,13 +177,13 @@ func TestDBPruning(t *testing.T) { assert.NoError(t, err) assert.True(t, usedSize < storage.storageCapacityInBytes) - _, err = storage.Get(furthestElement.Bytes(), storage.ContentId(furthestElement.Bytes())) + _, err = storage.Get(furthestElement.Bytes()) assert.Equal(t, ContentNotFound, err) - _, err = storage.Get(secondFurthest.Bytes(), storage.ContentId(secondFurthest.Bytes())) + _, err = storage.Get(secondFurthest.Bytes()) assert.Equal(t, ContentNotFound, err) - val, err := storage.Get(thirdFurthest.Bytes(), thirdFurthest.Bytes()) + val, err := storage.Get(thirdFurthest.Bytes()) assert.NoError(t, err) assert.NotNil(t, val) } @@ -199,13 +200,13 @@ func TestGetLargestDistance(t *testing.T) { furthestElement := uint256.NewInt(40) secondFurthest := uint256.NewInt(30) - pt7 := storage.put(furthestElement.Bytes(), genBytes(2000)) + pt7 := storage.Put(furthestElement.Bytes(), genBytes(2000)) assert.NoError(t, pt7.Err()) - val, err := storage.Get(furthestElement.Bytes(), furthestElement.Bytes()) + val, err := storage.Get(furthestElement.Bytes()) assert.NoError(t, err) assert.NotNil(t, val) - pt8 := storage.put(secondFurthest.Bytes(), genBytes(2000)) + pt8 := storage.Put(secondFurthest.Bytes(), genBytes(2000)) assert.NoError(t, pt8.Err()) res, err := storage.GetLargestDistance() assert.NoError(t, err) @@ -225,13 +226,13 @@ func TestSimpleForcePruning(t *testing.T) { secondFurthest := uint256.NewInt(30) third := uint256.NewInt(10) - pt1 := storage.put(furthestElement.Bytes(), genBytes(2000)) + pt1 := storage.Put(furthestElement.Bytes(), genBytes(2000)) assert.NoError(t, pt1.Err()) - pt2 := storage.put(secondFurthest.Bytes(), genBytes(2000)) + pt2 := storage.Put(secondFurthest.Bytes(), genBytes(2000)) assert.NoError(t, pt2.Err()) - pt3 := storage.put(third.Bytes(), genBytes(2000)) + pt3 := storage.Put(third.Bytes(), genBytes(2000)) assert.NoError(t, pt3.Err()) res, err := storage.GetLargestDistance() assert.NoError(t, err) @@ -240,13 +241,13 @@ func TestSimpleForcePruning(t *testing.T) { err = storage.ForcePrune(uint256.NewInt(20)) assert.NoError(t, err) - _, err = storage.Get(furthestElement.Bytes(), furthestElement.Bytes()) + _, err = storage.Get(furthestElement.Bytes()) assert.Equal(t, ContentNotFound, err) - _, err = storage.Get(secondFurthest.Bytes(), secondFurthest.Bytes()) + _, err = storage.Get(secondFurthest.Bytes()) assert.Equal(t, ContentNotFound, err) - _, err = storage.Get(third.Bytes(), third.Bytes()) + _, err = storage.Get(third.Bytes()) assert.NoError(t, err) } @@ -272,7 +273,7 @@ func TestForcePruning(t *testing.T) { putCount := 0 // id < maxUint256 - remainder for id.Cmp(uint256.NewInt(0).Sub(maxUint256, remainder)) == -1 { - res := storage.put(id.Bytes(), content) + res := storage.Put(id.Bytes(), content) assert.NoError(t, res.Err()) id = id.Add(id, increment) putCount++ diff --git a/p2p/discover/portal_protocol.go b/p2p/discover/portal_protocol.go index 162c82b18d77..b5785c337161 100644 --- a/p2p/discover/portal_protocol.go +++ b/p2p/discover/portal_protocol.go @@ -5,6 +5,7 @@ import ( "context" "crypto/ecdsa" crand "crypto/rand" + "crypto/sha256" "encoding/binary" "errors" "fmt" @@ -108,6 +109,8 @@ func DefaultPortalProtocolConfig() *PortalProtocolConfig { } } +type PortalProtocolOption func(p *PortalProtocol) + type PortalProtocol struct { table *Table @@ -131,11 +134,17 @@ type PortalProtocol struct { closeCtx context.Context cancelCloseCtx context.CancelFunc storage Storage + toContentId func(contentKey []byte) []byte contentQueue chan *ContentElement } -func NewPortalProtocol(config *PortalProtocolConfig, protocolId string, privateKey *ecdsa.PrivateKey, storage Storage, contentQueue chan *ContentElement) (*PortalProtocol, error) { +func defaultContentIdFunc(contentKey []byte) []byte { + digest := sha256.Sum256(contentKey) + return digest[:] +} + +func NewPortalProtocol(config *PortalProtocolConfig, protocolId string, privateKey *ecdsa.PrivateKey, storage Storage, contentQueue chan *ContentElement, options ...PortalProtocolOption) (*PortalProtocol, error) { nodeDB, err := enode.OpenDB(config.NodeDBPath) if err != nil { return nil, err @@ -160,6 +169,11 @@ func NewPortalProtocol(config *PortalProtocolConfig, protocolId string, privateK validSchemes: enode.ValidSchemes, storage: storage, contentQueue: contentQueue, + toContentId: defaultContentIdFunc, + } + + for _, opt := range options { + opt(protocol) } return protocol, nil @@ -451,9 +465,9 @@ func (p *PortalProtocol) processOffer(target *enode.Node, resp []byte, request * } else { for _, index := range contentKeyBitlist.BitIndices() { contentKey := request.Request.(*PersistOfferRequest).ContentKeys[index] - contentId := p.storage.ContentId(contentKey) + contentId := p.toContentId(contentKey) if contentId != nil { - content, err = p.storage.Get(contentKey, contentId) + content, err = p.storage.Get(contentId) if err != nil { p.log.Error("failed to get content from storage", "err", err) contents = append(contents, []byte{}) @@ -834,13 +848,13 @@ func (p *PortalProtocol) handleFindContent(id enode.ID, addr *net.UDPAddr, reque enrOverhead := 4 //per added ENR, 4 bytes offset overhead var err error - contentId := p.storage.ContentId(request.ContentKey) + contentId := p.toContentId(request.ContentKey) if contentId == nil { return nil, ContentNotFound } var content []byte - content, err = p.storage.Get(request.ContentKey, contentId) + content, err = p.storage.Get(contentId) if err != nil && !errors.Is(err, ContentNotFound) { return nil, err } @@ -1004,10 +1018,10 @@ func (p *PortalProtocol) handleOffer(id enode.ID, addr *net.UDPAddr, request *po contentKeys := make([][]byte, 0) for i, contentKey := range request.ContentKeys { - contentId := p.storage.ContentId(contentKey) + contentId := p.toContentId(contentKey) if contentId != nil { if inRange(p.Self().ID(), p.nodeRadius, contentId) { - if _, err = p.storage.Get(contentKey, contentId); err != nil { + if _, err = p.storage.Get(contentId); err != nil { contentKeyBitlist.SetBitAt(uint64(i), true) contentKeys = append(contentKeys, contentKey) } diff --git a/p2p/discover/portal_protocol_test.go b/p2p/discover/portal_protocol_test.go index 5ca0fdeddc69..26861bb58806 100644 --- a/p2p/discover/portal_protocol_test.go +++ b/p2p/discover/portal_protocol_test.go @@ -13,7 +13,6 @@ import ( "github.com/optimism-java/utp-go" "github.com/prysmaticlabs/go-bitfield" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/internal/testlog" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover/portalwire" @@ -26,19 +25,19 @@ type MockStorage struct { db map[string][]byte } -func (m *MockStorage) ContentId(contentKey []byte) []byte { - return crypto.Keccak256(contentKey) -} +// func (m *MockStorage) ContentId(contentKey []byte) []byte { +// return crypto.Keccak256(contentKey) +// } -func (m *MockStorage) Get(contentKey []byte, contentId []byte) ([]byte, error) { +func (m *MockStorage) Get(contentId []byte) ([]byte, error) { if content, ok := m.db[string(contentId)]; ok { return content, nil } return nil, ContentNotFound } -func (m *MockStorage) Put(contentKey []byte, content []byte) error { - m.db[string(m.ContentId(contentKey))] = content +func (m *MockStorage) Put(contentId []byte, content []byte) error { + m.db[string(contentId)] = content return nil } @@ -243,7 +242,7 @@ func TestPortalWireProtocol(t *testing.T) { return n.ID() == node2.localNode.Node().ID() }) - err = node1.storage.Put([]byte("test_key"), []byte("test_value")) + err = node1.storage.Put(node1.toContentId([]byte("test_key")), []byte("test_value")) assert.NoError(t, err) flag, content, err := node2.findContent(node1.localNode.Node(), []byte("test_key")) @@ -263,7 +262,7 @@ func TestPortalWireProtocol(t *testing.T) { _, err = rand.Read(largeTestContent) assert.NoError(t, err) - err = node1.storage.Put([]byte("large_test_key"), largeTestContent) + err = node1.storage.Put(node1.toContentId([]byte("large_test_key")), largeTestContent) assert.NoError(t, err) flag, content, err = node2.findContent(node1.localNode.Node(), []byte("large_test_key"))