From 39a490cd579bd422215dc45a5129fd085cd25142 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 25 Oct 2023 14:01:36 +0800 Subject: [PATCH 1/6] sweep: rename `NotifyPublishTx` to `StoreTx` To properly reflect what the method really does. We also changes the method signature so only a hash is used. --- sweep/store.go | 12 +++++------- sweep/store_mock.go | 8 +++----- sweep/store_test.go | 4 ++-- sweep/sweeper.go | 4 ++-- 4 files changed, 12 insertions(+), 16 deletions(-) diff --git a/sweep/store.go b/sweep/store.go index 916d2fa54fd..72d7853a4e4 100644 --- a/sweep/store.go +++ b/sweep/store.go @@ -39,8 +39,8 @@ type SweeperStore interface { // hash. IsOurTx(hash chainhash.Hash) (bool, error) - // NotifyPublishTx signals that we are about to publish a tx. - NotifyPublishTx(*wire.MsgTx) error + // StoreTx stores a tx hash we are about to publish. + StoreTx(chainhash.Hash) error // ListSweeps lists all the sweeps we have successfully published. ListSweeps() ([]chainhash.Hash, error) @@ -147,8 +147,8 @@ func migrateTxHashes(tx kvdb.RwTx, txHashesBucket kvdb.RwBucket, return nil } -// NotifyPublishTx signals that we are about to publish a tx. -func (s *sweeperStore) NotifyPublishTx(sweepTx *wire.MsgTx) error { +// StoreTx stores that we are about to publish a tx. +func (s *sweeperStore) StoreTx(txid chainhash.Hash) error { return kvdb.Update(s.db, func(tx kvdb.RwTx) error { txHashesBucket := tx.ReadWriteBucket(txHashesBucketKey) @@ -156,9 +156,7 @@ func (s *sweeperStore) NotifyPublishTx(sweepTx *wire.MsgTx) error { return errNoTxHashesBucket } - hash := sweepTx.TxHash() - - return txHashesBucket.Put(hash[:], []byte{}) + return txHashesBucket.Put(txid[:], []byte{}) }, func() {}) } diff --git a/sweep/store_mock.go b/sweep/store_mock.go index 53d9080d8b3..c8b9652f446 100644 --- a/sweep/store_mock.go +++ b/sweep/store_mock.go @@ -2,7 +2,6 @@ package sweep import ( "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/btcsuite/btcd/wire" ) // MockSweeperStore is a mock implementation of sweeper store. This type is @@ -25,10 +24,9 @@ func (s *MockSweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) { return ok, nil } -// NotifyPublishTx signals that we are about to publish a tx. -func (s *MockSweeperStore) NotifyPublishTx(tx *wire.MsgTx) error { - txHash := tx.TxHash() - s.ourTxes[txHash] = struct{}{} +// StoreTx stores a tx we are about to publish. +func (s *MockSweeperStore) StoreTx(txid chainhash.Hash) error { + s.ourTxes[txid] = struct{}{} return nil } diff --git a/sweep/store_test.go b/sweep/store_test.go index 60e66b4b042..c07583dcce5 100644 --- a/sweep/store_test.go +++ b/sweep/store_test.go @@ -50,7 +50,7 @@ func testStore(t *testing.T, createStore func() (SweeperStore, error)) { }, }) - err = store.NotifyPublishTx(&tx1) + err = store.StoreTx(tx1.TxHash()) if err != nil { t.Fatal(err) } @@ -63,7 +63,7 @@ func testStore(t *testing.T, createStore func() (SweeperStore, error)) { }, }) - err = store.NotifyPublishTx(&tx2) + err = store.StoreTx(tx2.TxHash()) if err != nil { t.Fatal(err) } diff --git a/sweep/sweeper.go b/sweep/sweeper.go index ebdce7d1bc4..99e8091445c 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -1176,9 +1176,9 @@ func (s *UtxoSweeper) sweep(inputs inputSet, feeRate chainfee.SatPerKWeight, // publish, we loose track of this tx. Even republication on startup // doesn't prevent this, because that call returns a double spend error // then and would also not add the hash to the store. - err = s.cfg.Store.NotifyPublishTx(tx) + err = s.cfg.Store.StoreTx(tx.TxHash()) if err != nil { - return fmt.Errorf("notify publish tx: %w", err) + return fmt.Errorf("store tx: %w", err) } // Reschedule the inputs that we just tried to sweep. This is done in From d51853c25b418d575166001b8359fdbffcabdab2 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 25 Oct 2023 14:13:23 +0800 Subject: [PATCH 2/6] sweep: return fees from method `createSweepTx` Which will be used to make the sweeper RBF-aware. --- sweep/sweeper.go | 6 ++++-- sweep/txgenerator.go | 33 +++++++++++++++++++++++---------- sweep/walletsweep.go | 2 +- 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 99e8091445c..c1981eed2a1 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -1163,7 +1163,7 @@ func (s *UtxoSweeper) sweep(inputs inputSet, feeRate chainfee.SatPerKWeight, } // Create sweep tx. - tx, err := createSweepTx( + tx, _, err := createSweepTx( inputs, nil, s.currentOutputScript, uint32(currentHeight), feeRate, s.cfg.MaxFeeRate.FeePerKWeight(), s.cfg.Signer, ) @@ -1467,10 +1467,12 @@ func (s *UtxoSweeper) CreateSweepTx(inputs []input.Input, feePref FeePreference, return nil, err } - return createSweepTx( + tx, _, err := createSweepTx( inputs, nil, pkScript, currentBlockHeight, feePerKw, s.cfg.MaxFeeRate.FeePerKWeight(), s.cfg.Signer, ) + + return tx, err } // DefaultNextAttemptDeltaFunc is the default calculation for next sweep attempt diff --git a/sweep/txgenerator.go b/sweep/txgenerator.go index 45341cc8c5d..2fae5b88674 100644 --- a/sweep/txgenerator.go +++ b/sweep/txgenerator.go @@ -20,6 +20,14 @@ var ( // allowed in a single sweep tx. If more need to be swept, multiple txes // are created and published. DefaultMaxInputsPerTx = 100 + + // ErrLocktimeConflict is returned when inputs with different + // transaction nLockTime values are included in the same transaction. + // + // NOTE: due the SINGLE|ANYONECANPAY sighash flag, which is used in the + // second level success/timeout txns, only the txns sharing the same + // nLockTime can exist in the same tx. + ErrLocktimeConflict = errors.New("incompatible locktime") ) // txInput is an interface that provides the input data required for tx @@ -140,13 +148,13 @@ func generateInputPartitionings(sweepableInputs []txInput, func createSweepTx(inputs []input.Input, outputs []*wire.TxOut, changePkScript []byte, currentBlockHeight uint32, feePerKw, maxFeeRate chainfee.SatPerKWeight, - signer input.Signer) (*wire.MsgTx, error) { + signer input.Signer) (*wire.MsgTx, btcutil.Amount, error) { inputs, estimator, err := getWeightEstimate( inputs, outputs, feePerKw, maxFeeRate, changePkScript, ) if err != nil { - return nil, err + return nil, 0, err } txFee := estimator.fee() @@ -188,7 +196,7 @@ func createSweepTx(inputs []input.Input, outputs []*wire.TxOut, // If another input commits to a different locktime, // they cannot be combined in the same transaction. if locktime != -1 && locktime != int32(lt) { - return nil, fmt.Errorf("incompatible locktime") + return nil, 0, ErrLocktimeConflict } locktime = int32(lt) @@ -213,7 +221,7 @@ func createSweepTx(inputs []input.Input, outputs []*wire.TxOut, if lt, ok := o.RequiredLockTime(); ok { if locktime != -1 && locktime != int32(lt) { - return nil, fmt.Errorf("incompatible locktime") + return nil, 0, ErrLocktimeConflict } locktime = int32(lt) @@ -229,7 +237,7 @@ func createSweepTx(inputs []input.Input, outputs []*wire.TxOut, } if requiredOutput+txFee > totalInput { - return nil, fmt.Errorf("insufficient input to create sweep "+ + return nil, 0, fmt.Errorf("insufficient input to create sweep "+ "tx: input_sum=%v, output_sum=%v", totalInput, requiredOutput+txFee) } @@ -253,6 +261,10 @@ func createSweepTx(inputs []input.Input, outputs []*wire.TxOut, } else { log.Infof("Change amt %v below dustlimit %v, not adding "+ "change output", changeAmt, changeLimit) + + // The dust amount is added to the fee as the miner will + // collect it. + txFee += changeAmt } // We'll default to using the current block height as locktime, if none @@ -270,12 +282,12 @@ func createSweepTx(inputs []input.Input, outputs []*wire.TxOut, // classes if fees are too low. btx := btcutil.NewTx(sweepTx) if err := blockchain.CheckTransactionSanity(btx); err != nil { - return nil, err + return nil, 0, err } prevInputFetcher, err := input.MultiPrevOutFetcher(inputs) if err != nil { - return nil, fmt.Errorf("error creating prev input fetcher "+ + return nil, 0, fmt.Errorf("error creating prev input fetcher "+ "for hash cache: %v", err) } hashCache := txscript.NewTxSigHashes(sweepTx, prevInputFetcher) @@ -293,7 +305,8 @@ func createSweepTx(inputs []input.Input, outputs []*wire.TxOut, sweepTx.TxIn[idx].Witness = inputScript.Witness if len(inputScript.SigScript) != 0 { - sweepTx.TxIn[idx].SignatureScript = inputScript.SigScript + sweepTx.TxIn[idx].SignatureScript = + inputScript.SigScript } return nil @@ -301,7 +314,7 @@ func createSweepTx(inputs []input.Input, outputs []*wire.TxOut, for idx, inp := range idxs { if err := addInputScript(idx, inp); err != nil { - return nil, err + return nil, 0, err } } @@ -315,7 +328,7 @@ func createSweepTx(inputs []input.Input, outputs []*wire.TxOut, estimator.parentsWeight, ) - return sweepTx, nil + return sweepTx, txFee, nil } // getWeightEstimate returns a weight estimate for the given inputs. diff --git a/sweep/walletsweep.go b/sweep/walletsweep.go index 831b0151e48..d3c5365c0a8 100644 --- a/sweep/walletsweep.go +++ b/sweep/walletsweep.go @@ -319,7 +319,7 @@ func CraftSweepAllTx(feeRate, maxFeeRate chainfee.SatPerKWeight, // Finally, we'll ask the sweeper to craft a sweep transaction which // respects our fee preference and targets all the UTXOs of the wallet. - sweepTx, err := createSweepTx( + sweepTx, _, err := createSweepTx( inputsToSweep, txOuts, changePkScript, blockHeight, feeRate, maxFeeRate, signer, ) From c206ed3d98d59b17c384dcf055a7afaebdd5d305 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 25 Oct 2023 18:45:25 +0800 Subject: [PATCH 3/6] sweep: expand sweeper store to also save RBF-related info This commit modifies the sweeper store to save a `TxRecord` in db instead of an empty byte slice. This record will later be used to bring RBF-awareness to our sweeper. --- sweep/store.go | 121 +++++++++++++++++++++++++++++++++++++++++--- sweep/store_mock.go | 4 +- sweep/store_test.go | 79 +++++++++++++++++------------ sweep/sweeper.go | 20 +++++++- 4 files changed, 182 insertions(+), 42 deletions(-) diff --git a/sweep/store.go b/sweep/store.go index 72d7853a4e4..375436ca3fd 100644 --- a/sweep/store.go +++ b/sweep/store.go @@ -4,17 +4,19 @@ import ( "bytes" "encoding/binary" "errors" + "io" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/tlv" ) var ( // txHashesBucketKey is the key that points to a bucket containing the // hashes of all sweep txes that were published successfully. // - // maps: txHash -> empty slice + // maps: txHash -> TxRecord txHashesBucketKey = []byte("sweeper-tx-hashes") // utxnChainPrefix is the bucket prefix for nursery buckets. @@ -31,8 +33,90 @@ var ( byteOrder = binary.BigEndian errNoTxHashesBucket = errors.New("tx hashes bucket does not exist") + + // ErrTxNotFound is returned when querying using a txid that's not + // found in our db. + ErrTxNotFound = errors.New("tx not found") ) +// TxRecord specifies a record of a tx that's stored in the database. +type TxRecord struct { + // Txid is the sweeping tx's txid, which is used as the key to store + // the following values. + Txid chainhash.Hash + + // FeeRate is the fee rate of the sweeping tx, unit is sats/kw. + FeeRate uint64 + + // Fee is the fee of the sweeping tx, unit is sat. + Fee uint64 + + // Published indicates whether the tx has been published. + Published bool +} + +// toTlvStream converts TxRecord into a tlv representation. +func (t *TxRecord) toTlvStream() (*tlv.Stream, error) { + const ( + // A set of tlv type definitions used to serialize TxRecord. + // We define it here instead of the head of the file to avoid + // naming conflicts. + // + // NOTE: A migration should be added whenever the existing type + // changes. + // + // NOTE: Txid is stored as the key, so it's not included here. + feeRateType tlv.Type = 0 + feeType tlv.Type = 1 + boolType tlv.Type = 2 + ) + + return tlv.NewStream( + tlv.MakeBigSizeRecord(feeRateType, &t.FeeRate), + tlv.MakeBigSizeRecord(feeType, &t.Fee), + tlv.MakePrimitiveRecord(boolType, &t.Published), + ) +} + +// serializeTxRecord serializes a TxRecord based on tlv format. +func serializeTxRecord(w io.Writer, tx *TxRecord) error { + // Create the tlv stream. + tlvStream, err := tx.toTlvStream() + if err != nil { + return err + } + + // Encode the tlv stream. + var buf bytes.Buffer + if err := tlvStream.Encode(&buf); err != nil { + return err + } + + // Write the tlv stream. + if _, err = w.Write(buf.Bytes()); err != nil { + return err + } + + return nil +} + +// deserializeTxRecord deserializes a TxRecord based on tlv format. +func deserializeTxRecord(r io.Reader) (*TxRecord, error) { + var tx TxRecord + + // Create the tlv stream. + tlvStream, err := tx.toTlvStream() + if err != nil { + return nil, err + } + + if err := tlvStream.Decode(r); err != nil { + return nil, err + } + + return &tx, nil +} + // SweeperStore stores published txes. type SweeperStore interface { // IsOurTx determines whether a tx is published by us, based on its @@ -40,7 +124,7 @@ type SweeperStore interface { IsOurTx(hash chainhash.Hash) (bool, error) // StoreTx stores a tx hash we are about to publish. - StoreTx(chainhash.Hash) error + StoreTx(*TxRecord) error // ListSweeps lists all the sweeps we have successfully published. ListSweeps() ([]chainhash.Hash, error) @@ -83,6 +167,8 @@ func NewSweeperStore(db kvdb.Backend, chainHash *chainhash.Hash) ( // migrateTxHashes migrates nursery finalized txes to the tx hashes bucket. This // is not implemented as a database migration, to keep the downgrade path open. +// +// TODO(yy): delete this function once nursery is removed. func migrateTxHashes(tx kvdb.RwTx, txHashesBucket kvdb.RwBucket, chainHash *chainhash.Hash) error { @@ -138,7 +224,24 @@ func migrateTxHashes(tx kvdb.RwTx, txHashesBucket kvdb.RwBucket, log.Debugf("Inserting nursery tx %v in hash list "+ "(height=%v)", hash, byteOrder.Uint32(k)) - return txHashesBucket.Put(hash[:], []byte{}) + // Create the transaction record. Since this is an old record, + // we can assume it's already been published. Although it's + // possible to calculate the fees and fee rate used here, we + // skip it as it's unlikely we'd perform RBF on these old + // sweeping transactions. + tr := &TxRecord{ + Txid: hash, + Published: true, + } + + // Serialize tx record. + var b bytes.Buffer + err = serializeTxRecord(&b, tr) + if err != nil { + return err + } + + return txHashesBucket.Put(tr.Txid[:], b.Bytes()) }) if err != nil { return err @@ -148,15 +251,21 @@ func migrateTxHashes(tx kvdb.RwTx, txHashesBucket kvdb.RwBucket, } // StoreTx stores that we are about to publish a tx. -func (s *sweeperStore) StoreTx(txid chainhash.Hash) error { +func (s *sweeperStore) StoreTx(tr *TxRecord) error { return kvdb.Update(s.db, func(tx kvdb.RwTx) error { - txHashesBucket := tx.ReadWriteBucket(txHashesBucketKey) if txHashesBucket == nil { return errNoTxHashesBucket } - return txHashesBucket.Put(txid[:], []byte{}) + // Serialize tx record. + var b bytes.Buffer + err := serializeTxRecord(&b, tr) + if err != nil { + return err + } + + return txHashesBucket.Put(tr.Txid[:], b.Bytes()) }, func() {}) } diff --git a/sweep/store_mock.go b/sweep/store_mock.go index c8b9652f446..8263cb3a88e 100644 --- a/sweep/store_mock.go +++ b/sweep/store_mock.go @@ -25,8 +25,8 @@ func (s *MockSweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) { } // StoreTx stores a tx we are about to publish. -func (s *MockSweeperStore) StoreTx(txid chainhash.Hash) error { - s.ourTxes[txid] = struct{}{} +func (s *MockSweeperStore) StoreTx(tr *TxRecord) error { + s.ourTxes[tr.Txid] = struct{}{} return nil } diff --git a/sweep/store_test.go b/sweep/store_test.go index c07583dcce5..4abf658717d 100644 --- a/sweep/store_test.go +++ b/sweep/store_test.go @@ -1,6 +1,7 @@ package sweep import ( + "bytes" "testing" "github.com/btcsuite/btcd/chaincfg/chainhash" @@ -50,11 +51,13 @@ func testStore(t *testing.T, createStore func() (SweeperStore, error)) { }, }) - err = store.StoreTx(tx1.TxHash()) - if err != nil { - t.Fatal(err) + tr1 := &TxRecord{ + Txid: tx1.TxHash(), } + err = store.StoreTx(tr1) + require.NoError(t, err) + // Notify publication of tx2 tx2 := wire.MsgTx{} tx2.AddTxIn(&wire.TxIn{ @@ -63,11 +66,13 @@ func testStore(t *testing.T, createStore func() (SweeperStore, error)) { }, }) - err = store.StoreTx(tx2.TxHash()) - if err != nil { - t.Fatal(err) + tr2 := &TxRecord{ + Txid: tx2.TxHash(), } + err = store.StoreTx(tr2) + require.NoError(t, err) + // Recreate the sweeper store store, err = createStore() if err != nil { @@ -76,30 +81,18 @@ func testStore(t *testing.T, createStore func() (SweeperStore, error)) { // Assert that both txes are recognized as our own. ours, err := store.IsOurTx(tx1.TxHash()) - if err != nil { - t.Fatal(err) - } - if !ours { - t.Fatal("expected tx to be ours") - } + require.NoError(t, err) + require.True(t, ours, "expected tx to be ours") ours, err = store.IsOurTx(tx2.TxHash()) - if err != nil { - t.Fatal(err) - } - if !ours { - t.Fatal("expected tx to be ours") - } + require.NoError(t, err) + require.True(t, ours, "expected tx to be ours") // An different hash should be reported as not being ours. var unknownHash chainhash.Hash ours, err = store.IsOurTx(unknownHash) - if err != nil { - t.Fatal(err) - } - if ours { - t.Fatal("expected tx to be not ours") - } + require.NoError(t, err) + require.False(t, ours, "expected tx to not be ours") txns, err := store.ListSweeps() require.NoError(t, err, "unexpected error") @@ -110,16 +103,38 @@ func testStore(t *testing.T, createStore func() (SweeperStore, error)) { tx1.TxHash(): true, tx2.TxHash(): true, } - - if len(txns) != len(expected) { - t.Fatalf("expected: %v sweeps, got: %v", len(expected), - len(txns)) - } + require.Len(t, txns, len(expected)) for _, tx := range txns { _, ok := expected[tx] - if !ok { - t.Fatalf("unexpected tx: %v", tx) - } + require.Truef(t, ok, "unexpected txid returned: %v", tx) } } + +// TestTxRecord asserts that the serializeTxRecord and deserializeTxRecord +// behave as expected. +func TestTxRecord(t *testing.T) { + t.Parallel() + + // Create a testing record. + // + // NOTE: Txid is omitted because it is not serialized. + tr := &TxRecord{ + FeeRate: 1000, + Fee: 10000, + Published: true, + } + + var b bytes.Buffer + + // Assert we can serialize the record. + err := serializeTxRecord(&b, tr) + require.NoError(t, err) + + // Assert we can deserialize the record. + result, err := deserializeTxRecord(&b) + require.NoError(t, err) + + // Assert the deserialized record is equal to the original. + require.Equal(t, tr, result) +} diff --git a/sweep/sweeper.go b/sweep/sweeper.go index c1981eed2a1..f3920a41483 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -1163,7 +1163,7 @@ func (s *UtxoSweeper) sweep(inputs inputSet, feeRate chainfee.SatPerKWeight, } // Create sweep tx. - tx, _, err := createSweepTx( + tx, fee, err := createSweepTx( inputs, nil, s.currentOutputScript, uint32(currentHeight), feeRate, s.cfg.MaxFeeRate.FeePerKWeight(), s.cfg.Signer, ) @@ -1171,12 +1171,18 @@ func (s *UtxoSweeper) sweep(inputs inputSet, feeRate chainfee.SatPerKWeight, return fmt.Errorf("create sweep tx: %w", err) } + tr := &TxRecord{ + Txid: tx.TxHash(), + FeeRate: uint64(feeRate), + Fee: uint64(fee), + } + // Add tx before publication, so that we will always know that a spend // by this tx is ours. Otherwise if the publish doesn't return, but did // publish, we loose track of this tx. Even republication on startup // doesn't prevent this, because that call returns a double spend error // then and would also not add the hash to the store. - err = s.cfg.Store.StoreTx(tx.TxHash()) + err = s.cfg.Store.StoreTx(tr) if err != nil { return fmt.Errorf("store tx: %w", err) } @@ -1197,6 +1203,16 @@ func (s *UtxoSweeper) sweep(inputs inputSet, feeRate chainfee.SatPerKWeight, return err } + // Mark this tx in db once successfully published. + // + // NOTE: this will behave as an overwrite, which is fine as the record + // is small. + tr.Published = true + err = s.cfg.Store.StoreTx(tr) + if err != nil { + return fmt.Errorf("store tx: %w", err) + } + // If there's no error, remove the output script. Otherwise keep it so // that it can be reused for the next transaction and causes no address // inflation. From 6760bdeac32b2d9ce60217c4f87e95e4d2124c39 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 25 Oct 2023 18:47:01 +0800 Subject: [PATCH 4/6] sweep: add new methods `GetTx` and `DeleteTx` to manage `TxRecord` --- sweep/store.go | 68 ++++++++++++++++++++++++++++ sweep/store_mock.go | 11 +++++ sweep/store_test.go | 106 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 185 insertions(+) diff --git a/sweep/store.go b/sweep/store.go index 375436ca3fd..cfab6638198 100644 --- a/sweep/store.go +++ b/sweep/store.go @@ -128,6 +128,13 @@ type SweeperStore interface { // ListSweeps lists all the sweeps we have successfully published. ListSweeps() ([]chainhash.Hash, error) + + // GetTx queries the database to find the tx that matches the given + // txid. Returns ErrTxNotFound if it cannot be found. + GetTx(hash chainhash.Hash) (*TxRecord, error) + + // DeleteTx removes a tx specified by the hash from the store. + DeleteTx(hash chainhash.Hash) error } type sweeperStore struct { @@ -322,5 +329,66 @@ func (s *sweeperStore) ListSweeps() ([]chainhash.Hash, error) { return sweepTxns, nil } +// GetTx queries the database to find the tx that matches the given txid. +// Returns ErrTxNotFound if it cannot be found. +func (s *sweeperStore) GetTx(txid chainhash.Hash) (*TxRecord, error) { + // Create a record. + tr := &TxRecord{} + + var err error + err = kvdb.View(s.db, func(tx kvdb.RTx) error { + txHashesBucket := tx.ReadBucket(txHashesBucketKey) + if txHashesBucket == nil { + return errNoTxHashesBucket + } + + txBytes := txHashesBucket.Get(txid[:]) + if txBytes == nil { + return ErrTxNotFound + } + + // For old records, we'd get an empty byte slice here. We can + // assume it's already been published. Although it's possible + // to calculate the fees and fee rate used here, we skip it as + // it's unlikely we'd perform RBF on these old sweeping + // transactions. + // + // TODO(yy): remove this check once migration is added. + if len(txBytes) == 0 { + tr.Published = true + return nil + } + + tr, err = deserializeTxRecord(bytes.NewReader(txBytes)) + if err != nil { + return err + } + + return nil + }, func() { + tr = &TxRecord{} + }) + if err != nil { + return nil, err + } + + // Attach the txid to the record. + tr.Txid = txid + + return tr, nil +} + +// DeleteTx removes the given tx from db. +func (s *sweeperStore) DeleteTx(txid chainhash.Hash) error { + return kvdb.Update(s.db, func(tx kvdb.RwTx) error { + txHashesBucket := tx.ReadWriteBucket(txHashesBucketKey) + if txHashesBucket == nil { + return errNoTxHashesBucket + } + + return txHashesBucket.Delete(txid[:]) + }, func() {}) +} + // Compile-time constraint to ensure sweeperStore implements SweeperStore. var _ SweeperStore = (*sweeperStore)(nil) diff --git a/sweep/store_mock.go b/sweep/store_mock.go index 8263cb3a88e..16f7714a9b7 100644 --- a/sweep/store_mock.go +++ b/sweep/store_mock.go @@ -41,5 +41,16 @@ func (s *MockSweeperStore) ListSweeps() ([]chainhash.Hash, error) { return txns, nil } +// GetTx queries the database to find the tx that matches the given txid. +// Returns ErrTxNotFound if it cannot be found. +func (s *MockSweeperStore) GetTx(hash chainhash.Hash) (*TxRecord, error) { + return nil, ErrTxNotFound +} + +// DeleteTx removes the given tx from db. +func (s *MockSweeperStore) DeleteTx(txid chainhash.Hash) error { + return nil +} + // Compile-time constraint to ensure MockSweeperStore implements SweeperStore. var _ SweeperStore = (*MockSweeperStore)(nil) diff --git a/sweep/store_test.go b/sweep/store_test.go index 4abf658717d..7cfc649c9d9 100644 --- a/sweep/store_test.go +++ b/sweep/store_test.go @@ -7,6 +7,7 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/kvdb" "github.com/stretchr/testify/require" ) @@ -138,3 +139,108 @@ func TestTxRecord(t *testing.T) { // Assert the deserialized record is equal to the original. require.Equal(t, tr, result) } + +// TestGetTx asserts that the GetTx method behaves as expected. +func TestGetTx(t *testing.T) { + t.Parallel() + + cdb, err := channeldb.MakeTestDB(t) + require.NoError(t, err) + + // Create a testing store. + chain := chainhash.Hash{} + store, err := NewSweeperStore(cdb, &chain) + require.NoError(t, err) + + // Create a testing record. + txid := chainhash.Hash{1, 2, 3} + tr := &TxRecord{ + Txid: txid, + FeeRate: 1000, + Fee: 10000, + Published: true, + } + + // Assert we can store this tx record. + err = store.StoreTx(tr) + require.NoError(t, err) + + // Assert we can query the tx record. + result, err := store.GetTx(txid) + require.NoError(t, err) + require.Equal(t, tr, result) + + // Assert we get an error when querying a non-existing tx. + _, err = store.GetTx(chainhash.Hash{4, 5, 6}) + require.ErrorIs(t, ErrTxNotFound, err) +} + +// TestGetTxCompatible asserts that when there's old tx record data in the +// database it can be successfully queried. +func TestGetTxCompatible(t *testing.T) { + t.Parallel() + + cdb, err := channeldb.MakeTestDB(t) + require.NoError(t, err) + + // Create a testing store. + chain := chainhash.Hash{} + store, err := NewSweeperStore(cdb, &chain) + require.NoError(t, err) + + // Create a testing txid. + txid := chainhash.Hash{0, 1, 2, 3} + + // Create a record using the old format "hash -> empty byte slice". + err = kvdb.Update(cdb, func(tx kvdb.RwTx) error { + txHashesBucket := tx.ReadWriteBucket(txHashesBucketKey) + return txHashesBucket.Put(txid[:], []byte{}) + }, func() {}) + require.NoError(t, err) + + // Assert we can query the tx record. + result, err := store.GetTx(txid) + require.NoError(t, err) + require.Equal(t, txid, result.Txid) + + // Assert the Published field is true. + require.True(t, result.Published) +} + +// TestDeleteTx asserts that the DeleteTx method behaves as expected. +func TestDeleteTx(t *testing.T) { + t.Parallel() + + cdb, err := channeldb.MakeTestDB(t) + require.NoError(t, err) + + // Create a testing store. + chain := chainhash.Hash{} + store, err := NewSweeperStore(cdb, &chain) + require.NoError(t, err) + + // Create a testing record. + txid := chainhash.Hash{1, 2, 3} + tr := &TxRecord{ + Txid: txid, + FeeRate: 1000, + Fee: 10000, + Published: true, + } + + // Assert we can store this tx record. + err = store.StoreTx(tr) + require.NoError(t, err) + + // Assert we can delete the tx record. + err = store.DeleteTx(txid) + require.NoError(t, err) + + // Query it again should give us an error. + _, err = store.GetTx(txid) + require.ErrorIs(t, ErrTxNotFound, err) + + // Assert deleting a non-existing tx doesn't return an error. + err = store.DeleteTx(chainhash.Hash{4, 5, 6}) + require.NoError(t, err) +} From 9b4d67a56a4fb501479726785ab9aa39af8be4ee Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 11 Jan 2024 04:18:40 +0800 Subject: [PATCH 5/6] sweep: use `testify/mock` for `MockSweeperStore` --- sweep/store_mock.go | 40 ++++++++++++++++++++++------------------ sweep/store_test.go | 40 ++++++++-------------------------------- sweep/sweeper_test.go | 42 +++++++++++++++++++++++------------------- 3 files changed, 53 insertions(+), 69 deletions(-) diff --git a/sweep/store_mock.go b/sweep/store_mock.go index 16f7714a9b7..73b797963dc 100644 --- a/sweep/store_mock.go +++ b/sweep/store_mock.go @@ -2,54 +2,58 @@ package sweep import ( "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/stretchr/testify/mock" ) // MockSweeperStore is a mock implementation of sweeper store. This type is // exported, because it is currently used in nursery tests too. type MockSweeperStore struct { - ourTxes map[chainhash.Hash]struct{} + mock.Mock } // NewMockSweeperStore returns a new instance. func NewMockSweeperStore() *MockSweeperStore { - return &MockSweeperStore{ - ourTxes: make(map[chainhash.Hash]struct{}), - } + return &MockSweeperStore{} } -// IsOurTx determines whether a tx is published by us, based on its -// hash. +// IsOurTx determines whether a tx is published by us, based on its hash. func (s *MockSweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) { - _, ok := s.ourTxes[hash] - return ok, nil + args := s.Called(hash) + + return args.Bool(0), args.Error(1) } // StoreTx stores a tx we are about to publish. func (s *MockSweeperStore) StoreTx(tr *TxRecord) error { - s.ourTxes[tr.Txid] = struct{}{} - - return nil + args := s.Called(tr) + return args.Error(0) } // ListSweeps lists all the sweeps we have successfully published. func (s *MockSweeperStore) ListSweeps() ([]chainhash.Hash, error) { - var txns []chainhash.Hash - for tx := range s.ourTxes { - txns = append(txns, tx) - } + args := s.Called() - return txns, nil + return args.Get(0).([]chainhash.Hash), args.Error(1) } // GetTx queries the database to find the tx that matches the given txid. // Returns ErrTxNotFound if it cannot be found. func (s *MockSweeperStore) GetTx(hash chainhash.Hash) (*TxRecord, error) { - return nil, ErrTxNotFound + args := s.Called(hash) + + tr := args.Get(0) + if tr != nil { + return args.Get(0).(*TxRecord), args.Error(1) + } + + return nil, args.Error(1) } // DeleteTx removes the given tx from db. func (s *MockSweeperStore) DeleteTx(txid chainhash.Hash) error { - return nil + args := s.Called(txid) + + return args.Error(0) } // Compile-time constraint to ensure MockSweeperStore implements SweeperStore. diff --git a/sweep/store_test.go b/sweep/store_test.go index 7cfc649c9d9..ea65b017796 100644 --- a/sweep/store_test.go +++ b/sweep/store_test.go @@ -14,35 +14,13 @@ import ( // TestStore asserts that the store persists the presented data to disk and is // able to retrieve it again. func TestStore(t *testing.T) { - t.Run("bolt", func(t *testing.T) { - - // Create new store. - cdb, err := channeldb.MakeTestDB(t) - if err != nil { - t.Fatalf("unable to open channel db: %v", err) - } - - testStore(t, func() (SweeperStore, error) { - var chain chainhash.Hash - return NewSweeperStore(cdb, &chain) - }) - }) - t.Run("mock", func(t *testing.T) { - store := NewMockSweeperStore() - - testStore(t, func() (SweeperStore, error) { - // Return same store, because the mock has no real - // persistence. - return store, nil - }) - }) -} + // Create new store. + cdb, err := channeldb.MakeTestDB(t) + require.NoError(t, err) -func testStore(t *testing.T, createStore func() (SweeperStore, error)) { - store, err := createStore() - if err != nil { - t.Fatal(err) - } + var chain chainhash.Hash + store, err := NewSweeperStore(cdb, &chain) + require.NoError(t, err) // Notify publication of tx1 tx1 := wire.MsgTx{} @@ -75,10 +53,8 @@ func testStore(t *testing.T, createStore func() (SweeperStore, error)) { require.NoError(t, err) // Recreate the sweeper store - store, err = createStore() - if err != nil { - t.Fatal(err) - } + store, err = NewSweeperStore(cdb, &chain) + require.NoError(t, err) // Assert that both txes are recognized as our own. ours, err := store.IsOurTx(tx1.TxHash()) diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 3054c9f0c38..2003254cd60 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -16,6 +16,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/build" + "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lntest/mock" @@ -41,7 +42,7 @@ type sweeperTestContext struct { notifier *MockNotifier estimator *mockFeeEstimator backend *mockBackend - store *MockSweeperStore + store SweeperStore publishChan chan wire.MsgTx } @@ -102,7 +103,13 @@ func init() { func createSweeperTestContext(t *testing.T) *sweeperTestContext { notifier := NewMockNotifier(t) - store := NewMockSweeperStore() + // Create new store. + cdb, err := channeldb.MakeTestDB(t) + require.NoError(t, err) + + var chain chainhash.Hash + store, err := NewSweeperStore(cdb, &chain) + require.NoError(t, err) backend := newMockBackend(t, notifier) backend.walletUtxos = []*lnwallet.Utxo{ @@ -682,7 +689,6 @@ func TestIdempotency(t *testing.T) { // Timer is still running, but spend notification was delivered before // it expired. - ctx.finish(1) } @@ -701,9 +707,8 @@ func TestRestart(t *testing.T) { // Sweep input and expect sweep tx. input1 := spendableInputs[0] - if _, err := ctx.sweeper.SweepInput(input1, defaultFeePref); err != nil { - t.Fatal(err) - } + _, err := ctx.sweeper.SweepInput(input1, defaultFeePref) + require.NoError(t, err) ctx.receiveTx() @@ -758,23 +763,20 @@ func TestRestart(t *testing.T) { ctx.finish(1) } -// TestRestartRemoteSpend asserts that the sweeper picks up sweeping properly after -// a restart with remote spend. +// TestRestartRemoteSpend asserts that the sweeper picks up sweeping properly +// after a restart with remote spend. func TestRestartRemoteSpend(t *testing.T) { - ctx := createSweeperTestContext(t) // Sweep input. input1 := spendableInputs[0] - if _, err := ctx.sweeper.SweepInput(input1, defaultFeePref); err != nil { - t.Fatal(err) - } + _, err := ctx.sweeper.SweepInput(input1, defaultFeePref) + require.NoError(t, err) // Sweep another input. input2 := spendableInputs[1] - if _, err := ctx.sweeper.SweepInput(input2, defaultFeePref); err != nil { - t.Fatal(err) - } + _, err = ctx.sweeper.SweepInput(input2, defaultFeePref) + require.NoError(t, err) sweepTx := ctx.receiveTx() @@ -798,7 +800,8 @@ func TestRestartRemoteSpend(t *testing.T) { // Mine remote spending tx. ctx.backend.mine() - // Simulate other subsystem (e.g. contract resolver) re-offering input 0. + // Simulate other subsystem (e.g. contract resolver) re-offering input + // 0. spendChan, err := ctx.sweeper.SweepInput(input1, defaultFeePref) if err != nil { t.Fatal(err) @@ -815,8 +818,8 @@ func TestRestartRemoteSpend(t *testing.T) { ctx.finish(1) } -// TestRestartConfirmed asserts that the sweeper picks up sweeping properly after -// a restart with a confirm of our own sweep tx. +// TestRestartConfirmed asserts that the sweeper picks up sweeping properly +// after a restart with a confirm of our own sweep tx. func TestRestartConfirmed(t *testing.T) { ctx := createSweeperTestContext(t) @@ -834,7 +837,8 @@ func TestRestartConfirmed(t *testing.T) { // Mine the sweep tx. ctx.backend.mine() - // Simulate other subsystem (e.g. contract resolver) re-offering input 0. + // Simulate other subsystem (e.g. contract resolver) re-offering input + // 0. spendChan, err := ctx.sweeper.SweepInput(input, defaultFeePref) if err != nil { t.Fatal(err) From b9e6ef28886a4113d048401f33472184278cce5b Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 8 Jan 2024 04:12:50 +0800 Subject: [PATCH 6/6] docs: update release docs --- docs/release-notes/release-notes-0.18.0.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/release-notes/release-notes-0.18.0.md b/docs/release-notes/release-notes-0.18.0.md index 673514082fa..86e1576f65f 100644 --- a/docs/release-notes/release-notes-0.18.0.md +++ b/docs/release-notes/release-notes-0.18.0.md @@ -352,6 +352,10 @@ for SQL backends enabling new users to optionally use an experimental native SQL invoices database. +* [Expanded SweeperStore](https://github.com/lightningnetwork/lnd/pull/8147) to + also store the feerate, fees paid, and whether it's published or not for a + given sweeping transaction. + ## Code Health * [Remove database pointers](https://github.com/lightningnetwork/lnd/pull/8117)