From 89c87c835fc7a192504cd5e67b02524b3b603625 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 15 Sep 2023 16:08:17 -0700 Subject: [PATCH 1/7] kvdb/postgres: remove global application level lock In this commit, we remove the global application level lock from the postgres backend. This lock prevents multiple write transactions from happening at the same time, and will also block a writer if a read is on going. Since this lock was added, we know always open DB connections with the strongest level of concurrency control available: `LevelSerializable`. In concert with the new auto retry logic, we ensure that if db transactions conflict (writing the same key/row in this case), then the tx is retried automatically. Removing this lock should increase perf for the postgres backend, as now concurrent write transactions can proceed, being serialized as needed. Rather then trying to handle concurrency at the application level, we'll set postgres do its job, with the application only needing to retry as necessary. --- kvdb/postgres/db.go | 1 - kvdb/sqlbase/db.go | 8 ------- kvdb/sqlbase/readwrite_tx.go | 44 ------------------------------------ 3 files changed, 53 deletions(-) diff --git a/kvdb/postgres/db.go b/kvdb/postgres/db.go index 90ca8324a8d..425ba162257 100644 --- a/kvdb/postgres/db.go +++ b/kvdb/postgres/db.go @@ -28,7 +28,6 @@ func newPostgresBackend(ctx context.Context, config *Config, prefix string) ( Schema: "public", TableNamePrefix: prefix, SQLiteCmdReplacements: sqliteCmdReplacements, - WithTxLevelLock: true, } return sqlbase.NewSqlBackend(ctx, cfg) diff --git a/kvdb/sqlbase/db.go b/kvdb/sqlbase/db.go index f6f2c759017..4d2d7686a2d 100644 --- a/kvdb/sqlbase/db.go +++ b/kvdb/sqlbase/db.go @@ -66,10 +66,6 @@ type Config struct { // commands. Note that the sqlite keywords to be replaced are // case-sensitive. SQLiteCmdReplacements SQLiteCmdReplacements - - // WithTxLevelLock when set will ensure that there is a transaction - // level lock. - WithTxLevelLock bool } // db holds a reference to the sql db connection. @@ -90,10 +86,6 @@ type db struct { // db is the underlying database connection instance. db *sql.DB - // lock is the global write lock that ensures single writer. This is - // only used if cfg.WithTxLevelLock is set. - lock sync.RWMutex - // table is the name of the table that contains the data for all // top-level buckets that have keys that cannot be mapped to a distinct // sql table. diff --git a/kvdb/sqlbase/readwrite_tx.go b/kvdb/sqlbase/readwrite_tx.go index ec761931adc..18a6a682c90 100644 --- a/kvdb/sqlbase/readwrite_tx.go +++ b/kvdb/sqlbase/readwrite_tx.go @@ -5,7 +5,6 @@ package sqlbase import ( "context" "database/sql" - "sync" "github.com/btcsuite/btcwallet/walletdb" ) @@ -20,28 +19,11 @@ type readWriteTx struct { // active is true if the transaction hasn't been committed yet. active bool - - // locker is a pointer to the global db lock. - locker sync.Locker } // newReadWriteTx creates an rw transaction using a connection from the // specified pool. func newReadWriteTx(db *db, readOnly bool) (*readWriteTx, error) { - locker := newNoopLocker() - if db.cfg.WithTxLevelLock { - // Obtain the global lock instance. An alternative here is to - // obtain a database lock from Postgres. Unfortunately there is - // no database-level lock in Postgres, meaning that each table - // would need to be locked individually. Perhaps an advisory - // lock could perform this function too. - locker = &db.lock - if readOnly { - locker = db.lock.RLocker() - } - } - locker.Lock() - // Start the transaction. Don't use the timeout context because it would // be applied to the transaction as a whole. If possible, mark the // transaction as read-only to make sure that potential programming @@ -54,7 +36,6 @@ func newReadWriteTx(db *db, readOnly bool) (*readWriteTx, error) { }, ) if err != nil { - locker.Unlock() return nil, err } @@ -62,7 +43,6 @@ func newReadWriteTx(db *db, readOnly bool) (*readWriteTx, error) { db: db, tx: tx, active: true, - locker: locker, }, nil } @@ -94,7 +74,6 @@ func (tx *readWriteTx) Rollback() error { // Unlock the transaction regardless of the error result. tx.active = false - tx.locker.Unlock() return err } @@ -162,7 +141,6 @@ func (tx *readWriteTx) Commit() error { // Unlock the transaction regardless of the error result. tx.active = false - tx.locker.Unlock() return err } @@ -204,25 +182,3 @@ func (tx *readWriteTx) Exec(query string, args ...interface{}) (sql.Result, return tx.tx.ExecContext(ctx, query, args...) } - -// noopLocker is an implementation of a no-op sync.Locker. -type noopLocker struct{} - -// newNoopLocker creates a new noopLocker. -func newNoopLocker() sync.Locker { - return &noopLocker{} -} - -// Lock is a noop. -// -// NOTE: this is part of the sync.Locker interface. -func (n *noopLocker) Lock() { -} - -// Unlock is a noop. -// -// NOTE: this is part of the sync.Locker interface. -func (n *noopLocker) Unlock() { -} - -var _ sync.Locker = (*noopLocker)(nil) From e2d1def51d479c9941502340e4c9eef65acb89b9 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 18 Sep 2023 11:58:58 -0700 Subject: [PATCH 2/7] temp: add replace so new code is used --- go.mod | 2 ++ go.sum | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index a52191cf894..50d87e64f67 100644 --- a/go.mod +++ b/go.mod @@ -212,4 +212,6 @@ replace google.golang.org/protobuf => github.com/lightninglabs/protobuf-go-hex-d // docs/INSTALL.md. go 1.19 +replace github.com/lightningnetwork/lnd/kvdb => ./kvdb + retract v0.0.2 diff --git a/go.sum b/go.sum index cca09de1772..f6606208f1c 100644 --- a/go.sum +++ b/go.sum @@ -448,8 +448,6 @@ github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsD github.com/lightningnetwork/lnd/clock v1.1.1/go.mod h1:mGnAhPyjYZQJmebS7aevElXKTFDuO+uNFFfMXK1W8xQ= github.com/lightningnetwork/lnd/healthcheck v1.2.3 h1:oqhOOy8WmIEa6RBkYKC0mmYZkhl8T2kGD97n9jpML8o= github.com/lightningnetwork/lnd/healthcheck v1.2.3/go.mod h1:eDxH3dEwV9DeBW/6inrmlVh1qBOFV0AI14EEPnGt9gc= -github.com/lightningnetwork/lnd/kvdb v1.4.4 h1:bCv63rVCvzqj1BkagN/EWTov6NDDgYEG/t0z2HepRMk= -github.com/lightningnetwork/lnd/kvdb v1.4.4/go.mod h1:9SuaIqMA9ugrVkdvgQkYXa8CAKYNYd4vsEYORP4V698= github.com/lightningnetwork/lnd/queue v1.1.1 h1:99ovBlpM9B0FRCGYJo6RSFDlt8/vOkQQZznVb18iNMI= github.com/lightningnetwork/lnd/queue v1.1.1/go.mod h1:7A6nC1Qrm32FHuhx/mi1cieAiBZo5O6l8IBIoQxvkz4= github.com/lightningnetwork/lnd/ticker v1.1.1 h1:J/b6N2hibFtC7JLV77ULQp++QLtCwT6ijJlbdiZFbSM= From ed0524211e80be9775cf48f61c7708964efff843 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 19 Sep 2023 18:12:50 -0700 Subject: [PATCH 3/7] kvdb+sqldb: update SQL error parsing to account for non wrapped errs Some sub-systems like btcwallet will return an error from the database, but they won't properly wrap it. As a result, we were unable to actually catch the serialization errors in the first place. To work around this, we'll now attempt to parse the error string directly. --- kvdb/sqlbase/db.go | 2 ++ kvdb/sqlbase/sqlerrors_postgres.go | 10 ++++++++++ kvdb/sqlbase/sqlerrors_sqlite.go | 12 ++++++++++++ sqldb/sqlerrors.go | 21 +++++++++++++++++++++ 4 files changed, 45 insertions(+) diff --git a/kvdb/sqlbase/db.go b/kvdb/sqlbase/db.go index 4d2d7686a2d..203a305001f 100644 --- a/kvdb/sqlbase/db.go +++ b/kvdb/sqlbase/db.go @@ -308,6 +308,8 @@ func (db *db) executeTransaction(f func(tx walletdb.ReadWriteTx) error, } return err + continue + } } dbErr := tx.Commit() diff --git a/kvdb/sqlbase/sqlerrors_postgres.go b/kvdb/sqlbase/sqlerrors_postgres.go index 3915a34f410..30b95a5541f 100644 --- a/kvdb/sqlbase/sqlerrors_postgres.go +++ b/kvdb/sqlbase/sqlerrors_postgres.go @@ -5,6 +5,7 @@ package sqlbase import ( "errors" "fmt" + "strings" "github.com/jackc/pgconn" "github.com/jackc/pgerrcode" @@ -13,6 +14,15 @@ import ( // parsePostgresError attempts to parse a postgres error as a database agnostic // SQL error. func parsePostgresError(err error) error { + // Sometimes the error won't be properly wrapped, so we'll need to + // inspect raw error itself to detect something we can wrap properly. + const postgresErrMsg = "could not serialize access" + if strings.Contains(err.Error(), postgresErrMsg) { + return &ErrSerializationError{ + DBError: err, + } + } + var pqErr *pgconn.PgError if !errors.As(err, &pqErr) { return nil diff --git a/kvdb/sqlbase/sqlerrors_sqlite.go b/kvdb/sqlbase/sqlerrors_sqlite.go index c291466104d..53a3b9c9956 100644 --- a/kvdb/sqlbase/sqlerrors_sqlite.go +++ b/kvdb/sqlbase/sqlerrors_sqlite.go @@ -5,6 +5,7 @@ package sqlbase import ( "errors" "fmt" + "strings" "modernc.org/sqlite" sqlite3 "modernc.org/sqlite/lib" @@ -13,6 +14,17 @@ import ( // parseSqliteError attempts to parse a sqlite error as a database agnostic // SQL error. func parseSqliteError(err error) error { + // If the error isn't wrapped properly, the errors.As call with fail, + // so we'll also try to check the expected error message directly. + // This is taken from: + // https://gitlab.com/cznic/sqlite/-/blob/v1.25.0/sqlite.go#L75. + const sqliteErrMsg = "SQLITE_BUSY" + if strings.Contains(err.Error(), sqliteErrMsg) { + return &ErrSerializationError{ + DBError: err, + } + } + var sqliteErr *sqlite.Error if !errors.As(err, &sqliteErr) { return nil diff --git a/sqldb/sqlerrors.go b/sqldb/sqlerrors.go index c595bd5281a..5376e26da9b 100644 --- a/sqldb/sqlerrors.go +++ b/sqldb/sqlerrors.go @@ -3,6 +3,7 @@ package sqldb import ( "errors" "fmt" + "strings" "github.com/jackc/pgconn" "github.com/jackc/pgerrcode" @@ -31,6 +32,26 @@ func MapSQLError(err error) error { return parsePostgresError(pqErr) } + // Sometimes the error won't be properly wrapped, so we'll need to + // inspect raw error itself to detect something we can wrap properly. + // This handles a postgres variant of the error. + const postgresErrMsg = "could not serialize access" + if strings.Contains(err.Error(), postgresErrMsg) { + return &ErrSerializationError{ + DBError: err, + } + } + + // We'll also attempt to catch this for sqlite, that uses a slightly + // different error message. This is taken from: + // https://gitlab.com/cznic/sqlite/-/blob/v1.25.0/sqlite.go#L75. + const sqliteErrMsg = "SQLITE_BUSY" + if strings.Contains(err.Error(), sqliteErrMsg) { + return &ErrSerializationError{ + DBError: err, + } + } + // Return original error if it could not be classified as a database // specific error. return err From f8a482f604190d5b362993578e9c2c029147cedf Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 19 Sep 2023 18:14:12 -0700 Subject: [PATCH 4/7] channeldb: explicitly catch error in pruneGraphNodes With the new postgres concurrency control, an error may come from a bucket function that's actually a postgres error. In this case, we need to return early so we can retry the txn. Otherwise, we'll be working with an aborted tx, and never actually return the error so we don't auto retry. --- channeldb/graph.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/channeldb/graph.go b/channeldb/graph.go index 8367aaf68b3..712a961b919 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -1492,9 +1492,15 @@ func (c *ChannelGraph) pruneGraphNodes(nodes kvdb.RwBucket, // If we reach this point, then there are no longer any edges // that connect this node, so we can delete it. if err := c.deleteLightningNode(nodes, nodePubKey[:]); err != nil { - log.Warnf("Unable to prune node %x from the "+ - "graph: %v", nodePubKey, err) - continue + if errors.Is(err, ErrGraphNodeNotFound) || + errors.Is(err, ErrGraphNodesNotFound) { + + log.Warnf("Unable to prune node %x from the "+ + "graph: %v", nodePubKey, err) + continue + } + + return err } log.Infof("Pruned unconnected node %x from channel graph", From 6a1bece4fd483df8c6f42591f23dc75bab3df0ba Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 19 Sep 2023 18:15:38 -0700 Subject: [PATCH 5/7] kvdb: don't do a critical log for db serialization errors In this commit, we fix a bug that would cause the entire db to shutdown if hit a panic (since db operations in the main buckets exit with a panic) while executing a txn call back. This might be a postgres error we need to check, so we don't want to bail out, and instead want to pass up the error to the caller so we can retry if needed. --- kvdb/sqlbase/db.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/kvdb/sqlbase/db.go b/kvdb/sqlbase/db.go index 203a305001f..abcf510bdef 100644 --- a/kvdb/sqlbase/db.go +++ b/kvdb/sqlbase/db.go @@ -173,7 +173,6 @@ func (db *db) getPrefixedTableName(table string) string { func catchPanic(f func() error) (err error) { defer func() { if r := recover(); r != nil { - log.Criticalf("Caught unhandled error: %v", r) switch data := r.(type) { case error: @@ -182,6 +181,18 @@ func catchPanic(f func() error) (err error) { default: err = errors.New(fmt.Sprintf("%v", data)) } + + // Before we issue a critical log which'll cause the + // daemon to shutdown, we'll first check if this is a + // DB serialization error. If so, then we don't need to + // log as we can retry safely and avoid tearing + // everything down. + if IsSerializationError(MapSQLError(err)) { + log.Tracef("detected db serialization error "+ + "via panic: %v", err) + } else { + log.Criticalf("Caught unhandled error: %v", r) + } } }() From f98e8a411e6e9c1bafc56fb668c5a6bddd3811c0 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 19 Sep 2023 18:16:28 -0700 Subject: [PATCH 6/7] kvdb: implement txn retry when executing txn call back At times we'll get an error from the transaction call back itself, since we may be using postgres over streaming RPC. In this case, we still need to roll back then attempt to retry. --- kvdb/sqlbase/db.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/kvdb/sqlbase/db.go b/kvdb/sqlbase/db.go index abcf510bdef..e011a39ae19 100644 --- a/kvdb/sqlbase/db.go +++ b/kvdb/sqlbase/db.go @@ -311,16 +311,23 @@ func (db *db) executeTransaction(f func(tx walletdb.ReadWriteTx) error, return dbErr } - err = catchPanic(func() error { return f(tx) }) - if err != nil { + fnErr := catchPanic(func() error { return f(tx) }) + if fnErr != nil { if rollbackErr := tx.Rollback(); rollbackErr != nil { log.Errorf("Error rolling back tx: %v", rollbackErr) } - return err + dbErr := MapSQLError(fnErr) + if IsSerializationError(dbErr) { + // Nothing to roll back here, since we didn't + // even get a transaction yet. + if waitBeforeRetry(i) { continue } + } + + return fnErr } dbErr := tx.Commit() From c88e270a4f67f902a0b698c6d53299b3dc447fbb Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 19 Sep 2023 18:18:51 -0700 Subject: [PATCH 7/7] kvdb: fix bug in commit retry loop In this commit, we fix a bug in the commit retry loop, we'll now make sure that if we get an error on commit, we'll map it to the SQL error then attempt to decide if we need to retry or not. --- kvdb/sqlbase/db.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/kvdb/sqlbase/db.go b/kvdb/sqlbase/db.go index e011a39ae19..8c5fbd25980 100644 --- a/kvdb/sqlbase/db.go +++ b/kvdb/sqlbase/db.go @@ -330,16 +330,20 @@ func (db *db) executeTransaction(f func(tx walletdb.ReadWriteTx) error, return fnErr } - dbErr := tx.Commit() - if IsSerializationError(dbErr) { + commitErr := tx.Commit() + if commitErr != nil { _ = tx.Rollback() - if waitBeforeRetry(i) { - continue + dbErr := MapSQLError(fnErr) + if IsSerializationError(dbErr) { + + if waitBeforeRetry(i) { + continue + } } } - return dbErr + return commitErr } // If we get to this point, then we weren't able to successfully commit