diff --git a/sqldb/v2/config.go b/sqldb/v2/config.go new file mode 100644 index 0000000000..5d4100a99b --- /dev/null +++ b/sqldb/v2/config.go @@ -0,0 +1,106 @@ +package sqldb + +import ( + "fmt" + "net/url" + "time" +) + +const ( + // defaultMaxConns is the number of permitted active and idle + // connections. We want to limit this so it isn't unlimited. Keeping + // idle connections in the pool can speed up + // queries given a new connection doesn't need to be established each + // time. + defaultMaxConns = 25 + + // defaultMaxIdleConns is the number of permitted idle connections. + defaultMaxIdleConns = 6 + + // defaultConnMaxIdleTime is the amount of time a connection can be + // idle before it is closed. + defaultConnMaxIdleTime = 5 * time.Minute + + // defaultConnMaxLifetime is the maximum amount of time a connection can + // be reused for before it is closed. + defaultConnMaxLifetime = 10 * time.Minute +) + +// SqliteConfig holds all the config arguments needed to interact with our +// sqlite DB. +// +//nolint:ll +type SqliteConfig struct { + Timeout time.Duration `long:"timeout" description:"The time after which a database query should be timed out."` + BusyTimeout time.Duration `long:"busytimeout" description:"The maximum amount of time to wait for a database connection to become available for a query."` + MaxConnections int `long:"maxconnections" description:"The maximum number of open connections to the database."` + MaxIdleConnections int `long:"maxidleconnections" description:"Max number of idle connections to keep in the connection pool."` + ConnMaxLifetime time.Duration `long:"connmaxlifetime" description:"Max amount of time a connection can be reused for before it is closed. Valid time units are {s, m, h}."` + PragmaOptions []string `long:"pragmaoptions" description:"A list of pragma options to set on a database connection. For example, 'auto_vacuum=incremental'. 
Note that the flag must be specified multiple times if multiple options are to be set."` + SkipMigrations bool `long:"skipmigrations" description:"Skip applying migrations on startup."` + + // SkipMigrationDbBackup if true, then a backup of the database will not + // be created before applying migrations. + SkipMigrationDbBackup bool `long:"skipmigrationdbbackup" description:"Skip creating a backup of the database before applying migrations."` + + QueryConfig `group:"query" namespace:"query"` +} + +const ( + // DefaultSqliteBusyTimeout is the default busy_timeout value used + // when no BusyTimeout is configured. + DefaultSqliteBusyTimeout = 5 * time.Second +) + +// busyTimeoutMs returns the busy_timeout value in milliseconds. If +// BusyTimeout is not set, it returns the default value. +func (s *SqliteConfig) busyTimeoutMs() int64 { + if s.BusyTimeout > 0 { + return s.BusyTimeout.Milliseconds() + } + + return DefaultSqliteBusyTimeout.Milliseconds() +} + +// Validate checks that the SqliteConfig values are valid. +func (p *SqliteConfig) Validate() error { + if err := p.QueryConfig.Validate(true); err != nil { + return fmt.Errorf("invalid query config: %w", err) + } + + return nil +} + +// PostgresConfig holds the postgres database configuration. +// +//nolint:ll +type PostgresConfig struct { + Dsn string `long:"dsn" description:"Database connection string."` + Timeout time.Duration `long:"timeout" description:"Database connection timeout. Set to zero to disable."` + MaxOpenConnections int `long:"maxconnections" description:"Max open connections to keep alive to the database server."` + MaxIdleConnections int `long:"maxidleconnections" description:"Max number of idle connections to keep in the connection pool."` + ConnMaxLifetime time.Duration `long:"connmaxlifetime" description:"Max amount of time a connection can be reused for before it is closed. 
Valid time units are {s, m, h}."` + ConnMaxIdleTime time.Duration `long:"connmaxidletime" description:"Max amount of time a connection can be idle for before it is closed. Valid time units are {s, m, h}."` + RequireSSL bool `long:"requiressl" description:"Whether to require using SSL (mode: require) when connecting to the server."` + SkipMigrations bool `long:"skipmigrations" description:"Skip applying migrations on startup."` + QueryConfig `group:"query" namespace:"query"` +} + +// Validate checks that the PostgresConfig values are valid. +func (p *PostgresConfig) Validate() error { + if p.Dsn == "" { + return fmt.Errorf("DSN is required") + } + + // Parse the DSN as a URL. + _, err := url.Parse(p.Dsn) + if err != nil { + return fmt.Errorf("invalid DSN: %w", err) + } + + if err := p.QueryConfig.Validate(false); err != nil { + return fmt.Errorf("invalid query config: %w", err) + } + + return nil +} diff --git a/sqldb/v2/go.mod b/sqldb/v2/go.mod new file mode 100644 index 0000000000..7e987ca314 --- /dev/null +++ b/sqldb/v2/go.mod @@ -0,0 +1,77 @@ +module github.com/lightningnetwork/lnd/sqldb/v2 + +require ( + github.com/btcsuite/btclog/v2 v2.0.1-0.20250728225537-6090e87c6c5b + github.com/davecgh/go-spew v1.1.1 + github.com/golang-migrate/migrate/v4 v4.19.0 + github.com/jackc/pgconn v1.14.3 + github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 + github.com/jackc/pgx/v5 v5.7.4 + github.com/lightningnetwork/lnd/fn/v2 v2.0.8 + github.com/ory/dockertest/v3 v3.10.0 + github.com/pmezard/go-difflib v1.0.0 + github.com/stretchr/testify v1.10.0 + golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b + modernc.org/sqlite v1.38.2 +) + +require ( + dario.cat/mergo v1.0.2 // indirect + github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect + github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect + github.com/btcsuite/btclog v0.0.0-20241003133417-09c4e92e319c // indirect + 
github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/containerd/continuity v0.3.0 // indirect + github.com/containerd/errdefs v1.0.0 // indirect + github.com/containerd/errdefs/pkg v0.3.0 // indirect + github.com/docker/cli v28.1.1+incompatible // indirect + github.com/docker/docker v28.3.3+incompatible // indirect + github.com/docker/go-connections v0.5.0 // indirect + github.com/docker/go-units v0.5.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/go-viper/mapstructure/v2 v2.3.0 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/go-cmp v0.7.0 // indirect + github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/jackc/chunkreader/v2 v2.0.1 // indirect + github.com/jackc/pgio v1.0.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgproto3/v2 v2.3.3 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/moby/docker-image-spec v1.3.1 // indirect + github.com/moby/sys/user v0.3.0 // indirect + github.com/moby/term v0.5.0 // indirect + github.com/ncruces/go-strftime v0.1.9 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/opencontainers/image-spec v1.1.0 // indirect + github.com/opencontainers/runc v1.2.0 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect + github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect + github.com/xeipuuv/gojsonschema v1.2.0 // indirect + 
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect + go.opentelemetry.io/otel/trace v1.36.0 // indirect + go.uber.org/atomic v1.10.0 // indirect + golang.org/x/crypto v0.37.0 // indirect + golang.org/x/sync v0.15.0 // indirect + golang.org/x/sys v0.34.0 // indirect + golang.org/x/text v0.24.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + modernc.org/libc v1.66.3 // indirect + modernc.org/mathutil v1.7.1 // indirect + modernc.org/memory v1.11.0 // indirect +) + +// We are using a fork of the migration library with custom functionality that +// did not yet make it into the upstream repository. +replace github.com/golang-migrate/migrate/v4 => github.com/lightninglabs/migrate/v4 v4.18.2-9023d66a-fork-pr-2.0.20251211093704-71c1eef09789 + +go 1.23.12 diff --git a/sqldb/v2/go.sum b/sqldb/v2/go.sum new file mode 100644 index 0000000000..113570a0c3 --- /dev/null +++ b/sqldb/v2/go.sum @@ -0,0 +1,230 @@ +dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8= +dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA= +github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= +github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= +github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= +github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= +github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= +github.com/btcsuite/btclog v0.0.0-20241003133417-09c4e92e319c h1:4HxD1lBUGUddhzgaNgrCPsFWd7cGYNpeFUgd9ZIgyM0= +github.com/btcsuite/btclog v0.0.0-20241003133417-09c4e92e319c/go.mod h1:w7xnGOhwT3lmrS4H3b/D1XAXxvh+tbhUm8xeHN2y3TQ= +github.com/btcsuite/btclog/v2 
v2.0.1-0.20250728225537-6090e87c6c5b h1:MQ+Q6sDy37V1wP1Yu79A5KqJutolqUGwA99UZWQDWZM= +github.com/btcsuite/btclog/v2 v2.0.1-0.20250728225537-6090e87c6c5b/go.mod h1:XItGUfVOxotJL8kkuk2Hj3EVow5KCugXl3wWfQ6K0AE= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/containerd/continuity v0.3.0 h1:nisirsYROK15TAMVukJOUyGJjz4BNQJBVsNvAXZJ/eg= +github.com/containerd/continuity v0.3.0/go.mod h1:wJEAIwKOm/pBZuBd0JmeTvnLquTB1Ag8espWhkykbPM= +github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI= +github.com/containerd/errdefs v1.0.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M= +github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151Xdx3ZPPE= +github.com/containerd/errdefs/pkg v0.3.0/go.mod h1:NJw6s9HwNuRhnjJhM7pylWwMyAkmCQvQ4GpJHEqRLVk= +github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= +github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dhui/dktest v0.4.5 h1:uUfYBIVREmj/Rw6MvgmqNAYzTiKOHJak+enB5Di73MM= +github.com/dhui/dktest v0.4.5/go.mod h1:tmcyeHDKagvlDrz7gDKq4UAJOLIfVZYkfD5OnHDwcCo= +github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= +github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= +github.com/docker/cli v28.1.1+incompatible h1:eyUemzeI45DY7eDPuwUcmDyDj1pM98oD5MdSpiItp8k= +github.com/docker/cli v28.1.1+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= +github.com/docker/docker v28.3.3+incompatible 
h1:Dypm25kh4rmk49v1eiVbsAtpAsYURjYkaKubwuBdxEI= +github.com/docker/docker v28.3.3+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= +github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= +github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= +github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= +github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-viper/mapstructure/v2 v2.3.0 h1:27XbWsHIqhbdR5TIC911OfYvgSaW93HM+dX7970Q7jk= +github.com/go-viper/mapstructure/v2 v2.3.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e 
h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= +github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= +github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= +github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/pgconn v1.14.3 h1:bVoTr12EGANZz66nZPkMInAV/KHD2TxH9npjXXgiB3w= +github.com/jackc/pgconn v1.14.3/go.mod h1:RZbme4uasqzybK2RK5c65VsHxoyaml09lx3tXOcO/VM= +github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0= +github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= +github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= +github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= +github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5Wi/+Zz7xoE5ALHsRQlOctkOiHc= +github.com/jackc/pgmock 
v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgproto3/v2 v2.3.3 h1:1HLSx5H+tXR9pW3in3zaztoEwQYRC9SQaYUHjTSUOag= +github.com/jackc/pgproto3/v2 v2.3.3/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.4 h1:9wKznZrhWa2QiHL+NjTSPP6yjl3451BX3imWDnokYlg= +github.com/jackc/pgx/v5 v5.7.4/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lightninglabs/migrate/v4 v4.18.2-9023d66a-fork-pr-2.0.20251211093704-71c1eef09789 h1:7kX7vUgHUazAHcCJ6uzBDa4/2MEGEbMEfa01GtfqmTQ= +github.com/lightninglabs/migrate/v4 v4.18.2-9023d66a-fork-pr-2.0.20251211093704-71c1eef09789/go.mod h1:99BKpIi6ruaaXRM1A77eqZ+FWPQ3cfRa+ZVy5bmWMaY= 
+github.com/lightningnetwork/lnd/fn/v2 v2.0.8 h1:r2SLz7gZYQPVc3IZhU82M66guz3Zk2oY+Rlj9QN5S3g= +github.com/lightningnetwork/lnd/fn/v2 v2.0.8/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= +github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= +github.com/moby/sys/user v0.3.0 h1:9ni5DlcW5an3SvRSx4MouotOygvzaXbaSrc/wGDFWPo= +github.com/moby/sys/user v0.3.0/go.mod h1:bG+tYYYJgaMtRKgEmuueC0hJEAZWwtIbZTB+85uoHjs= +github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= +github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= +github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= +github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= +github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= +github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= +github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= +github.com/opencontainers/runc v1.2.0 h1:qke7ZVCmJcKrJVY2iHJVC+0kql9uYdkusOPsQOOeBw4= +github.com/opencontainers/runc v1.2.0/go.mod h1:/PXzF0h531HTMsYQnmxXkBD7YaGShm/2zcRB79dksUc= +github.com/ory/dockertest/v3 v3.10.0 h1:4K3z2VMe8Woe++invjaTB7VRyQXQy5UY+loujO4aNE4= +github.com/ory/dockertest/v3 v3.10.0/go.mod h1:nr57ZbRWMqfsdGdFNLHz5jjNdDb7VVFnzAeW1n5N1Lg= 
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c= +github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= +github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0= +github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= +github.com/xeipuuv/gojsonschema 
v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74= +github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 h1:sbiXRNDSWJOTobXh5HyQKjq6wUC5tNybqjIqDpAY4CU= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0/go.mod h1:69uWxva0WgAA/4bu2Yy70SLDBwZXuQ6PbBpbsa5iZrQ= +go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg= +go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E= +go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M= +go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= +go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w= +go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA= +go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= +go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= +golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= +golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b 
h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o= +golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w= +golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= +golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= +golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo= +golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 
v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools/v3 v3.3.0 h1:MfDY1b1/0xN1CyMlQDac0ziEy9zJQd9CXBRRDHw2jJo= +gotest.tools/v3 v3.3.0/go.mod h1:Mcr9QNxkg0uMvy/YElmo4SpXgJKWgQvYrT7Kw5RzJ1A= +modernc.org/cc/v4 v4.26.2 h1:991HMkLjJzYBIfha6ECZdjrIYz2/1ayr+FL8GN+CNzM= +modernc.org/cc/v4 v4.26.2/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= +modernc.org/ccgo/v4 v4.28.0 h1:rjznn6WWehKq7dG4JtLRKxb52Ecv8OUGah8+Z/SfpNU= +modernc.org/ccgo/v4 v4.28.0/go.mod h1:JygV3+9AV6SmPhDasu4JgquwU81XAKLd3OKTUDNOiKE= +modernc.org/fileutil v1.3.8 h1:qtzNm7ED75pd1C7WgAGcK4edm4fvhtBsEiI/0NQ54YM= +modernc.org/fileutil v1.3.8/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc= +modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= +modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= +modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= +modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= +modernc.org/libc v1.66.3 h1:cfCbjTUcdsKyyZZfEUKfoHcP3S0Wkvz3jgSzByEWVCQ= +modernc.org/libc v1.66.3/go.mod h1:XD9zO8kt59cANKvHPXpx7yS2ELPheAey0vjIuZOhOU8= +modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= +modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= +modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= +modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= +modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8= +modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= +modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= +modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= +modernc.org/sqlite v1.38.2 
h1:Aclu7+tgjgcQVShZqim41Bbw9Cho0y/7WzYptXqkEek= +modernc.org/sqlite v1.38.2/go.mod h1:cPTJYSlgg3Sfg046yBShXENNtPrWrDX8bsbAQBzgQ5E= +modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= +modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/sqldb/v2/interfaces.go b/sqldb/v2/interfaces.go new file mode 100644 index 0000000000..caec453ea2 --- /dev/null +++ b/sqldb/v2/interfaces.go @@ -0,0 +1,455 @@ +package sqldb + +import ( + "context" + "database/sql" + "fmt" + "math" + "math/rand" + "time" +) + +var ( + // DefaultStoreTimeout is the default timeout used for any interaction + // with the storage/database. + DefaultStoreTimeout = time.Second * 10 +) + +const ( + // DefaultNumTxRetries is the default number of times we'll retry a + // transaction if it fails with an error that permits transaction + // repetition. + DefaultNumTxRetries = 20 + + // DefaultInitialRetryDelay is the default initial delay between + // retries. This will be used to generate a random delay between -50% + // and +50% of this value, so 20 to 60 milliseconds. The retry will be + // doubled after each attempt until we reach DefaultMaxRetryDelay. We + // start with a random value to avoid multiple goroutines that are + // created at the same time to effectively retry at the same time. + DefaultInitialRetryDelay = time.Millisecond * 40 + + // DefaultMaxRetryDelay is the default maximum delay between retries. + DefaultMaxRetryDelay = time.Second * 3 +) + +// BackendType is an enum that represents the type of database backend we're +// using. +type BackendType uint8 + +const ( + // BackendTypeUnknown indicates we're using an unknown backend. + BackendTypeUnknown BackendType = iota + + // BackendTypeSqlite indicates we're using a SQLite backend. 
+ BackendTypeSqlite + + // BackendTypePostgres indicates we're using a Postgres backend. + BackendTypePostgres +) + +// TxOptions represents a set of options one can use to control what type of +// database transaction is created. Transaction can be either read or write. +type TxOptions interface { + // ReadOnly returns true if the transaction should be read only. + ReadOnly() bool +} + +// txOptions is a concrete implementation of the TxOptions interface. +type txOptions struct { + // readOnly indicates if the transaction should be read-only. + readOnly bool +} + +// ReadOnly returns true if the transaction should be read only. +// +// NOTE: This is part of the TxOptions interface. +func (t *txOptions) ReadOnly() bool { + return t.readOnly +} + +// WriteTxOpt returns a TxOptions that indicates that the transaction +// should be a write transaction. +func WriteTxOpt() TxOptions { + return &txOptions{ + readOnly: false, + } +} + +// ReadTxOpt returns a TxOptions that indicates that the transaction +// should be a read-only transaction. +func ReadTxOpt() TxOptions { + return &txOptions{ + readOnly: true, + } +} + +// BaseQuerier is a generic interface that represents the base methods that any +// database backend implementation which uses a Querier for its operations must +// implement. +type BaseQuerier interface { + // Backend returns the type of the database backend used. + Backend() BackendType +} + +// BatchedTx is a generic interface that represents the ability to execute +// several operations to a given storage interface in a single atomic +// transaction. Typically, Q here will be some subset of the main sqlc.Querier +// interface allowing it to only depend on the routines it needs to implement +// any additional business logic. +type BatchedTx[Q BaseQuerier] interface { + // ExecTx will execute the passed txBody, operating upon generic + // parameter Q (usually a storage interface) in a single transaction. 
+ // + // The set of TxOptions are passed in order to allow the caller to + // specify if a transaction should be read-only and optionally what + // type of concurrency control should be used. + ExecTx(ctx context.Context, txOptions TxOptions, + txBody func(Q) error, reset func()) error + + // Backend returns the type of the database backend used. + Backend() BackendType +} + +// Tx represents a database transaction that can be committed or rolled back. +type Tx interface { + // Commit commits the database transaction, an error should be returned + // if the commit isn't possible. + Commit() error + + // Rollback rolls back an incomplete database transaction. + // Transactions that were able to be committed can still call this as a + // noop. + Rollback() error +} + +// QueryCreator is a generic function that's used to create a Querier, which is +// a type of interface that implements storage related methods from a database +// transaction. This will be used to instantiate an object callers can use to +// apply multiple modifications to an object interface in a single atomic +// transaction. +type QueryCreator[Q any] func(*sql.Tx) Q + +// BatchedQuerier is a generic interface that allows callers to create a new +// database transaction based on an abstract type that implements the TxOptions +// interface. +type BatchedQuerier interface { + // BeginTx creates a new database transaction given the set of + // transaction options. + BeginTx(ctx context.Context, options TxOptions) (*sql.Tx, error) +} + +// txExecutorOptions is a struct that holds the options for the transaction +// executor. This can be used to do things like retry a transaction due to an +// error a certain amount of times. +type txExecutorOptions struct { + numRetries int + initialRetryDelay time.Duration + maxRetryDelay time.Duration +} + +// defaultTxExecutorOptions returns the default options for the transaction +// executor. 
+func defaultTxExecutorOptions() *txExecutorOptions { + return &txExecutorOptions{ + numRetries: DefaultNumTxRetries, + initialRetryDelay: DefaultInitialRetryDelay, + maxRetryDelay: DefaultMaxRetryDelay, + } +} + +// randRetryDelay returns a random retry delay between 0 and the configured max +// delay. +func (t *txExecutorOptions) randRetryDelay() time.Duration { + return time.Duration(rand.Int63n(int64(t.maxRetryDelay))) //nolint:gosec +} + +// TxExecutorOption is a functional option that allows us to pass in optional +// argument when creating the executor. +type TxExecutorOption func(*txExecutorOptions) + +// WithTxRetries is a functional option that allows us to specify the number of +// times a transaction should be retried if it fails with a repeatable error. +func WithTxRetries(numRetries int) TxExecutorOption { + return func(o *txExecutorOptions) { + o.numRetries = numRetries + } +} + +// WithTxRetryDelay is a functional option that allows us to specify the delay +// to wait before a transaction is retried. +func WithTxRetryDelay(delay time.Duration) TxExecutorOption { + return func(o *txExecutorOptions) { + o.initialRetryDelay = delay + } +} + +// TransactionExecutor is a generic struct that abstracts away from the type of +// query a type needs to run under a database transaction, and also the set of +// options for that transaction. The QueryCreator is used to create a query +// given a database transaction created by the BatchedQuerier. +type TransactionExecutor[Query BaseQuerier] struct { + BatchedQuerier + + createQuery QueryCreator[Query] + + opts *txExecutorOptions +} + +// NewTransactionExecutor creates a new instance of a TransactionExecutor given +// a Querier query object and a concrete type for the type of transactions the +// Querier understands. 
+func NewTransactionExecutor[Querier BaseQuerier](db BatchedQuerier, + createQuery QueryCreator[Querier], + opts ...TxExecutorOption) *TransactionExecutor[Querier] { + + txOpts := defaultTxExecutorOptions() + for _, optFunc := range opts { + optFunc(txOpts) + } + + return &TransactionExecutor[Querier]{ + BatchedQuerier: db, + createQuery: createQuery, + opts: txOpts, + } +} + +// randRetryDelay returns a random retry delay between -50% and +50% of the +// configured delay that is doubled for each attempt and capped at a max value. +func randRetryDelay(initialRetryDelay, maxRetryDelay time.Duration, + attempt int) time.Duration { + + halfDelay := initialRetryDelay / 2 + randDelay := rand.Int63n(int64(initialRetryDelay)) //nolint:gosec + + // 50% plus 0%-100% gives us the range of 50%-150%. + initialDelay := halfDelay + time.Duration(randDelay) + + // If this is the first attempt, we just return the initial delay. + if attempt == 0 { + return initialDelay + } + + // For each subsequent delay, we double the initial delay. This still + // gives us a somewhat random delay, but it still increases with each + // attempt. If we double something n times, that's the same as + // multiplying the value with 2^n. We limit the power to 32 to avoid + // overflows. + factor := time.Duration(math.Pow(2, min(float64(attempt), 32))) + actualDelay := initialDelay * factor + + // Cap the delay at the maximum configured value. + if actualDelay > maxRetryDelay { + return maxRetryDelay + } + + return actualDelay +} + +// MakeTx is a function that creates a new transaction. It returns a Tx and an +// error if the transaction cannot be created. This is used to abstract the +// creation of a transaction from the actual transaction logic in order to be +// able to reuse the transaction retry logic in other packages. +type MakeTx func() (Tx, error) + +// TxBody represents the function type for transactions. It returns an +// error to indicate success or failure. 
+type TxBody func(tx Tx) error + +// RollbackTx is a function that is called when a transaction needs to be rolled +// back due to a serialization error. By using this intermediate function, we +// can avoid having to return rollback errors that are not actionable by the +// caller. +type RollbackTx func(tx Tx) error + +// OnBackoff is a function that is called when a transaction is retried due to a +// serialization error. The function is called with the retry attempt number and +// the delay before the next retry. +type OnBackoff func(retry int, delay time.Duration) + +// ExecuteSQLTransactionWithRetry is a helper function that executes a +// transaction with retry logic. It will retry the transaction if it fails with +// a serialization error. The function will return an error if the transaction +// fails with a non-retryable error, the context is cancelled or the number of +// retries is exceeded. +func ExecuteSQLTransactionWithRetry(ctx context.Context, makeTx MakeTx, + rollbackTx RollbackTx, txBody TxBody, onBackoff OnBackoff, + opts *txExecutorOptions) error { + + waitBeforeRetry := func(attemptNumber int) bool { + retryDelay := randRetryDelay( + opts.initialRetryDelay, opts.maxRetryDelay, + attemptNumber, + ) + + onBackoff(attemptNumber, retryDelay) + + select { + // Before we try again, we'll wait with a random backoff based + // on the retry delay. + case <-time.After(retryDelay): + return true + + // If the daemon is shutting down, then we'll exit early. + case <-ctx.Done(): + return false + } + } + + for i := 0; i < opts.numRetries; i++ { + tx, err := makeTx() + if err != nil { + dbErr := MapSQLError(err) + log.Tracef("Failed to makeTx: err=%v, dbErr=%v", err, + dbErr) + + if IsSerializationOrDeadlockError(dbErr) { + // Nothing to roll back here, since we haven't + // even got a transaction yet. We'll just wait + // and try again. 
+ if waitBeforeRetry(i) { + continue + } + } + + return dbErr + } + + // Rollback is safe to call even if the tx is already closed, + // so if the tx commits successfully, this is a no-op. + defer func() { + _ = tx.Rollback() + }() + + if bodyErr := txBody(tx); bodyErr != nil { + log.Tracef("Error in txBody: %v", bodyErr) + + // Roll back the transaction, then attempt a random + // backoff and try again if the error was a + // serialization error. + if err := rollbackTx(tx); err != nil { + return MapSQLError(err) + } + + dbErr := MapSQLError(bodyErr) + if IsSerializationOrDeadlockError(dbErr) { + if waitBeforeRetry(i) { + continue + } + } + + return dbErr + } + + // Commit transaction. + if commitErr := tx.Commit(); commitErr != nil { + log.Tracef("Failed to commit tx: %v", commitErr) + + // Roll back the transaction, then attempt a random + // backoff and try again if the error was a + // serialization error. + if err := rollbackTx(tx); err != nil { + return MapSQLError(err) + } + + dbErr := MapSQLError(commitErr) + if IsSerializationOrDeadlockError(dbErr) { + if waitBeforeRetry(i) { + continue + } + } + + return dbErr + } + + return nil + } + + // If we get to this point, then we weren't able to successfully commit + // a tx given the max number of retries. + return ErrTxRetriesExceeded +} + +// ExecTx is a wrapper for txBody to abstract the creation and commit of a db +// transaction. The db transaction is embedded in a `*Queries` that txBody +// needs to use when executing each one of the queries that need to be applied +// atomically. This can be used by other storage interfaces to parameterize the +// type of query and options run, in order to have access to batched operations +// related to a storage object. 
+func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context, + txOptions TxOptions, txBody func(Q) error, reset func()) error { + + makeTx := func() (Tx, error) { + return t.BatchedQuerier.BeginTx(ctx, txOptions) + } + + execTxBody := func(tx Tx) error { + sqlTx, ok := tx.(*sql.Tx) + if !ok { + return fmt.Errorf("expected *sql.Tx, got %T", tx) + } + + reset() + return txBody(t.createQuery(sqlTx)) + } + + onBackoff := func(retry int, delay time.Duration) { + log.Tracef("Retrying transaction due to tx serialization "+ + "error, attempt_number=%v, delay=%v", retry, delay) + } + + rollbackTx := func(tx Tx) error { + sqlTx, ok := tx.(*sql.Tx) + if !ok { + return fmt.Errorf("expected *sql.Tx, got %T", tx) + } + + _ = sqlTx.Rollback() + + return nil + } + + return ExecuteSQLTransactionWithRetry( + ctx, makeTx, rollbackTx, execTxBody, onBackoff, t.opts, + ) +} + +// DB is an interface that represents a generic SQL database. It provides +// methods to apply migrations and access the underlying database connection. +type DB interface { + MigrationExecutor + + // GetBaseDB returns the underlying BaseDB instance. + GetBaseDB() *BaseDB +} + +// BaseDB is the base database struct that each implementation can embed to +// gain some common functionality. +type BaseDB struct { + *sql.DB + + // BackendType defines the type of database backend the database is. + BackendType BackendType + + // SkipMigrations can be set to true to skip running any migrations + // during the initialization of the database. + SkipMigrations bool +} + +// BeginTx wraps the normal sql specific BeginTx method with the TxOptions +// interface. This interface is then mapped to the concrete sql tx options +// struct. +func (s *BaseDB) BeginTx(ctx context.Context, opts TxOptions) (*sql.Tx, error) { + sqlOptions := sql.TxOptions{ + Isolation: sql.LevelSerializable, + ReadOnly: opts.ReadOnly(), + } + + return s.DB.BeginTx(ctx, &sqlOptions) +} + +// Backend returns the type of the database backend used. 
+func (s *BaseDB) Backend() BackendType { + return s.BackendType +} diff --git a/sqldb/v2/log.go b/sqldb/v2/log.go new file mode 100644 index 0000000000..19f43f7018 --- /dev/null +++ b/sqldb/v2/log.go @@ -0,0 +1,24 @@ +package sqldb + +import "github.com/btcsuite/btclog/v2" + +// Subsystem defines the logging code for this subsystem. +const Subsystem = "SQL2" + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log = btclog.Disabled + +// DisableLog disables all library log output. Logging output is disabled +// by default until UseLogger is called. +func DisableLog() { + UseLogger(btclog.Disabled) +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/sqldb/v2/migrations.go b/sqldb/v2/migrations.go new file mode 100644 index 0000000000..56e0f3c4d4 --- /dev/null +++ b/sqldb/v2/migrations.go @@ -0,0 +1,418 @@ +package sqldb + +import ( + "bytes" + "embed" + "errors" + "fmt" + "io" + "io/fs" + "net/http" + "reflect" + "strings" + + "github.com/btcsuite/btclog/v2" + "github.com/davecgh/go-spew/spew" + "github.com/golang-migrate/migrate/v4" + "github.com/golang-migrate/migrate/v4/database" + "github.com/golang-migrate/migrate/v4/source/httpfs" + "github.com/lightningnetwork/lnd/fn/v2" + "github.com/pmezard/go-difflib/difflib" +) + +var ( + // ErrMigrationMismatch is returned when a migrated record does not + // match the original record. + ErrMigrationMismatch = fmt.Errorf("migrated record does not match " + + "original record") +) + +// MigrationDescriptor is a description struct that describes SQL migrations. +// Each migration is associated with a specific schema version and a global +// database version. 
Migrations are applied in the order of their global +// database version. If a migration includes a non-nil MigrationFn, it is +// executed after the SQL schema has been migrated to the corresponding schema +// version. +type MigrationDescriptor struct { + // Name is the name of the migration. + Name string + + // Version represents the "global" database version for this migration. + // Unlike the schema version tracked by golang-migrate, it encompasses + // all migrations, including those managed by golang-migrate as well + // as custom in-code migrations. + Version int + + // SchemaVersion represents the schema version tracked by golang-migrate + // at which the migration is applied. + SchemaVersion int +} + +// MigrationSet encapsulates all necessary information to manage and apply +// a series of SQL migrations, and corresponding code migrations, to a database. +type MigrationSet struct { + // TrackingTableName is the name of the table used by golang-migrate to + // track the current schema version. + TrackingTableName string + + // SQLFiles is the embedded file system containing the SQL migration + // files. + SQLFiles embed.FS + + // SQLFileDirectory is the directory containing the SQL migration files. + SQLFileDirectory string + + // MakeProgrammaticMigrations is a function that returns a map of + // ProgrammaticMigrEntry functions that can be used to execute a Golang + // based migration step. The key is the migration version and the value + // is the Golang migration function entry that should be run for the + // migration version. Note that a database version can be either an SQL + // migration or a programmatic migration, but not both at the same time. + MakeProgrammaticMigrations func( + *BaseDB) (map[uint]migrate.ProgrammaticMigrEntry, error) + + // LatestMigrationVersion is the latest migration version of the + // database. This is used to implement downgrade protection for the + // daemon. 
+ LatestMigrationVersion uint + + // Descriptors defines a list of migrations to be applied to the + // database. Each migration is assigned a version number, determining + // its execution order. + // The schema version, tracked by golang-migrate, ensures migrations are + // applied to the correct schema. For migrations involving only schema + // changes, the migration function can be left nil. For custom + // migrations an implemented migration function is required. + Descriptors []MigrationDescriptor +} + +// MigrationTarget is a functional option that can be passed to applyMigrations +// to specify a target version to migrate to. `currentDbVersion` is the current +// (migration) version of the database, or None if unknown. +// `maxMigrationVersion` is the maximum migration version known to the driver, +// or None if unknown. +type MigrationTarget func(mig *migrate.Migrate, + currentDbVersion int, maxMigrationVersion uint) error + +// MigrationExecutor is an interface that abstracts the migration functionality. +type MigrationExecutor interface { + // ExecuteMigrations runs database migrations for the given migration + // set using the executor's default production migration target. A + // migration may include a schema change, a custom migration function, + // or both. + ExecuteMigrations(set MigrationSet) error +} + +var ( + // TargetLatest is a MigrationTarget that migrates to the latest + // version available. + TargetLatest = func(mig *migrate.Migrate, _ int, _ uint) error { + return mig.Up() + } + + // TargetVersion is a MigrationTarget that migrates to the given + // version. + TargetVersion = func(version uint) MigrationTarget { + return func(mig *migrate.Migrate, _ int, _ uint) error { + return mig.Migrate(version) + } + } + + // ErrMigrationDowngrade is returned when a database downgrade is + // detected. 
+ ErrMigrationDowngrade = errors.New("database downgrade detected") +) + +// migrateOptions is a set of options that can be passed to migrate related +// methods to modify their behavior. +type migrateOptions struct { + latestVersion fn.Option[uint] + programmaticMigrs map[uint]migrate.ProgrammaticMigrEntry +} + +// defaultMigrateOptions returns a new migrateOptions instance with default +// settings. +func defaultMigrateOptions() *migrateOptions { + return &migrateOptions{ + programmaticMigrs: make(map[uint]migrate.ProgrammaticMigrEntry), + } +} + +// MigrateOpt is a functional option that can be passed to migrate related +// methods to modify behavior. +type MigrateOpt func(*migrateOptions) + +// WithLatestVersion allows callers to override the default latest version +// setting. +func WithLatestVersion(version uint) MigrateOpt { + return func(o *migrateOptions) { + o.latestVersion = fn.Some(version) + } +} + +// WithProgrammaticMigrations is an option that can be used to set a map of +// ProgrammaticMigrEntry functions that can be used to execute a Golang based +// migration step. The key is the migration version and the value is the +// Golang migration function entry that should be run for the migration version. +func WithProgrammaticMigrations( + programmaticMigrs map[uint]migrate.ProgrammaticMigrEntry) MigrateOpt { + + return func(o *migrateOptions) { + o.programmaticMigrs = programmaticMigrs + } +} + +// migrationLogger is a logger that wraps the passed btclog.Logger so it can be +// used to log migrations. +type migrationLogger struct { + log btclog.Logger +} + +// Printf is like fmt.Printf. We map this to the target logger based on the +// current log level. +func (m *migrationLogger) Printf(format string, v ...interface{}) { + // Trim trailing newlines from the format. + format = strings.TrimRight(format, "\n") + + switch m.log.Level() { + case btclog.LevelTrace: + m.log.Tracef(format, v...) + case btclog.LevelDebug: + m.log.Debugf(format, v...) 
+ case btclog.LevelInfo: + m.log.Infof(format, v...) + case btclog.LevelWarn: + m.log.Warnf(format, v...) + case btclog.LevelError: + m.log.Errorf(format, v...) + case btclog.LevelCritical: + m.log.Criticalf(format, v...) + case btclog.LevelOff: + } +} + +// Verbose should return true when verbose logging output is wanted +func (m *migrationLogger) Verbose() bool { + return m.log.Level() <= btclog.LevelDebug +} + +// applyMigrations executes all database migration files found in the given file +// system under the given path, using the passed database driver and database +// name. +func applyMigrations(fs fs.FS, driver database.Driver, path, + dbName string, targetVersion MigrationTarget, + opts *migrateOptions) error { + + // With the migrate instance open, we'll create a new migration source + // using the embedded file system stored in sqlSchemas. The library + // we're using can't handle a raw file system interface, so we wrap it + // in this intermediate layer. + migrateFileServer, err := httpfs.New(http.FS(fs), path) + if err != nil { + return err + } + + // Finally, we'll run the migration with our driver above based on the + // open DB, and also the migration source stored in the file system + // above. + sqlMigrate, err := migrate.NewWithInstance( + "migrations", migrateFileServer, dbName, driver, + migrate.WithProgrammaticMigrations(opts.programmaticMigrs), + ) + if err != nil { + return err + } + + migrationVersion, dirty, err := sqlMigrate.Version() + if err != nil && !errors.Is(err, migrate.ErrNilVersion) { + return fmt.Errorf("unable to determine current migration "+ + "version: %w", err) + } + + // If the migration version is dirty, we should not proceed with further + // migrations, as this indicates that a previous migration did not + // complete successfully and requires manual intervention. 
+ if dirty { + return fmt.Errorf("database is in a dirty state at version "+ + "%v, manual intervention required", migrationVersion) + } + + // As the down migrations may end up *dropping* data, we want to + // prevent that without explicit accounting. + latestVersion, err := opts.latestVersion.UnwrapOrErr( + fmt.Errorf("latest version not set"), + ) + if err != nil { + return fmt.Errorf("unable to get latest version: %w", err) + } + if migrationVersion > latestVersion { + return fmt.Errorf("%w: database version is newer than the "+ + "latest migration version, preventing downgrade: "+ + "db_version=%v, latest_migration_version=%v", + ErrMigrationDowngrade, migrationVersion, latestVersion) + } + + // Report the current version of the database before the migration. + currentDbVersion, _, err := driver.Version() + if err != nil { + return fmt.Errorf("unable to get current db version: %w", err) + } + log.Infof("Attempting to apply migration(s) "+ + "(current_db_version=%v, latest_migration_version=%v)", + currentDbVersion, latestVersion) + + // Apply our local logger to the migration instance. + sqlMigrate.Log = &migrationLogger{log} + + // Execute the migration based on the target given. + err = targetVersion(sqlMigrate, currentDbVersion, latestVersion) + if err != nil && !errors.Is(err, migrate.ErrNoChange) { + return err + } + + // Report the current version of the database after the migration. + currentDbVersion, _, err = driver.Version() + if err != nil { + return fmt.Errorf("unable to get current db version: %w", err) + } + log.Infof("Database version after migration: %v", currentDbVersion) + + return nil +} + +// replacerFS is an implementation of a fs.FS virtual file system that wraps an +// existing file system but does a search-and-replace operation on each file +// when it is opened. +type replacerFS struct { + parentFS fs.FS + replaces map[string]string +} + +// A compile-time assertion to make sure replacerFS implements the fs.FS +// interface. 
+var _ fs.FS = (*replacerFS)(nil) + +// newReplacerFS creates a new replacer file system, wrapping the given parent +// virtual file system. Each file within the file system is undergoing a +// search-and-replace operation when it is opened, using the given map where the +// key denotes the search term and the value the term to replace each occurrence +// with. +func newReplacerFS(parent fs.FS, replaces map[string]string) *replacerFS { + return &replacerFS{ + parentFS: parent, + replaces: replaces, + } +} + +// Open opens a file in the virtual file system. +// +// NOTE: This is part of the fs.FS interface. +func (t *replacerFS) Open(name string) (fs.File, error) { + f, err := t.parentFS.Open(name) + if err != nil { + return nil, err + } + + stat, err := f.Stat() + if err != nil { + return nil, err + } + + if stat.IsDir() { + return f, err + } + + return newReplacerFile(f, t.replaces) +} + +type replacerFile struct { + parentFile fs.File + buf bytes.Buffer +} + +// A compile-time assertion to make sure replacerFile implements the fs.File +// interface. +var _ fs.File = (*replacerFile)(nil) + +func newReplacerFile(parent fs.File, replaces map[string]string) (*replacerFile, + error) { + + content, err := io.ReadAll(parent) + if err != nil { + return nil, err + } + + contentStr := string(content) + for from, to := range replaces { + contentStr = strings.ReplaceAll(contentStr, from, to) + } + + var buf bytes.Buffer + _, err = buf.WriteString(contentStr) + if err != nil { + return nil, err + } + + return &replacerFile{ + parentFile: parent, + buf: buf, + }, nil +} + +// Stat returns statistics/info about the file. +// +// NOTE: This is part of the fs.File interface. +func (t *replacerFile) Stat() (fs.FileInfo, error) { + return t.parentFile.Stat() +} + +// Read reads as many bytes as possible from the file into the given slice. +// +// NOTE: This is part of the fs.File interface. 
+func (t *replacerFile) Read(bytes []byte) (int, error) { + return t.buf.Read(bytes) +} + +// Close closes the underlying file. +// +// NOTE: This is part of the fs.File interface. +func (t *replacerFile) Close() error { + // We already fully read and then closed the file when creating this + // instance, so there's nothing to do for us here. + return nil +} + +// ApplyAllMigrations applies both the SQLC and custom in-code migrations to the +// given database. +func ApplyAllMigrations(executor MigrationExecutor, sets []MigrationSet) error { + for _, set := range sets { + err := executor.ExecuteMigrations(set) + if err != nil { + return fmt.Errorf("error applying migrations: %w", err) + } + } + + return nil +} + +// CompareRecords checks if the original and migrated objects are equal. If +// they are not, it returns an error with a unified diff of the two objects. +func CompareRecords(original, migrated any, identifier string) error { + if reflect.DeepEqual(original, migrated) { + return nil + } + + diff := difflib.UnifiedDiff{ + A: difflib.SplitLines(spew.Sdump(original)), + B: difflib.SplitLines(spew.Sdump(migrated)), + FromFile: "Expected", + FromDate: "", + ToFile: "Actual", + ToDate: "", + Context: 3, + } + diffText, _ := difflib.GetUnifiedDiffString(diff) + + return fmt.Errorf("%w: %s.\n%v", ErrMigrationMismatch, identifier, + diffText) +} diff --git a/sqldb/v2/no_sqlite.go b/sqldb/v2/no_sqlite.go new file mode 100644 index 0000000000..0665882e69 --- /dev/null +++ b/sqldb/v2/no_sqlite.go @@ -0,0 +1,45 @@ +//go:build js || (windows && (arm || 386)) || (linux && (ppc64 || mips || mipsle || mips64)) + +package sqldb + +import ( + "context" + "fmt" +) + +var ( + // Make sure SqliteStore implements the DB interface. + _ DB = (*SqliteStore)(nil) +) + +// SqliteStore is a database store implementation that uses a sqlite backend. 
+// +// NOTE: This specific struct implementation does not implement a real sqlite +// store, and only exists to ensure that build tag environments that do not +// support sqlite database backends still contain a struct called SqliteStore, +// to ensure that the build process doesn't error. +type SqliteStore struct { + cfg *SqliteConfig + + *BaseDB +} + +// NewSqliteStore attempts to open a new sqlite database based on the passed +// config. +func NewSqliteStore(cfg *SqliteConfig, dbPath string) (*SqliteStore, error) { + return nil, fmt.Errorf("SQLite backend not supported in WebAssembly") +} + +// GetBaseDB returns the underlying BaseDB instance for the SQLite store. +// It is a trivial helper method to comply with the sqldb.DB interface. +func (s *SqliteStore) GetBaseDB() *BaseDB { + return s.BaseDB +} + +// ApplyAllMigrations applies both the SQLC and custom in-code migrations to +// the SQLite database. +func (s *SqliteStore) ApplyAllMigrations(context.Context, + []MigrationSet) error { + + return fmt.Errorf("SQLite backend not supported in WebAssembly") +} diff --git a/sqldb/v2/paginate.go b/sqldb/v2/paginate.go new file mode 100644 index 0000000000..4fd2a9d4db --- /dev/null +++ b/sqldb/v2/paginate.go @@ -0,0 +1,318 @@ +package sqldb + +import ( + "context" + "fmt" +) + +const ( + // maxSQLiteBatchSize is the maximum number of items that can be + // included in a batch query IN clause for SQLite. This was determined + // using the TestSQLSliceQueries test. + maxSQLiteBatchSize = 32766 + + // maxPostgresBatchSize is the maximum number of items that can be + // included in a batch query IN clause for Postgres. This was determined + // using the TestSQLSliceQueries test. + maxPostgresBatchSize = 65535 + + // defaultSQLitePageSize is the default page size for SQLite queries. + defaultSQLitePageSize = 100 + + // defaultPostgresPageSize is the default page size for Postgres + // queries. 
+ defaultPostgresPageSize = 10500 + + // defaultSQLiteBatchSize is the default batch size for SQLite queries. + defaultSQLiteBatchSize = 250 + + // defaultPostgresBatchSize is the default batch size for Postgres + // queries. + defaultPostgresBatchSize = 5000 +) + +// QueryConfig holds configuration values for SQL queries. +// +//nolint:ll +type QueryConfig struct { + // MaxBatchSize is the maximum number of items included in a batch + // query IN clauses list. + MaxBatchSize uint32 `long:"max-batch-size" description:"The maximum number of items to include in a batch query IN clause. This is used for queries that fetch results based on a list of identifiers."` + + // MaxPageSize is the maximum number of items returned in a single page + // of results. This is used for paginated queries. + MaxPageSize uint32 `long:"max-page-size" description:"The maximum number of items to return in a single page of results. This is used for paginated queries."` +} + +// Validate checks that the QueryConfig values are valid. +func (c *QueryConfig) Validate(sqlite bool) error { + if c.MaxBatchSize <= 0 { + return fmt.Errorf("max batch size must be greater than "+ + "zero, got %d", c.MaxBatchSize) + } + if c.MaxPageSize <= 0 { + return fmt.Errorf("max page size must be greater than "+ + "zero, got %d", c.MaxPageSize) + } + + if sqlite { + if c.MaxBatchSize > maxSQLiteBatchSize { + return fmt.Errorf("max batch size for SQLite cannot "+ + "exceed %d, got %d", maxSQLiteBatchSize, + c.MaxBatchSize) + } + } else { + if c.MaxBatchSize > maxPostgresBatchSize { + return fmt.Errorf("max batch size for Postgres cannot "+ + "exceed %d, got %d", maxPostgresBatchSize, + c.MaxBatchSize) + } + } + + return nil +} + +// DefaultSQLiteConfig returns a default configuration for SQL queries to a +// SQLite backend. 
+func DefaultSQLiteConfig() *QueryConfig { + return &QueryConfig{ + MaxBatchSize: defaultSQLiteBatchSize, + MaxPageSize: defaultSQLitePageSize, + } +} + +// DefaultPostgresConfig returns a default configuration for SQL queries to a +// Postgres backend. +func DefaultPostgresConfig() *QueryConfig { + return &QueryConfig{ + MaxBatchSize: defaultPostgresBatchSize, + MaxPageSize: defaultPostgresPageSize, + } +} + +// BatchQueryFunc represents a function that takes a batch of converted items +// and returns results. +type BatchQueryFunc[T any, R any] func(context.Context, []T) ([]R, error) + +// ItemCallbackFunc represents a function that processes individual results. +type ItemCallbackFunc[R any] func(context.Context, R) error + +// ConvertFunc represents a function that converts from input type to query type +// for the batch query. +type ConvertFunc[I any, T any] func(I) T + +// ExecuteBatchQuery executes a query in batches over a slice of input items. +// It converts the input items to a query type using the provided convertFunc, +// executes the query in batches using the provided queryFunc, and applies +// the callback to each result. This is useful for queries using the +// "WHERE x IN []slice" pattern. It takes that slice, splits it into batches of +// size MaxBatchSize, and executes the query for each batch. +// +// NOTE: it is the caller's responsibility to ensure that the expected return +// results are unique across all pages. Meaning that if the input items are +// split up, a result that is returned in one page should not be expected to +// be returned in another page. +func ExecuteBatchQuery[I any, T any, R any](ctx context.Context, + cfg *QueryConfig, inputItems []I, convertFunc ConvertFunc[I, T], + queryFunc BatchQueryFunc[T, R], callback ItemCallbackFunc[R]) error { + + if len(inputItems) == 0 { + return nil + } + + // Process items in pages. + for i := 0; i < len(inputItems); i += int(cfg.MaxBatchSize) { + // Calculate the end index for this page. 
+ end := i + int(cfg.MaxBatchSize) + if end > len(inputItems) { + end = len(inputItems) + } + + // Get the page slice of input items. + inputPage := inputItems[i:end] + + // Convert only the items needed for this page. + convertedPage := make([]T, len(inputPage)) + for j, inputItem := range inputPage { + convertedPage[j] = convertFunc(inputItem) + } + + // Execute the query for this page. + results, err := queryFunc(ctx, convertedPage) + if err != nil { + return fmt.Errorf("query failed for page "+ + "starting at %d: %w", i, err) + } + + // Apply the callback to each result. + for _, result := range results { + if err := callback(ctx, result); err != nil { + return fmt.Errorf("callback failed for "+ + "result: %w", err) + } + } + } + + return nil +} + +// PagedQueryFunc represents a function that fetches a page of results using a +// cursor. It returns the fetched items and should return an empty slice when no +// more results. +type PagedQueryFunc[C any, T any] func(context.Context, C, int32) ([]T, error) + +// CursorExtractFunc represents a function that extracts the cursor value from +// an item. This cursor will be used for the next page fetch. +type CursorExtractFunc[T any, C any] func(T) C + +// ItemProcessFunc represents a function that processes individual items. +type ItemProcessFunc[T any] func(context.Context, T) error + +// ExecutePaginatedQuery executes a cursor-based paginated query. It continues +// fetching pages until no more results are returned, processing each item with +// the provided callback. +// +// Parameters: +// - initialCursor: the starting cursor value (e.g., 0, -1, "", etc.). +// - queryFunc: function that fetches a page given cursor and limit. +// - extractCursor: function that extracts cursor from an item for next page. +// - processItem: function that processes each individual item. +// +// NOTE: it is the caller's responsibility to "undo" any processing done on +// items if the query fails on a later page. 
+func ExecutePaginatedQuery[C any, T any](ctx context.Context, cfg *QueryConfig, + initialCursor C, queryFunc PagedQueryFunc[C, T], + extractCursor CursorExtractFunc[T, C], + processItem ItemProcessFunc[T]) error { + + cursor := initialCursor + + for { + // Fetch the next page. + items, err := queryFunc(ctx, cursor, int32(cfg.MaxPageSize)) + if err != nil { + return fmt.Errorf("failed to fetch page with "+ + "cursor %v: %w", cursor, err) + } + + // If no items returned, we're done. + if len(items) == 0 { + break + } + + // Process each item in the page. + for _, item := range items { + if err := processItem(ctx, item); err != nil { + return fmt.Errorf("failed to process item: %w", + err) + } + + // Update cursor for next iteration. + cursor = extractCursor(item) + } + + // If the number of items is less than the max page size, + // we assume there are no more items to fetch. + if len(items) < int(cfg.MaxPageSize) { + break + } + } + + return nil +} + +// CollectAndBatchDataQueryFunc represents a function that batch loads +// additional data for collected identifiers, returning the batch data that +// applies to all items. +type CollectAndBatchDataQueryFunc[ID any, BatchData any] func(context.Context, + []ID) (BatchData, error) + +// ItemWithBatchDataProcessFunc represents a function that processes individual +// items along with shared batch data. +type ItemWithBatchDataProcessFunc[T any, BatchData any] func(context.Context, + T, BatchData) error + +// CollectFunc represents a function that extracts an identifier from a +// paginated item. +type CollectFunc[T any, ID any] func(T) (ID, error) + +// ExecuteCollectAndBatchWithSharedDataQuery implements a page-by-page +// processing pattern where each page is immediately processed with batch-loaded +// data before moving to the next page. +// +// It: +// 1. Fetches a page of items using cursor-based pagination +// 2. Collects identifiers from that page and batch loads shared data +// 3. 
Processes each item in the page with the shared batch data +// 4. Moves to the next page and repeats +// +// Parameters: +// - initialCursor: starting cursor for pagination +// - pageQueryFunc: fetches a page of items +// - extractPageCursor: extracts cursor from paginated item for next page +// - collectFunc: extracts identifier from paginated item +// - batchDataFunc: batch loads shared data from collected IDs for one page +// - processItem: processes each item with the shared batch data +func ExecuteCollectAndBatchWithSharedDataQuery[C any, T any, I any, D any]( + ctx context.Context, cfg *QueryConfig, initialCursor C, + pageQueryFunc PagedQueryFunc[C, T], + extractPageCursor CursorExtractFunc[T, C], + collectFunc CollectFunc[T, I], + batchDataFunc CollectAndBatchDataQueryFunc[I, D], + processItem ItemWithBatchDataProcessFunc[T, D]) error { + + cursor := initialCursor + + for { + // Step 1: Fetch the next page of items. + items, err := pageQueryFunc(ctx, cursor, int32(cfg.MaxPageSize)) + if err != nil { + return fmt.Errorf("failed to fetch page with "+ + "cursor %v: %w", cursor, err) + } + + // If no items returned, we're done. + if len(items) == 0 { + break + } + + // Step 2: Collect identifiers from this page and batch load + // data. + pageIDs := make([]I, len(items)) + for i, item := range items { + pageIDs[i], err = collectFunc(item) + if err != nil { + return fmt.Errorf("failed to collect "+ + "identifier from item: %w", err) + } + } + + // Batch load shared data for this page. + batchData, err := batchDataFunc(ctx, pageIDs) + if err != nil { + return fmt.Errorf("failed to load batch data for "+ + "page: %w", err) + } + + // Step 3: Process each item in this page with the shared batch + // data. + for _, item := range items { + err := processItem(ctx, item, batchData) + if err != nil { + return fmt.Errorf("failed to process item "+ + "with batch data: %w", err) + } + + // Update cursor for next page. 
+ cursor = extractPageCursor(item) + } + + // If the number of items is less than the max page size, + // we assume there are no more items to fetch. + if len(items) < int(cfg.MaxPageSize) { + break + } + } + + return nil +} diff --git a/sqldb/v2/postgres.go b/sqldb/v2/postgres.go new file mode 100644 index 0000000000..35fa513c5d --- /dev/null +++ b/sqldb/v2/postgres.go @@ -0,0 +1,243 @@ +package sqldb + +import ( + "database/sql" + "fmt" + "net/url" + "path" + "strings" + "time" + + pgx_migrate "github.com/golang-migrate/migrate/v4/database/pgx/v5" + _ "github.com/golang-migrate/migrate/v4/source/file" + _ "github.com/jackc/pgx/v5" + "github.com/lightningnetwork/lnd/fn/v2" +) + +var ( + // DefaultPostgresFixtureLifetime is the default maximum time a Postgres + // test fixture is being kept alive. After that time the docker + // container will be terminated forcefully, even if the tests aren't + // fully executed yet. So this time needs to be chosen correctly to be + // longer than the longest expected individual test run time. + DefaultPostgresFixtureLifetime = 10 * time.Minute + + // postgresSchemaReplacements is a map of schema strings that need to be + // replaced for postgres. This is needed because we write the schemas to + // work with sqlite primarily but in sqlc's own dialect, and postgres + // has some differences. + postgresSchemaReplacements = map[string]string{ + "BLOB": "BYTEA", + "INTEGER PRIMARY KEY": "BIGSERIAL PRIMARY KEY", + // We need this space in front of the TIMESTAMP keyword to + // avoid replacing words which just have the word "TIMESTAMP" in + // them. + "TIMESTAMP": " TIMESTAMP WITHOUT TIME ZONE", + "UNHEX": "DECODE", + } + + // Make sure PostgresStore implements the MigrationExecutor interface. + _ MigrationExecutor = (*PostgresStore)(nil) + + // Make sure PostgresStore implements the DB interface. + _ DB = (*PostgresStore)(nil) +) + +// replacePasswordInDSN takes a DSN string and returns it with the password +// replaced by "***". 
+func replacePasswordInDSN(dsn string) (string, error) { + // Parse the DSN as a URL + u, err := url.Parse(dsn) + if err != nil { + return "", err + } + + // Check if the URL has a user info part + if u.User != nil { + username := u.User.Username() + + // Reconstruct user info with "***" as password + userInfo := username + ":***@" + + // Rebuild the DSN with the modified user info + sanitizeDSN := strings.Replace( + dsn, u.User.String()+"@", userInfo, 1, + ) + + return sanitizeDSN, nil + } + + // Return the original DSN if no user info is present + return dsn, nil +} + +// getDatabaseNameFromDSN extracts the database name from a DSN string. +func getDatabaseNameFromDSN(dsn string) (string, error) { + // Parse the DSN as a URL + u, err := url.Parse(dsn) + if err != nil { + return "", err + } + + // The database name is the last segment of the path. Trim leading slash + // and return the last segment. + return path.Base(u.Path), nil +} + +// PostgresStore is a database store implementation that uses a Postgres +// backend. +type PostgresStore struct { + cfg *PostgresConfig + + *BaseDB +} + +// NewPostgresStore creates a new store that is backed by a Postgres database +// backend. +func NewPostgresStore(cfg *PostgresConfig) (*PostgresStore, error) { + sanitizedDSN, err := replacePasswordInDSN(cfg.Dsn) + if err != nil { + return nil, err + } + log.Infof("Using SQL database '%s'", sanitizedDSN) + + db, err := sql.Open("pgx", cfg.Dsn) + if err != nil { + return nil, err + } + + // Ensure the migration tracker table exists before running migrations. + // This table tracks migration progress and ensures compatibility with + // SQLC query generation. If the table is already created by an SQLC + // migration, this operation becomes a no-op. 
+ migrationTrackerSQL := ` + CREATE TABLE IF NOT EXISTS migration_tracker ( + version INTEGER UNIQUE NOT NULL, + migration_time TIMESTAMP NOT NULL + );` + + _, err = db.Exec(migrationTrackerSQL) + if err != nil { + return nil, fmt.Errorf("error creating migration tracker: %w", + err) + } + + maxConns := defaultMaxConns + if cfg.MaxOpenConnections > 0 { + maxConns = cfg.MaxOpenConnections + } + + maxIdleConns := defaultMaxIdleConns + if cfg.MaxIdleConnections > 0 { + maxIdleConns = cfg.MaxIdleConnections + } + + connMaxLifetime := defaultConnMaxLifetime + if cfg.ConnMaxLifetime > 0 { + connMaxLifetime = cfg.ConnMaxLifetime + } + + connMaxIdleTime := defaultConnMaxIdleTime + if cfg.ConnMaxIdleTime > 0 { + connMaxIdleTime = cfg.ConnMaxIdleTime + } + + db.SetMaxOpenConns(maxConns) + db.SetMaxIdleConns(maxIdleConns) + db.SetConnMaxLifetime(connMaxLifetime) + db.SetConnMaxIdleTime(connMaxIdleTime) + + return &PostgresStore{ + cfg: cfg, + BaseDB: &BaseDB{ + DB: db, + BackendType: BackendTypePostgres, + SkipMigrations: cfg.SkipMigrations, + }, + }, nil +} + +// GetBaseDB returns the underlying BaseDB instance for the Postgres store. +// It is a trivial helper method to comply with the sqldb.DB interface. +func (s *PostgresStore) GetBaseDB() *BaseDB { + return s.BaseDB +} + +func errPostgresMigration(err error) error { + return fmt.Errorf("error creating postgres migration: %w", err) +} + +// ExecuteMigrations runs migrations for the Postgres database using the +// default production migration target. +func (s *PostgresStore) ExecuteMigrations(set MigrationSet) error { + if s.cfg.SkipMigrations { + return nil + } + + return s.executeMigrations(TargetLatest, set) +} + +// executeMigrations runs migrations for the Postgres database, depending on +// the target given, either all migrations or up to a given version. 
+func (s *PostgresStore) executeMigrations(target MigrationTarget,
+	set MigrationSet) error {
+
+	dbName, err := getDatabaseNameFromDSN(s.cfg.Dsn)
+	if err != nil {
+		return err
+	}
+
+	driver, err := pgx_migrate.WithInstance(s.DB, &pgx_migrate.Config{
+		MigrationsTable: set.TrackingTableName,
+	})
+	if err != nil {
+		return errPostgresMigration(err)
+	}
+
+	opts := &migrateOptions{
+		latestVersion: fn.Some(set.LatestMigrationVersion),
+	}
+
+	if set.MakeProgrammaticMigrations != nil {
+		postMigSteps, err := set.MakeProgrammaticMigrations(s.BaseDB)
+		if err != nil {
+			return errPostgresMigration(err)
+		}
+		opts.programmaticMigrs = postMigSteps
+	}
+
+	// Populate the database with our set of schemas based on our embedded
+	// in-memory file system.
+	postgresFS := newReplacerFS(set.SQLFiles, postgresSchemaReplacements)
+	return applyMigrations(
+		postgresFS, driver, set.SQLFileDirectory, dbName, target, opts,
+	)
+}
+
+// GetSchemaVersion returns the current schema version of the Postgres
+// database.
+func (s *PostgresStore) GetSchemaVersion() (int, bool, error) {
+	driver, err := pgx_migrate.WithInstance(s.DB, &pgx_migrate.Config{})
+	if err != nil {
+		return 0, false, errPostgresMigration(err)
+	}
+
+	version, dirty, err := driver.Version()
+	if err != nil {
+		return 0, false, err
+	}
+
+	return version, dirty, nil
+}
+
+// SetSchemaVersion sets the schema version of the Postgres database.
+//
+// NOTE: This alters the internal database schema tracker. USE WITH CAUTION!!!
+func (s *PostgresStore) SetSchemaVersion(version int, dirty bool) error {
+	driver, err := pgx_migrate.WithInstance(s.DB, &pgx_migrate.Config{})
+	if err != nil {
+		return errPostgresMigration(err)
+	}
+
+	return driver.SetVersion(version, dirty)
+}
diff --git a/sqldb/v2/postgres_fixture.go b/sqldb/v2/postgres_fixture.go
new file mode 100644
index 0000000000..cf11154561
--- /dev/null
+++ b/sqldb/v2/postgres_fixture.go
@@ -0,0 +1,208 @@
+//go:build !js && !(windows && (arm || 386)) && !(linux && (ppc64 || mips || mipsle || mips64)) && !(netbsd || openbsd)
+
+package sqldb
+
+import (
+	"context"
+	"crypto/rand"
+	"database/sql"
+	"encoding/hex"
+	"fmt"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+
+	_ "github.com/jackc/pgx/v5/stdlib" // Registers the "pgx" sql driver.
+	"github.com/ory/dockertest/v3"
+	"github.com/ory/dockertest/v3/docker"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	testPgUser   = "test"
+	testPgPass   = "test"
+	testPgDBName = "test"
+	PostgresTag  = "15"
+)
+
+// TestPgFixture is a test fixture that starts a Postgres instance (version
+// PostgresTag) in a docker container.
+type TestPgFixture struct {
+	db       *sql.DB
+	pool     *dockertest.Pool
+	resource *dockertest.Resource
+	host     string
+	port     int
+}
+
+// NewTestPgFixture constructs a new TestPgFixture starting up a docker
+// container running Postgres 15. The started container will expire after
+// the passed duration.
+func NewTestPgFixture(t testing.TB, expiry time.Duration) *TestPgFixture {
+	// Use a sensible default on Windows (tcp/http) and linux/osx (socket)
+	// by specifying an empty endpoint.
+	pool, err := dockertest.NewPool("")
+	require.NoError(t, err, "Could not connect to docker")
+
+	// Create a predictable container name.
+	containerName := sanitizeDockerName(t.Name() + "-postgresql-container")
+
+	// Pulls an image, creates a container based on it and runs it.
+ resource, err := pool.RunWithOptions(&dockertest.RunOptions{ + Name: containerName, + Repository: "postgres", + Tag: PostgresTag, + Env: []string{ + fmt.Sprintf("POSTGRES_USER=%v", testPgUser), + fmt.Sprintf("POSTGRES_PASSWORD=%v", testPgPass), + fmt.Sprintf("POSTGRES_DB=%v", testPgDBName), + "listen_addresses='*'", + }, + Cmd: []string{ + "postgres", + "-c", "log_statement=all", + "-c", "log_destination=stderr", + "-c", "max_connections=5000", + }, + }, func(config *docker.HostConfig) { + // Set AutoRemove to true so that stopped container goes away + // by itself. + config.AutoRemove = true + config.RestartPolicy = docker.RestartPolicy{Name: "no"} + }) + require.NoError(t, err, "Could not start resource") + + hostAndPort := resource.GetHostPort("5432/tcp") + parts := strings.Split(hostAndPort, ":") + host := parts[0] + port, err := strconv.ParseInt(parts[1], 10, 64) + require.NoError(t, err) + + fixture := &TestPgFixture{ + host: host, + port: int(port), + } + databaseURL := fixture.GetConfig(testPgDBName).Dsn + log.Infof("Connecting to Postgres fixture: %v\n", databaseURL) + + // Tell docker to hard kill the container in "expiry" seconds. + require.NoError(t, resource.Expire(uint(expiry.Seconds()))) + + // Exponential backoff-retry, because the application in the container + // might not be ready to accept connections yet. + pool.MaxWait = 120 * time.Second + + var testDB *sql.DB + err = pool.Retry(func() error { + testDB, err = sql.Open("pgx", databaseURL) + if err != nil { + return err + } + + return testDB.Ping() + }) + require.NoError(t, err, "Could not connect to docker") + + // Now fill in the rest of the fixture. + fixture.db = testDB + fixture.pool = pool + fixture.resource = resource + + return fixture +} + +// sanitizeDockerName returns a Docker-safe container name by replacing +// disallowed path separators ("/") with underscores. 
+func sanitizeDockerName(name string) string { + return strings.ReplaceAll(name, "/", "_") +} + +// GetConfig returns the full config of the Postgres node. +func (f *TestPgFixture) GetConfig(dbName string) *PostgresConfig { + return &PostgresConfig{ + Dsn: fmt.Sprintf( + "postgres://%v:%v@%v:%v/%v?sslmode=disable", + testPgUser, testPgPass, f.host, f.port, dbName, + ), + } +} + +// TearDown stops the underlying docker container. +func (f *TestPgFixture) TearDown(t testing.TB) { + err := f.pool.Purge(f.resource) + require.NoError(t, err, "Could not purge resource") +} + +func (f *TestPgFixture) DB() *sql.DB { + return f.db +} + +// RandomDBName generates a random database name. +func RandomDBName(t testing.TB) string { + randBytes := make([]byte, 8) + _, err := rand.Read(randBytes) + require.NoError(t, err) + + return "test_" + hex.EncodeToString(randBytes) +} + +// NewTestPostgresDB is a helper function that creates a Postgres database for +// testing using the given fixture. +func NewTestPostgresDB(t testing.TB, fixture *TestPgFixture, + sets []MigrationSet) *PostgresStore { + + t.Helper() + + dbName := RandomDBName(t) + + t.Logf("Creating new Postgres DB '%s' for testing", dbName) + + _, err := fixture.db.ExecContext( + context.Background(), "CREATE DATABASE "+dbName, + ) + require.NoError(t, err) + + cfg := fixture.GetConfig(dbName) + store, err := NewPostgresStore(cfg) + require.NoError(t, err) + + require.NoError(t, ApplyAllMigrations(store, sets)) + + t.Cleanup(func() { + require.NoError(t, store.DB.Close()) + }) + + return store +} + +// NewTestPostgresDBWithVersion is a helper function that creates a Postgres +// database for testing and migrates it to the given version. 
+func NewTestPostgresDBWithVersion(t testing.TB, fixture *TestPgFixture, + sets MigrationSet, version uint) *PostgresStore { + + t.Helper() + + t.Logf("Creating new Postgres DB for testing, migrating to version %d", + version) + + dbName := RandomDBName(t) + _, err := fixture.db.ExecContext( + context.Background(), "CREATE DATABASE "+dbName, + ) + require.NoError(t, err) + + storeCfg := fixture.GetConfig(dbName) + storeCfg.SkipMigrations = true + store, err := NewPostgresStore(storeCfg) + require.NoError(t, err) + + err = store.executeMigrations(TargetVersion(version), sets) + require.NoError(t, err) + + t.Cleanup(func() { + require.NoError(t, store.DB.Close()) + }) + + return store +} diff --git a/sqldb/v2/sqlerrors.go b/sqldb/v2/sqlerrors.go new file mode 100644 index 0000000000..4c5ac7d1a8 --- /dev/null +++ b/sqldb/v2/sqlerrors.go @@ -0,0 +1,247 @@ +//go:build !js && !(windows && (arm || 386)) && !(linux && (ppc64 || mips || mipsle || mips64)) + +package sqldb + +import ( + "errors" + "fmt" + "strings" + + "github.com/jackc/pgconn" + "github.com/jackc/pgerrcode" + "modernc.org/sqlite" + sqlite3 "modernc.org/sqlite/lib" +) + +var ( + // ErrTxRetriesExceeded is returned when a transaction is retried more + // than the max allowed valued without a success. + ErrTxRetriesExceeded = errors.New("db tx retries exceeded") + + // postgresRetriableErrMsgs are strings that signify retriable errors + // resulting from serialization failures. + postgresRetriableErrMsgs = []string{ + "could not serialize access", + "current transaction is aborted", + "not enough elements in RWConflictPool", + "deadlock detected", + "commit unexpectedly resulted in rollback", + } +) + +// MapSQLError attempts to interpret a given error as a database agnostic SQL +// error. +func MapSQLError(err error) error { + if err == nil { + return nil + } + + // Attempt to interpret the error as a sqlite error. 
+	var sqliteErr *sqlite.Error
+	if errors.As(err, &sqliteErr) {
+		return parseSqliteError(sqliteErr)
+	}
+
+	// Attempt to interpret the error as a postgres error.
+	var pqErr *pgconn.PgError
+	if errors.As(err, &pqErr) {
+		return parsePostgresError(pqErr)
+	}
+
+	// Sometimes the error won't be properly wrapped, so we'll need to
+	// inspect raw error itself to detect something we can wrap properly.
+	// This handles a postgres variant of the error.
+	for _, postgresErrMsg := range postgresRetriableErrMsgs {
+		if strings.Contains(err.Error(), postgresErrMsg) {
+			return &ErrSerializationError{
+				DBError: err,
+			}
+		}
+	}
+
+	// We'll also attempt to catch this for sqlite, that uses a slightly
+	// different error message. This is taken from:
+	// https://gitlab.com/cznic/sqlite/-/blob/v1.25.0/sqlite.go#L75.
+	const sqliteErrMsg = "SQLITE_BUSY"
+	if strings.Contains(err.Error(), sqliteErrMsg) {
+		return &ErrSerializationError{
+			DBError: err,
+		}
+	}
+
+	// Return original error if it could not be classified as a database
+	// specific error.
+	return err
+}
+
+// parseSqliteError attempts to parse a sqlite error as a database agnostic
+// SQL error.
+func parseSqliteError(sqliteErr *sqlite.Error) error {
+	switch sqliteErr.Code() {
+	// Handle unique constraint violation error.
+	case sqlite3.SQLITE_CONSTRAINT_UNIQUE:
+		return &ErrSQLUniqueConstraintViolation{
+			DBError: sqliteErr,
+		}
+
+	// A primary key constraint violation is also a unique constraint
+	// violation; sqlite just reports it with a distinct error code.
+	case sqlite3.SQLITE_CONSTRAINT_PRIMARYKEY:
+		return &ErrSQLUniqueConstraintViolation{
+			DBError: sqliteErr,
+		}
+
+	// Database is currently busy, so we'll need to try again.
+	case sqlite3.SQLITE_BUSY:
+		return &ErrSerializationError{
+			DBError: sqliteErr,
+		}
+
+	// A write operation could not continue because of a conflict within the
+	// same database connection.
+	case sqlite3.SQLITE_LOCKED, sqlite3.SQLITE_BUSY_SNAPSHOT:
+		return &ErrDeadlockError{
+			DbError: sqliteErr,
+		}
+
+	// Generic error, need to parse the message further.
+	case sqlite3.SQLITE_ERROR:
+		errMsg := sqliteErr.Error()
+
+		switch {
+		case strings.Contains(errMsg, "no such table"):
+			return &ErrSchemaError{
+				DbError: sqliteErr,
+			}
+
+		default:
+			return fmt.Errorf("unknown sqlite error: %w", sqliteErr)
+		}
+
+	default:
+		return fmt.Errorf("unknown sqlite error: %w", sqliteErr)
+	}
+}
+
+// parsePostgresError attempts to parse a postgres error as a database agnostic
+// SQL error.
+func parsePostgresError(pqErr *pgconn.PgError) error {
+	switch pqErr.Code {
+	// Handle unique constraint violation error.
+	case pgerrcode.UniqueViolation:
+		return &ErrSQLUniqueConstraintViolation{
+			DBError: pqErr,
+		}
+
+	// Unable to serialize the transaction, so we'll need to try again.
+	case pgerrcode.SerializationFailure:
+		return &ErrSerializationError{
+			DBError: pqErr,
+		}
+
+	// In failed SQL transaction because we didn't catch a previous
+	// serialization error, so return this one as a serialization error.
+	case pgerrcode.InFailedSQLTransaction:
+		return &ErrSerializationError{
+			DBError: pqErr,
+		}
+
+	// Deadlock detected because of a serialization error, so return this
+	// one as a serialization error.
+	case pgerrcode.DeadlockDetected:
+		return &ErrSerializationError{
+			DBError: pqErr,
+		}
+
+	// Handle schema error.
+	case pgerrcode.UndefinedColumn, pgerrcode.UndefinedTable:
+		return &ErrSchemaError{
+			DbError: pqErr,
+		}
+
+	default:
+		return fmt.Errorf("unknown postgres error: %w", pqErr)
+	}
+}
+
+// ErrSQLUniqueConstraintViolation is an error type which represents a database
+// agnostic SQL unique constraint violation.
+type ErrSQLUniqueConstraintViolation struct {
+	DBError error
+}
+
+// Error returns the error message.
+func (e ErrSQLUniqueConstraintViolation) Error() string {
+	return fmt.Sprintf("sql unique constraint violation: %v", e.DBError)
+}
+
+// ErrSerializationError is an error type which represents a database agnostic
+// error that a transaction couldn't be serialized with other concurrent db
+// transactions.
+type ErrSerializationError struct { + DBError error +} + +// Unwrap returns the wrapped error. +func (e ErrSerializationError) Unwrap() error { + return e.DBError +} + +// Error returns the error message. +func (e ErrSerializationError) Error() string { + return e.DBError.Error() +} + +// IsSerializationError returns true if the given error is a serialization +// error. +func IsSerializationError(err error) bool { + var serializationError *ErrSerializationError + return errors.As(err, &serializationError) +} + +// ErrDeadlockError is an error type which represents a database agnostic +// error where transactions have led to cyclic dependencies in lock acquisition. +type ErrDeadlockError struct { + DbError error +} + +// Unwrap returns the wrapped error. +func (e ErrDeadlockError) Unwrap() error { + return e.DbError +} + +// Error returns the error message. +func (e ErrDeadlockError) Error() string { + return e.DbError.Error() +} + +// IsDeadlockError returns true if the given error is a deadlock error. +func IsDeadlockError(err error) bool { + var deadlockError *ErrDeadlockError + return errors.As(err, &deadlockError) +} + +// IsSerializationOrDeadlockError returns true if the given error is either a +// deadlock error or a serialization error. +func IsSerializationOrDeadlockError(err error) bool { + return IsDeadlockError(err) || IsSerializationError(err) +} + +// ErrSchemaError is an error type which represents a database agnostic error +// that the schema of the database is incorrect for the given query. +type ErrSchemaError struct { + DbError error +} + +// Unwrap returns the wrapped error. +func (e ErrSchemaError) Unwrap() error { + return e.DbError +} + +// Error returns the error message. +func (e ErrSchemaError) Error() string { + return e.DbError.Error() +} + +// IsSchemaError returns true if the given error is a schema error. 
+func IsSchemaError(err error) bool { + var schemaError *ErrSchemaError + return errors.As(err, &schemaError) +} diff --git a/sqldb/v2/sqlerrors_no_sqlite.go b/sqldb/v2/sqlerrors_no_sqlite.go new file mode 100644 index 0000000000..ae79b7f87f --- /dev/null +++ b/sqldb/v2/sqlerrors_no_sqlite.go @@ -0,0 +1,86 @@ +//go:build js || (windows && (arm || 386)) || (linux && (ppc64 || mips || mipsle || mips64)) + +package sqldb + +import ( + "errors" + "fmt" + + "github.com/jackc/pgconn" + "github.com/jackc/pgerrcode" +) + +var ( + // ErrTxRetriesExceeded is returned when a transaction is retried more + // than the max allowed valued without a success. + ErrTxRetriesExceeded = errors.New("db tx retries exceeded") +) + +// MapSQLError attempts to interpret a given error as a database agnostic SQL +// error. +func MapSQLError(err error) error { + // Attempt to interpret the error as a postgres error. + var pqErr *pgconn.PgError + if errors.As(err, &pqErr) { + return parsePostgresError(pqErr) + } + + // Return original error if it could not be classified as a database + // specific error. + return err +} + +// parsePostgresError attempts to parse a postgres error as a database agnostic +// SQL error. +func parsePostgresError(pqErr *pgconn.PgError) error { + switch pqErr.Code { + // Handle unique constraint violation error. + case pgerrcode.UniqueViolation: + return &ErrSQLUniqueConstraintViolation{ + DBError: pqErr, + } + + // Unable to serialize the transaction, so we'll need to try again. + case pgerrcode.SerializationFailure: + return &ErrSerializationError{ + DBError: pqErr, + } + + default: + return fmt.Errorf("unknown postgres error: %w", pqErr) + } +} + +// ErrSQLUniqueConstraintViolation is an error type which represents a database +// agnostic SQL unique constraint violation. 
+type ErrSQLUniqueConstraintViolation struct { + DBError error +} + +func (e ErrSQLUniqueConstraintViolation) Error() string { + return fmt.Sprintf("sql unique constraint violation: %v", e.DBError) +} + +// ErrSerializationError is an error type which represents a database agnostic +// error that a transaction couldn't be serialized with other concurrent db +// transactions. +type ErrSerializationError struct { + DBError error +} + +// Unwrap returns the wrapped error. +func (e ErrSerializationError) Unwrap() error { + return e.DBError +} + +// Error returns the error message. +func (e ErrSerializationError) Error() string { + return e.DBError.Error() +} + +// IsSerializationError returns true if the given error is a serialization +// error. +func IsSerializationError(err error) bool { + var serializationError *ErrSerializationError + return errors.As(err, &serializationError) +} diff --git a/sqldb/v2/sqlite.go b/sqldb/v2/sqlite.go new file mode 100644 index 0000000000..b988a728b1 --- /dev/null +++ b/sqldb/v2/sqlite.go @@ -0,0 +1,327 @@ +//go:build !js && !(windows && (arm || 386)) && !(linux && (ppc64 || mips || mipsle || mips64)) + +package sqldb + +import ( + "database/sql" + "fmt" + "net/url" + "time" + + "github.com/golang-migrate/migrate/v4" + sqlite_migrate "github.com/golang-migrate/migrate/v4/database/sqlite" + "github.com/lightningnetwork/lnd/fn/v2" + _ "modernc.org/sqlite" // Register relevant drivers. +) + +const ( + // sqliteOptionPrefix is the string prefix sqlite uses to set various + // options. This is used in the following format: + // * sqliteOptionPrefix || option_name = option_value. + sqliteOptionPrefix = "_pragma" + + // sqliteTxLockImmediate is a dsn option used to ensure that write + // transactions are started immediately. + sqliteTxLockImmediate = "_txlock=immediate" +) + +var ( + // sqliteSchemaReplacements maps schema strings to their SQLite + // compatible replacements. 
Currently, no replacements are needed as our + // SQL schema definition files are designed for SQLite compatibility. + sqliteSchemaReplacements = map[string]string{} + + // Make sure SqliteStore implements the MigrationExecutor interface. + _ MigrationExecutor = (*SqliteStore)(nil) + + // Make sure SqliteStore implements the DB interface. + _ DB = (*SqliteStore)(nil) +) + +// pragmaOption holds a key-value pair for a SQLite pragma setting. +type pragmaOption struct { + name string + value string +} + +// SqliteStore is a database store implementation that uses a sqlite backend. +type SqliteStore struct { + DbPath string + + Config *SqliteConfig + + *BaseDB +} + +// NewSqliteStore attempts to open a new sqlite database based on the passed +// config. +func NewSqliteStore(cfg *SqliteConfig, dbPath string) (*SqliteStore, error) { + // The set of pragma options are accepted using query options. For now + // we only want to ensure that foreign key constraints are properly + // enforced. + pragmaOptions := []pragmaOption{ + { + name: "foreign_keys", + value: "on", + }, + { + name: "journal_mode", + value: "WAL", + }, + { + name: "busy_timeout", + value: fmt.Sprintf("%d", cfg.busyTimeoutMs()), + }, + { + // With the WAL mode, this ensures that we also do an + // extra WAL sync after each transaction. The normal + // sync mode skips this and gives better performance, + // but risks durability. + name: "synchronous", + value: "full", + }, + { + // This is used to ensure proper durability for users + // running on Mac OS. It uses the correct fsync system + // call to ensure items are fully flushed to disk. + name: "fullfsync", + value: "true", + }, + { + name: "auto_vacuum", + value: "incremental", + }, + } + sqliteOptions := make(url.Values) + for _, option := range pragmaOptions { + sqliteOptions.Add( + sqliteOptionPrefix, + fmt.Sprintf("%v=%v", option.name, option.value), + ) + } + + // Then we add any user specified pragma options. 
Note that these can + // be of the form: "key=value", "key(N)" or "key". + for _, option := range cfg.PragmaOptions { + sqliteOptions.Add(sqliteOptionPrefix, option) + } + + // Construct the DSN which is just the database file name, appended + // with the series of pragma options as a query URL string. For more + // details on the formatting here, see the modernc.org/sqlite docs: + // https://pkg.go.dev/modernc.org/sqlite#Driver.Open. + dsn := fmt.Sprintf( + "%v?%v&%v", dbPath, sqliteOptions.Encode(), + sqliteTxLockImmediate, + ) + db, err := sql.Open("sqlite", dsn) + if err != nil { + return nil, err + } + + // Create the migration tracker table before starting migrations to + // ensure it can be used to track migration progress. Note that a + // corresponding SQLC migration also creates this table, making this + // operation a no-op in that context. Its purpose is to ensure + // compatibility with SQLC query generation. + migrationTrackerSQL := ` + CREATE TABLE IF NOT EXISTS migration_tracker ( + version INTEGER UNIQUE NOT NULL, + migration_time TIMESTAMP NOT NULL + );` + + _, err = db.Exec(migrationTrackerSQL) + if err != nil { + return nil, fmt.Errorf("error creating migration tracker: %w", + err) + } + + maxConns := defaultMaxConns + if cfg.MaxConnections > 0 { + maxConns = cfg.MaxConnections + } + + maxIdleConns := defaultMaxIdleConns + if cfg.MaxIdleConnections > 0 { + maxIdleConns = cfg.MaxIdleConnections + } + + connMaxLifetime := defaultConnMaxLifetime + if cfg.ConnMaxLifetime > 0 { + connMaxLifetime = cfg.ConnMaxLifetime + } + + db.SetMaxOpenConns(maxConns) + db.SetMaxIdleConns(maxIdleConns) + db.SetConnMaxLifetime(connMaxLifetime) + + s := &SqliteStore{ + Config: cfg, + DbPath: dbPath, + BaseDB: &BaseDB{ + DB: db, + BackendType: BackendTypeSqlite, + SkipMigrations: cfg.SkipMigrations, + }, + } + + return s, nil +} + +// GetBaseDB returns the underlying BaseDB instance for the SQLite store. 
+// It is a trivial helper method to comply with the sqldb.DB interface. +func (s *SqliteStore) GetBaseDB() *BaseDB { + return s.BaseDB +} + +func errSqliteMigration(err error) error { + return fmt.Errorf("error creating sqlite migration: %w", err) +} + +// backupSqliteDatabase creates a backup of the given SQLite database. +func backupSqliteDatabase(srcDB *sql.DB, dbFullFilePath string) error { + if srcDB == nil { + return fmt.Errorf("backup source database is nil") + } + + // Create a database backup file full path from the given source + // database full file path. + // + // Get the current time and format it as a Unix timestamp in + // nanoseconds. + timestamp := time.Now().UnixNano() + + // Add the timestamp to the backup name. + backupFullFilePath := fmt.Sprintf( + "%s.%d.backup", dbFullFilePath, timestamp, + ) + + log.Infof("Creating backup of database file: %v -> %v", + dbFullFilePath, backupFullFilePath) + + // Create the database backup. + vacuumIntoQuery := "VACUUM INTO ?;" + stmt, err := srcDB.Prepare(vacuumIntoQuery) + if err != nil { + return err + } + defer stmt.Close() + + _, err = stmt.Exec(backupFullFilePath) + if err != nil { + return err + } + + return nil +} + +// backupAndMigrate is a helper function that creates a database backup before +// initiating the migration, and then migrates the database to the latest +// version. +func (s *SqliteStore) backupAndMigrate(mig *migrate.Migrate, + currentDbVersion int, maxMigrationVersion uint) error { + + // Determine if a database migration is necessary given the current + // database version and the maximum migration version. 
+ versionUpgradePending := currentDbVersion < int(maxMigrationVersion) + if !versionUpgradePending { + log.Infof("Current database version is up-to-date, skipping "+ + "migration attempt and backup creation "+ + "(current_db_version=%v, max_migration_version=%v)", + currentDbVersion, maxMigrationVersion) + return nil + } + + // At this point, we know that a database migration is necessary. + // Create a backup of the database before starting the migration. + if !s.Config.SkipMigrationDbBackup { + log.Infof("Creating database backup (before applying " + + "migration(s))") + + err := backupSqliteDatabase(s.DB, s.DbPath) + if err != nil { + return err + } + } else { + log.Infof("Skipping database backup creation before applying " + + "migration(s)") + } + + log.Infof("Applying migrations to database") + return mig.Up() +} + +// ExecuteMigrations runs migrations for the sqlite database using the default +// production migration target. +func (s *SqliteStore) ExecuteMigrations(set MigrationSet) error { + if s.Config.SkipMigrations { + return nil + } + + return s.executeMigrations(s.backupAndMigrate, set) +} + +// executeMigrations runs migrations for the sqlite database, depending on the +// target given, either all migrations or up to a given version. +func (s *SqliteStore) executeMigrations(target MigrationTarget, + set MigrationSet) error { + + driver, err := sqlite_migrate.WithInstance( + s.DB, &sqlite_migrate.Config{ + MigrationsTable: set.TrackingTableName, + }, + ) + if err != nil { + return errSqliteMigration(err) + } + + opts := &migrateOptions{ + latestVersion: fn.Some(set.LatestMigrationVersion), + } + + if set.MakeProgrammaticMigrations != nil { + postMigSteps, err := set.MakeProgrammaticMigrations(s.BaseDB) + if err != nil { + return errPostgresMigration(err) + } + opts.programmaticMigrs = postMigSteps + } + + // Populate the database with our set of schemas based on our embedded + // in-memory file system. 
+ sqliteFS := newReplacerFS(set.SQLFiles, sqliteSchemaReplacements) + return applyMigrations( + sqliteFS, driver, set.SQLFileDirectory, "sqlite", target, opts, + ) +} + +// GetSchemaVersion returns the current schema version of the SQLite database. +func (s *SqliteStore) GetSchemaVersion() (int, bool, error) { + driver, err := sqlite_migrate.WithInstance( + s.DB, &sqlite_migrate.Config{}, + ) + if err != nil { + return 0, false, errSqliteMigration(err) + } + + version, dirty, err := driver.Version() + if err != nil { + return 0, dirty, err + } + + return version, dirty, nil +} + +// SetSchemaVersion sets the schema version of the SQLite database. +// +// NOTE: This alters the internal database schema tracker. USE WITH CAUTION!!! +func (s *SqliteStore) SetSchemaVersion(version int, dirty bool) error { + driver, err := sqlite_migrate.WithInstance( + s.DB, &sqlite_migrate.Config{}, + ) + if err != nil { + return errSqliteMigration(err) + } + + return driver.SetVersion(version, dirty) +} diff --git a/sqldb/v2/sqlite_test_utils.go b/sqldb/v2/sqlite_test_utils.go new file mode 100644 index 0000000000..03ef833fe3 --- /dev/null +++ b/sqldb/v2/sqlite_test_utils.go @@ -0,0 +1,86 @@ +//go:build !js && !(windows && (arm || 386)) && !(linux && (ppc64 || mips || mipsle || mips64)) + +package sqldb + +import ( + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + _ "modernc.org/sqlite" // Register relevant drivers. +) + +// NewTestSqliteDB is a helper function that creates an SQLite database for +// testing. 
+func NewTestSqliteDB(t testing.TB, sets []MigrationSet) *SqliteStore { + t.Helper() + + t.Logf("Creating new SQLite DB for testing") + + // TODO(roasbeef): if we pass :memory: for the file name, then we get + // an in mem version to speed up tests + dbFileName := filepath.Join(t.TempDir(), "tmp.db") + sqlDB, err := NewSqliteStore(&SqliteConfig{ + SkipMigrations: false, + }, dbFileName) + require.NoError(t, err) + + require.NoError(t, ApplyAllMigrations(sqlDB, sets)) + + t.Cleanup(func() { + require.NoError(t, sqlDB.DB.Close()) + }) + + return sqlDB +} + +// NewTestSqliteDBFromPath is a helper function that creates a SQLite database +// for testing from a given database file path. +func NewTestSqliteDBFromPath(t *testing.T, dbPath string, + sets []MigrationSet) *SqliteStore { + + t.Helper() + + t.Logf("Creating new SQLite DB for testing, using DB path %s", dbPath) + + sqlDB, err := NewSqliteStore(&SqliteConfig{ + SkipMigrations: false, + }, dbPath) + require.NoError(t, err) + + require.NoError(t, ApplyAllMigrations(sqlDB, sets)) + + t.Cleanup(func() { + require.NoError(t, sqlDB.DB.Close()) + }) + + return sqlDB +} + +// NewTestSqliteDBWithVersion is a helper function that creates an SQLite +// database for testing and migrates it to the given version. 
+func NewTestSqliteDBWithVersion(t *testing.T, set MigrationSet, + version uint) *SqliteStore { + + t.Helper() + + t.Logf("Creating new SQLite DB for testing, migrating to version %d", + version) + + // TODO(roasbeef): if we pass :memory: for the file name, then we get + // an in mem version to speed up tests + dbFileName := filepath.Join(t.TempDir(), "tmp.db") + sqlDB, err := NewSqliteStore(&SqliteConfig{ + SkipMigrations: true, + }, dbFileName) + require.NoError(t, err) + + err = sqlDB.executeMigrations(TargetVersion(version), set) + require.NoError(t, err) + + t.Cleanup(func() { + require.NoError(t, sqlDB.DB.Close()) + }) + + return sqlDB +} diff --git a/sqldb/v2/sqlutils.go b/sqldb/v2/sqlutils.go new file mode 100644 index 0000000000..c4bcd0e8ce --- /dev/null +++ b/sqldb/v2/sqlutils.go @@ -0,0 +1,192 @@ +package sqldb + +import ( + "database/sql" + "time" + + "github.com/lightningnetwork/lnd/fn/v2" + "golang.org/x/exp/constraints" +) + +var ( + // MaxValidSQLTime is the maximum valid time that can be rendered as a + // time string and can be used for comparisons in SQL. + MaxValidSQLTime = time.Date(9999, 12, 31, 23, 59, 59, 999999, time.UTC) +) + +// NoOpReset is a no-op function that can be used as a default +// reset function ExecTx calls. +var NoOpReset = func() {} + +// SQLInt16 turns a numerical integer type into the NullInt16 that sql/sqlc +// uses when an integer field can be permitted to be NULL. +// +// We use this constraints.Integer constraint here which maps to all signed and +// unsigned integer types. +func SQLInt16[T constraints.Integer](num T) sql.NullInt16 { + return sql.NullInt16{ + Int16: int16(num), + Valid: true, + } +} + +// SQLInt32 turns a numerical integer type into the NullInt32 that sql/sqlc +// uses when an integer field can be permitted to be NULL. +// +// We use this constraints.Integer constraint here which maps to all signed and +// unsigned integer types. 
+func SQLInt32[T constraints.Integer](num T) sql.NullInt32 { + return sql.NullInt32{ + Int32: int32(num), + Valid: true, + } +} + +// SQLPtrInt32 turns a pointer to a numerical integer type into the NullInt32 +// that sql/sqlc uses. +func SQLPtrInt32[T constraints.Integer](num *T) sql.NullInt32 { + if num == nil { + return sql.NullInt32{} + } + return sql.NullInt32{ + Int32: int32(*num), + Valid: true, + } +} + +// SqlOptInt32 turns an option of a numerical integer type into the NullInt32 +// that sql/sqlc uses when an integer field can be permitted to be NULL. +func SqlOptInt32[T constraints.Integer](num fn.Option[T]) sql.NullInt32 { + return fn.MapOptionZ(num, func(num T) sql.NullInt32 { + return sql.NullInt32{ + Int32: int32(num), + Valid: true, + } + }) +} + +// SQLInt64 turns a numerical integer type into the NullInt64 that sql/sqlc +// uses when an integer field can be permitted to be NULL. +// +// We use this constraints.Integer constraint here which maps to all signed and +// unsigned integer types. +func SQLInt64[T constraints.Integer](num T) sql.NullInt64 { + return sql.NullInt64{ + Int64: int64(num), + Valid: true, + } +} + +// SQLPtrInt64 turns a pointer to a numerical integer type into the NullInt64 +// that sql/sqlc uses. +func SQLPtrInt64[T constraints.Integer](num *T) sql.NullInt64 { + if num == nil { + return sql.NullInt64{} + } + return sql.NullInt64{ + Int64: int64(*num), + Valid: true, + } +} + +// SqlBool turns a boolean into the NullBool that sql/sqlc uses when a boolean +// field can be permitted to be NULL. +func SqlBool(b bool) sql.NullBool { + return sql.NullBool{ + Bool: b, + Valid: true, + } +} + +// SQLStr turns a string into the NullString that sql/sqlc uses when a string +// can be permitted to be NULL. +// +// NOTE: If the input string is empty, it returns a NullString with Valid set to +// false. If this is not the desired behavior, consider using SQLStrValid +// instead. 
+func SQLStr(s string) sql.NullString { + if s == "" { + return sql.NullString{} + } + + return sql.NullString{ + String: s, + Valid: true, + } +} + +// SQLStrValid turns a string into the NullString that sql/sqlc uses when a +// string can be permitted to be NULL. +// +// NOTE: Valid is always set to true, even if the input string is empty. +func SQLStrValid(s string) sql.NullString { + return sql.NullString{ + String: s, + Valid: true, + } +} + +// SQLTime turns a time.Time into the NullTime that sql/sqlc uses when a time +// can be permitted to be NULL. +func SQLTime(t time.Time) sql.NullTime { + return sql.NullTime{ + Time: t, + Valid: true, + } +} + +// ExtractSqlInt64 turns a NullInt64 into a numerical type. This can be useful +// when reading directly from the database, as this function handles extracting +// the inner value from the "option"-like struct. +func ExtractSqlInt64[T constraints.Integer](num sql.NullInt64) T { + return T(num.Int64) +} + +// ExtractSqlInt64Ptr turns a NullInt64 into a pointer to a numerical type. +func ExtractSqlInt64Ptr[T constraints.Integer](num sql.NullInt64) *T { + if !num.Valid { + return nil + } + val := T(num.Int64) + return &val +} + +// ExtractSqlInt32 turns a NullInt32 into a numerical type. This can be useful +// when reading directly from the database, as this function handles extracting +// the inner value from the "option"-like struct. +func ExtractSqlInt32[T constraints.Integer](num sql.NullInt32) T { + return T(num.Int32) +} + +// ExtractSqlInt32Ptr turns a NullInt32 into a pointer to a numerical type. +func ExtractSqlInt32Ptr[T constraints.Integer](num sql.NullInt32) *T { + if !num.Valid { + return nil + } + val := T(num.Int32) + return &val +} + +// ExtractOptSqlInt32 turns a NullInt32 into an option of a numerical type. 
+func ExtractOptSqlInt32[T constraints.Integer](num sql.NullInt32) fn.Option[T] { + if !num.Valid { + return fn.None[T]() + } + + result := T(num.Int32) + return fn.Some(result) +} + +// ExtractSqlInt16 turns a NullInt16 into a numerical type. This can be useful +// when reading directly from the database, as this function handles extracting +// the inner value from the "option"-like struct. +func ExtractSqlInt16[T constraints.Integer](num sql.NullInt16) T { + return T(num.Int16) +} + +// ExtractBool turns a NullBool into a boolean. This can be useful when reading +// directly from the database, as this function handles extracting the inner +// value from the "option"-like struct. +func ExtractBool(b sql.NullBool) bool { + return b.Bool +} diff --git a/sqldb/v2/test_postgres.go b/sqldb/v2/test_postgres.go new file mode 100644 index 0000000000..6557512813 --- /dev/null +++ b/sqldb/v2/test_postgres.go @@ -0,0 +1,30 @@ +//go:build test_db_postgres + +package sqldb + +import ( + "testing" +) + +// NewTestDB is a helper function that creates a Postgres database for testing. +func NewTestDB(t *testing.T, sets []MigrationSet) *PostgresStore { + pgFixture := NewTestPgFixture(t, DefaultPostgresFixtureLifetime) + t.Cleanup(func() { + pgFixture.TearDown(t) + }) + + return NewTestPostgresDB(t, pgFixture, sets) +} + +// NewTestDBWithVersion is a helper function that creates a Postgres database +// for testing and migrates it to the given version. 
+func NewTestDBWithVersion(t *testing.T, version uint, + set MigrationSet) *PostgresStore { + + pgFixture := NewTestPgFixture(t, DefaultPostgresFixtureLifetime) + t.Cleanup(func() { + pgFixture.TearDown(t) + }) + + return NewTestPostgresDBWithVersion(t, pgFixture, set, version) +} diff --git a/sqldb/v2/test_sqlite.go b/sqldb/v2/test_sqlite.go new file mode 100644 index 0000000000..6105080905 --- /dev/null +++ b/sqldb/v2/test_sqlite.go @@ -0,0 +1,20 @@ +//go:build !test_db_postgres + +package sqldb + +import ( + "testing" +) + +// NewTestDB is a helper function that creates an SQLite database for testing. +func NewTestDB(t *testing.T, sets []MigrationSet) *SqliteStore { + return NewTestSqliteDB(t, sets) +} + +// NewTestDBWithVersion is a helper function that creates an SQLite database +// for testing and migrates it to the given version. +func NewTestDBWithVersion(t *testing.T, set MigrationSet, + version uint) *SqliteStore { + + return NewTestSqliteDBWithVersion(t, set, version) +}