diff --git a/docs/release-notes/release-notes-0.18.0.md b/docs/release-notes/release-notes-0.18.0.md index 673514082fa..86e1576f65f 100644 --- a/docs/release-notes/release-notes-0.18.0.md +++ b/docs/release-notes/release-notes-0.18.0.md @@ -352,6 +352,10 @@ for SQL backends enabling new users to optionally use an experimental native SQL invoices database. +* [Expanded SweeperStore](https://github.com/lightningnetwork/lnd/pull/8147) to + also store the feerate, fees paid, and whether it's published or not for a + given sweeping transaction. + ## Code Health * [Remove database pointers](https://github.com/lightningnetwork/lnd/pull/8117) diff --git a/sweep/store.go b/sweep/store.go index 916d2fa54fd..cfab6638198 100644 --- a/sweep/store.go +++ b/sweep/store.go @@ -4,17 +4,19 @@ import ( "bytes" "encoding/binary" "errors" + "io" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/tlv" ) var ( // txHashesBucketKey is the key that points to a bucket containing the // hashes of all sweep txes that were published successfully. // - // maps: txHash -> empty slice + // maps: txHash -> TxRecord txHashesBucketKey = []byte("sweeper-tx-hashes") // utxnChainPrefix is the bucket prefix for nursery buckets. @@ -31,19 +33,108 @@ var ( byteOrder = binary.BigEndian errNoTxHashesBucket = errors.New("tx hashes bucket does not exist") + + // ErrTxNotFound is returned when querying using a txid that's not + // found in our db. + ErrTxNotFound = errors.New("tx not found") ) +// TxRecord specifies a record of a tx that's stored in the database. +type TxRecord struct { + // Txid is the sweeping tx's txid, which is used as the key to store + // the following values. + Txid chainhash.Hash + + // FeeRate is the fee rate of the sweeping tx, unit is sats/kw. + FeeRate uint64 + + // Fee is the fee of the sweeping tx, unit is sat. + Fee uint64 + + // Published indicates whether the tx has been published. + Published bool +} + +// toTlvStream converts TxRecord into a tlv representation. +func (t *TxRecord) toTlvStream() (*tlv.Stream, error) { + const ( + // A set of tlv type definitions used to serialize TxRecord. + // We define it here instead of the head of the file to avoid + // naming conflicts. + // + // NOTE: A migration should be added whenever the existing type + // changes. + // + // NOTE: Txid is stored as the key, so it's not included here. + feeRateType tlv.Type = 0 + feeType tlv.Type = 1 + boolType tlv.Type = 2 + ) + + return tlv.NewStream( + tlv.MakeBigSizeRecord(feeRateType, &t.FeeRate), + tlv.MakeBigSizeRecord(feeType, &t.Fee), + tlv.MakePrimitiveRecord(boolType, &t.Published), + ) +} + +// serializeTxRecord serializes a TxRecord based on tlv format. +func serializeTxRecord(w io.Writer, tx *TxRecord) error { + // Create the tlv stream. + tlvStream, err := tx.toTlvStream() + if err != nil { + return err + } + + // Encode the tlv stream. + var buf bytes.Buffer + if err := tlvStream.Encode(&buf); err != nil { + return err + } + + // Write the tlv stream. + if _, err = w.Write(buf.Bytes()); err != nil { + return err + } + + return nil +} + +// deserializeTxRecord deserializes a TxRecord based on tlv format. +func deserializeTxRecord(r io.Reader) (*TxRecord, error) { + var tx TxRecord + + // Create the tlv stream. + tlvStream, err := tx.toTlvStream() + if err != nil { + return nil, err + } + + if err := tlvStream.Decode(r); err != nil { + return nil, err + } + + return &tx, nil +} + // SweeperStore stores published txes. type SweeperStore interface { // IsOurTx determines whether a tx is published by us, based on its // hash. IsOurTx(hash chainhash.Hash) (bool, error) - // NotifyPublishTx signals that we are about to publish a tx. - NotifyPublishTx(*wire.MsgTx) error + // StoreTx stores a tx hash we are about to publish. + StoreTx(*TxRecord) error // ListSweeps lists all the sweeps we have successfully published. ListSweeps() ([]chainhash.Hash, error) + + // GetTx queries the database to find the tx that matches the given + // txid. Returns ErrTxNotFound if it cannot be found. + GetTx(hash chainhash.Hash) (*TxRecord, error) + + // DeleteTx removes a tx specified by the hash from the store. + DeleteTx(hash chainhash.Hash) error } type sweeperStore struct { @@ -83,6 +174,8 @@ func NewSweeperStore(db kvdb.Backend, chainHash *chainhash.Hash) ( // migrateTxHashes migrates nursery finalized txes to the tx hashes bucket. This // is not implemented as a database migration, to keep the downgrade path open. +// +// TODO(yy): delete this function once nursery is removed. func migrateTxHashes(tx kvdb.RwTx, txHashesBucket kvdb.RwBucket, chainHash *chainhash.Hash) error { @@ -138,7 +231,24 @@ func migrateTxHashes(tx kvdb.RwTx, txHashesBucket kvdb.RwBucket, log.Debugf("Inserting nursery tx %v in hash list "+ "(height=%v)", hash, byteOrder.Uint32(k)) - return txHashesBucket.Put(hash[:], []byte{}) + // Create the transaction record. Since this is an old record, + // we can assume it's already been published. Although it's + // possible to calculate the fees and fee rate used here, we + // skip it as it's unlikely we'd perform RBF on these old + // sweeping transactions. + tr := &TxRecord{ + Txid: hash, + Published: true, + } + + // Serialize tx record. + var b bytes.Buffer + err = serializeTxRecord(&b, tr) + if err != nil { + return err + } + + return txHashesBucket.Put(tr.Txid[:], b.Bytes()) }) if err != nil { return err @@ -147,18 +257,22 @@ func migrateTxHashes(tx kvdb.RwTx, txHashesBucket kvdb.RwBucket, return nil } -// NotifyPublishTx signals that we are about to publish a tx. -func (s *sweeperStore) NotifyPublishTx(sweepTx *wire.MsgTx) error { +// StoreTx stores that we are about to publish a tx. +func (s *sweeperStore) StoreTx(tr *TxRecord) error { return kvdb.Update(s.db, func(tx kvdb.RwTx) error { - txHashesBucket := tx.ReadWriteBucket(txHashesBucketKey) if txHashesBucket == nil { return errNoTxHashesBucket } - hash := sweepTx.TxHash() + // Serialize tx record. + var b bytes.Buffer + err := serializeTxRecord(&b, tr) + if err != nil { + return err + } - return txHashesBucket.Put(hash[:], []byte{}) + return txHashesBucket.Put(tr.Txid[:], b.Bytes()) }, func() {}) } @@ -215,5 +329,66 @@ func (s *sweeperStore) ListSweeps() ([]chainhash.Hash, error) { return sweepTxns, nil } +// GetTx queries the database to find the tx that matches the given txid. +// Returns ErrTxNotFound if it cannot be found. +func (s *sweeperStore) GetTx(txid chainhash.Hash) (*TxRecord, error) { + // Create a record. + tr := &TxRecord{} + + var err error + err = kvdb.View(s.db, func(tx kvdb.RTx) error { + txHashesBucket := tx.ReadBucket(txHashesBucketKey) + if txHashesBucket == nil { + return errNoTxHashesBucket + } + + txBytes := txHashesBucket.Get(txid[:]) + if txBytes == nil { + return ErrTxNotFound + } + + // For old records, we'd get an empty byte slice here. We can + // assume it's already been published. Although it's possible + // to calculate the fees and fee rate used here, we skip it as + // it's unlikely we'd perform RBF on these old sweeping + // transactions. + // + // TODO(yy): remove this check once migration is added. + if len(txBytes) == 0 { + tr.Published = true + return nil + } + + tr, err = deserializeTxRecord(bytes.NewReader(txBytes)) + if err != nil { + return err + } + + return nil + }, func() { + tr = &TxRecord{} + }) + if err != nil { + return nil, err + } + + // Attach the txid to the record. + tr.Txid = txid + + return tr, nil +} + +// DeleteTx removes the given tx from db. +func (s *sweeperStore) DeleteTx(txid chainhash.Hash) error { + return kvdb.Update(s.db, func(tx kvdb.RwTx) error { + txHashesBucket := tx.ReadWriteBucket(txHashesBucketKey) + if txHashesBucket == nil { + return errNoTxHashesBucket + } + + return txHashesBucket.Delete(txid[:]) + }, func() {}) +} + // Compile-time constraint to ensure sweeperStore implements SweeperStore. var _ SweeperStore = (*sweeperStore)(nil) diff --git a/sweep/store_mock.go b/sweep/store_mock.go index 53d9080d8b3..73b797963dc 100644 --- a/sweep/store_mock.go +++ b/sweep/store_mock.go @@ -2,45 +2,58 @@ package sweep import ( "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/btcsuite/btcd/wire" + "github.com/stretchr/testify/mock" ) // MockSweeperStore is a mock implementation of sweeper store. This type is // exported, because it is currently used in nursery tests too. type MockSweeperStore struct { - ourTxes map[chainhash.Hash]struct{} + mock.Mock } // NewMockSweeperStore returns a new instance. func NewMockSweeperStore() *MockSweeperStore { - return &MockSweeperStore{ - ourTxes: make(map[chainhash.Hash]struct{}), - } + return &MockSweeperStore{} } -// IsOurTx determines whether a tx is published by us, based on its -// hash. +// IsOurTx determines whether a tx is published by us, based on its hash. func (s *MockSweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) { - _, ok := s.ourTxes[hash] - return ok, nil -} + args := s.Called(hash) -// NotifyPublishTx signals that we are about to publish a tx. -func (s *MockSweeperStore) NotifyPublishTx(tx *wire.MsgTx) error { - txHash := tx.TxHash() - s.ourTxes[txHash] = struct{}{} + return args.Bool(0), args.Error(1) +} - return nil +// StoreTx stores a tx we are about to publish. +func (s *MockSweeperStore) StoreTx(tr *TxRecord) error { + args := s.Called(tr) + return args.Error(0) } // ListSweeps lists all the sweeps we have successfully published. func (s *MockSweeperStore) ListSweeps() ([]chainhash.Hash, error) { - var txns []chainhash.Hash - for tx := range s.ourTxes { - txns = append(txns, tx) + args := s.Called() + + return args.Get(0).([]chainhash.Hash), args.Error(1) +} + +// GetTx queries the database to find the tx that matches the given txid. +// Returns ErrTxNotFound if it cannot be found. +func (s *MockSweeperStore) GetTx(hash chainhash.Hash) (*TxRecord, error) { + args := s.Called(hash) + + tr := args.Get(0) + if tr != nil { + return args.Get(0).(*TxRecord), args.Error(1) } - return txns, nil + return nil, args.Error(1) +} + +// DeleteTx removes the given tx from db. +func (s *MockSweeperStore) DeleteTx(txid chainhash.Hash) error { + args := s.Called(txid) + + return args.Error(0) } // Compile-time constraint to ensure MockSweeperStore implements SweeperStore. diff --git a/sweep/store_test.go b/sweep/store_test.go index 60e66b4b042..ea65b017796 100644 --- a/sweep/store_test.go +++ b/sweep/store_test.go @@ -1,46 +1,26 @@ package sweep import ( + "bytes" "testing" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/kvdb" "github.com/stretchr/testify/require" ) // TestStore asserts that the store persists the presented data to disk and is // able to retrieve it again. func TestStore(t *testing.T) { - t.Run("bolt", func(t *testing.T) { - - // Create new store. - cdb, err := channeldb.MakeTestDB(t) - if err != nil { - t.Fatalf("unable to open channel db: %v", err) - } - - testStore(t, func() (SweeperStore, error) { - var chain chainhash.Hash - return NewSweeperStore(cdb, &chain) - }) - }) - t.Run("mock", func(t *testing.T) { - store := NewMockSweeperStore() - - testStore(t, func() (SweeperStore, error) { - // Return same store, because the mock has no real - // persistence. - return store, nil - }) - }) -} + // Create new store. + cdb, err := channeldb.MakeTestDB(t) + require.NoError(t, err) -func testStore(t *testing.T, createStore func() (SweeperStore, error)) { - store, err := createStore() - if err != nil { - t.Fatal(err) - } + var chain chainhash.Hash + store, err := NewSweeperStore(cdb, &chain) + require.NoError(t, err) // Notify publication of tx1 tx1 := wire.MsgTx{} @@ -50,11 +30,13 @@ func testStore(t *testing.T, createStore func() (SweeperStore, error)) { }, }) - err = store.NotifyPublishTx(&tx1) - if err != nil { - t.Fatal(err) + tr1 := &TxRecord{ + Txid: tx1.TxHash(), } + err = store.StoreTx(tr1) + require.NoError(t, err) + // Notify publication of tx2 tx2 := wire.MsgTx{} tx2.AddTxIn(&wire.TxIn{ @@ -63,43 +45,31 @@ func testStore(t *testing.T, createStore func() (SweeperStore, error)) { }, }) - err = store.NotifyPublishTx(&tx2) - if err != nil { - t.Fatal(err) + tr2 := &TxRecord{ + Txid: tx2.TxHash(), } + err = store.StoreTx(tr2) + require.NoError(t, err) + // Recreate the sweeper store - store, err = createStore() - if err != nil { - t.Fatal(err) - } + store, err = NewSweeperStore(cdb, &chain) + require.NoError(t, err) // Assert that both txes are recognized as our own. ours, err := store.IsOurTx(tx1.TxHash()) - if err != nil { - t.Fatal(err) - } - if !ours { - t.Fatal("expected tx to be ours") - } + require.NoError(t, err) + require.True(t, ours, "expected tx to be ours") ours, err = store.IsOurTx(tx2.TxHash()) - if err != nil { - t.Fatal(err) - } - if !ours { - t.Fatal("expected tx to be ours") - } + require.NoError(t, err) + require.True(t, ours, "expected tx to be ours") // An different hash should be reported as not being ours. var unknownHash chainhash.Hash ours, err = store.IsOurTx(unknownHash) - if err != nil { - t.Fatal(err) - } - if ours { - t.Fatal("expected tx to be not ours") - } + require.NoError(t, err) + require.False(t, ours, "expected tx to not be ours") txns, err := store.ListSweeps() require.NoError(t, err, "unexpected error") @@ -110,16 +80,143 @@ func testStore(t *testing.T, createStore func() (SweeperStore, error)) { tx1.TxHash(): true, tx2.TxHash(): true, } - - if len(txns) != len(expected) { - t.Fatalf("expected: %v sweeps, got: %v", len(expected), - len(txns)) - } + require.Len(t, txns, len(expected)) for _, tx := range txns { _, ok := expected[tx] - if !ok { - t.Fatalf("unexpected tx: %v", tx) - } + require.Truef(t, ok, "unexpected txid returned: %v", tx) } } + +// TestTxRecord asserts that the serializeTxRecord and deserializeTxRecord +// behave as expected. +func TestTxRecord(t *testing.T) { + t.Parallel() + + // Create a testing record. + // + // NOTE: Txid is omitted because it is not serialized. + tr := &TxRecord{ + FeeRate: 1000, + Fee: 10000, + Published: true, + } + + var b bytes.Buffer + + // Assert we can serialize the record. + err := serializeTxRecord(&b, tr) + require.NoError(t, err) + + // Assert we can deserialize the record. + result, err := deserializeTxRecord(&b) + require.NoError(t, err) + + // Assert the deserialized record is equal to the original. + require.Equal(t, tr, result) +} + +// TestGetTx asserts that the GetTx method behaves as expected. +func TestGetTx(t *testing.T) { + t.Parallel() + + cdb, err := channeldb.MakeTestDB(t) + require.NoError(t, err) + + // Create a testing store. + chain := chainhash.Hash{} + store, err := NewSweeperStore(cdb, &chain) + require.NoError(t, err) + + // Create a testing record. + txid := chainhash.Hash{1, 2, 3} + tr := &TxRecord{ + Txid: txid, + FeeRate: 1000, + Fee: 10000, + Published: true, + } + + // Assert we can store this tx record. + err = store.StoreTx(tr) + require.NoError(t, err) + + // Assert we can query the tx record. + result, err := store.GetTx(txid) + require.NoError(t, err) + require.Equal(t, tr, result) + + // Assert we get an error when querying a non-existing tx. + _, err = store.GetTx(chainhash.Hash{4, 5, 6}) + require.ErrorIs(t, ErrTxNotFound, err) +} + +// TestGetTxCompatible asserts that when there's old tx record data in the +// database it can be successfully queried. +func TestGetTxCompatible(t *testing.T) { + t.Parallel() + + cdb, err := channeldb.MakeTestDB(t) + require.NoError(t, err) + + // Create a testing store. + chain := chainhash.Hash{} + store, err := NewSweeperStore(cdb, &chain) + require.NoError(t, err) + + // Create a testing txid. + txid := chainhash.Hash{0, 1, 2, 3} + + // Create a record using the old format "hash -> empty byte slice". + err = kvdb.Update(cdb, func(tx kvdb.RwTx) error { + txHashesBucket := tx.ReadWriteBucket(txHashesBucketKey) + return txHashesBucket.Put(txid[:], []byte{}) + }, func() {}) + require.NoError(t, err) + + // Assert we can query the tx record. + result, err := store.GetTx(txid) + require.NoError(t, err) + require.Equal(t, txid, result.Txid) + + // Assert the Published field is true. + require.True(t, result.Published) +} + +// TestDeleteTx asserts that the DeleteTx method behaves as expected. +func TestDeleteTx(t *testing.T) { + t.Parallel() + + cdb, err := channeldb.MakeTestDB(t) + require.NoError(t, err) + + // Create a testing store. + chain := chainhash.Hash{} + store, err := NewSweeperStore(cdb, &chain) + require.NoError(t, err) + + // Create a testing record. + txid := chainhash.Hash{1, 2, 3} + tr := &TxRecord{ + Txid: txid, + FeeRate: 1000, + Fee: 10000, + Published: true, + } + + // Assert we can store this tx record. + err = store.StoreTx(tr) + require.NoError(t, err) + + // Assert we can delete the tx record. + err = store.DeleteTx(txid) + require.NoError(t, err) + + // Query it again should give us an error. + _, err = store.GetTx(txid) + require.ErrorIs(t, ErrTxNotFound, err) + + // Assert deleting a non-existing tx doesn't return an error. + err = store.DeleteTx(chainhash.Hash{4, 5, 6}) + require.NoError(t, err) +} diff --git a/sweep/sweeper.go b/sweep/sweeper.go index ebdce7d1bc4..f3920a41483 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -1163,7 +1163,7 @@ func (s *UtxoSweeper) sweep(inputs inputSet, feeRate chainfee.SatPerKWeight, } // Create sweep tx. - tx, err := createSweepTx( + tx, fee, err := createSweepTx( inputs, nil, s.currentOutputScript, uint32(currentHeight), feeRate, s.cfg.MaxFeeRate.FeePerKWeight(), s.cfg.Signer, ) @@ -1171,14 +1171,20 @@ func (s *UtxoSweeper) sweep(inputs inputSet, feeRate chainfee.SatPerKWeight, return fmt.Errorf("create sweep tx: %w", err) } + tr := &TxRecord{ + Txid: tx.TxHash(), + FeeRate: uint64(feeRate), + Fee: uint64(fee), + } + // Add tx before publication, so that we will always know that a spend // by this tx is ours. Otherwise if the publish doesn't return, but did // publish, we loose track of this tx. Even republication on startup // doesn't prevent this, because that call returns a double spend error // then and would also not add the hash to the store. - err = s.cfg.Store.NotifyPublishTx(tx) + err = s.cfg.Store.StoreTx(tr) if err != nil { - return fmt.Errorf("notify publish tx: %w", err) + return fmt.Errorf("store tx: %w", err) } // Reschedule the inputs that we just tried to sweep. This is done in @@ -1197,6 +1203,16 @@ func (s *UtxoSweeper) sweep(inputs inputSet, feeRate chainfee.SatPerKWeight, return err } + // Mark this tx in db once successfully published. + // + // NOTE: this will behave as an overwrite, which is fine as the record + // is small. + tr.Published = true + err = s.cfg.Store.StoreTx(tr) + if err != nil { + return fmt.Errorf("store tx: %w", err) + } + // If there's no error, remove the output script. Otherwise keep it so // that it can be reused for the next transaction and causes no address // inflation. @@ -1467,10 +1483,12 @@ func (s *UtxoSweeper) CreateSweepTx(inputs []input.Input, feePref FeePreference, return nil, err } - return createSweepTx( + tx, _, err := createSweepTx( inputs, nil, pkScript, currentBlockHeight, feePerKw, s.cfg.MaxFeeRate.FeePerKWeight(), s.cfg.Signer, ) + + return tx, err } // DefaultNextAttemptDeltaFunc is the default calculation for next sweep attempt diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 3054c9f0c38..2003254cd60 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -16,6 +16,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/build" + "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lntest/mock" @@ -41,7 +42,7 @@ type sweeperTestContext struct { notifier *MockNotifier estimator *mockFeeEstimator backend *mockBackend - store *MockSweeperStore + store SweeperStore publishChan chan wire.MsgTx } @@ -102,7 +103,13 @@ func init() { func createSweeperTestContext(t *testing.T) *sweeperTestContext { notifier := NewMockNotifier(t) - store := NewMockSweeperStore() + // Create new store. + cdb, err := channeldb.MakeTestDB(t) + require.NoError(t, err) + + var chain chainhash.Hash + store, err := NewSweeperStore(cdb, &chain) + require.NoError(t, err) backend := newMockBackend(t, notifier) backend.walletUtxos = []*lnwallet.Utxo{ @@ -682,7 +689,6 @@ func TestIdempotency(t *testing.T) { // Timer is still running, but spend notification was delivered before // it expired. - ctx.finish(1) } @@ -701,9 +707,8 @@ func TestRestart(t *testing.T) { // Sweep input and expect sweep tx. input1 := spendableInputs[0] - if _, err := ctx.sweeper.SweepInput(input1, defaultFeePref); err != nil { - t.Fatal(err) - } + _, err := ctx.sweeper.SweepInput(input1, defaultFeePref) + require.NoError(t, err) ctx.receiveTx() @@ -758,23 +763,20 @@ func TestRestart(t *testing.T) { ctx.finish(1) } -// TestRestartRemoteSpend asserts that the sweeper picks up sweeping properly after -// a restart with remote spend. +// TestRestartRemoteSpend asserts that the sweeper picks up sweeping properly +// after a restart with remote spend. func TestRestartRemoteSpend(t *testing.T) { - ctx := createSweeperTestContext(t) // Sweep input. input1 := spendableInputs[0] - if _, err := ctx.sweeper.SweepInput(input1, defaultFeePref); err != nil { - t.Fatal(err) - } + _, err := ctx.sweeper.SweepInput(input1, defaultFeePref) + require.NoError(t, err) // Sweep another input. input2 := spendableInputs[1] - if _, err := ctx.sweeper.SweepInput(input2, defaultFeePref); err != nil { - t.Fatal(err) - } + _, err = ctx.sweeper.SweepInput(input2, defaultFeePref) + require.NoError(t, err) sweepTx := ctx.receiveTx() @@ -798,7 +800,8 @@ func TestRestartRemoteSpend(t *testing.T) { // Mine remote spending tx. ctx.backend.mine() - // Simulate other subsystem (e.g. contract resolver) re-offering input 0. + // Simulate other subsystem (e.g. contract resolver) re-offering input + // 0. spendChan, err := ctx.sweeper.SweepInput(input1, defaultFeePref) if err != nil { t.Fatal(err) @@ -815,8 +818,8 @@ func TestRestartRemoteSpend(t *testing.T) { ctx.finish(1) } -// TestRestartConfirmed asserts that the sweeper picks up sweeping properly after -// a restart with a confirm of our own sweep tx. +// TestRestartConfirmed asserts that the sweeper picks up sweeping properly +// after a restart with a confirm of our own sweep tx. func TestRestartConfirmed(t *testing.T) { ctx := createSweeperTestContext(t) @@ -834,7 +837,8 @@ func TestRestartConfirmed(t *testing.T) { // Mine the sweep tx. ctx.backend.mine() - // Simulate other subsystem (e.g. contract resolver) re-offering input 0. + // Simulate other subsystem (e.g. contract resolver) re-offering input + // 0. spendChan, err := ctx.sweeper.SweepInput(input, defaultFeePref) if err != nil { t.Fatal(err) diff --git a/sweep/txgenerator.go b/sweep/txgenerator.go index 45341cc8c5d..2fae5b88674 100644 --- a/sweep/txgenerator.go +++ b/sweep/txgenerator.go @@ -20,6 +20,14 @@ var ( // allowed in a single sweep tx. If more need to be swept, multiple txes // are created and published. DefaultMaxInputsPerTx = 100 + + // ErrLocktimeConflict is returned when inputs with different + // transaction nLockTime values are included in the same transaction. + // + // NOTE: due the SINGLE|ANYONECANPAY sighash flag, which is used in the + // second level success/timeout txns, only the txns sharing the same + // nLockTime can exist in the same tx. + ErrLocktimeConflict = errors.New("incompatible locktime") ) // txInput is an interface that provides the input data required for tx @@ -140,13 +148,13 @@ func generateInputPartitionings(sweepableInputs []txInput, func createSweepTx(inputs []input.Input, outputs []*wire.TxOut, changePkScript []byte, currentBlockHeight uint32, feePerKw, maxFeeRate chainfee.SatPerKWeight, - signer input.Signer) (*wire.MsgTx, error) { + signer input.Signer) (*wire.MsgTx, btcutil.Amount, error) { inputs, estimator, err := getWeightEstimate( inputs, outputs, feePerKw, maxFeeRate, changePkScript, ) if err != nil { - return nil, err + return nil, 0, err } txFee := estimator.fee() @@ -188,7 +196,7 @@ func createSweepTx(inputs []input.Input, outputs []*wire.TxOut, // If another input commits to a different locktime, // they cannot be combined in the same transaction. if locktime != -1 && locktime != int32(lt) { - return nil, fmt.Errorf("incompatible locktime") + return nil, 0, ErrLocktimeConflict } locktime = int32(lt) @@ -213,7 +221,7 @@ func createSweepTx(inputs []input.Input, outputs []*wire.TxOut, if lt, ok := o.RequiredLockTime(); ok { if locktime != -1 && locktime != int32(lt) { - return nil, fmt.Errorf("incompatible locktime") + return nil, 0, ErrLocktimeConflict } locktime = int32(lt) @@ -229,7 +237,7 @@ func createSweepTx(inputs []input.Input, outputs []*wire.TxOut, } if requiredOutput+txFee > totalInput { - return nil, fmt.Errorf("insufficient input to create sweep "+ + return nil, 0, fmt.Errorf("insufficient input to create sweep "+ "tx: input_sum=%v, output_sum=%v", totalInput, requiredOutput+txFee) } @@ -253,6 +261,10 @@ func createSweepTx(inputs []input.Input, outputs []*wire.TxOut, } else { log.Infof("Change amt %v below dustlimit %v, not adding "+ "change output", changeAmt, changeLimit) + + // The dust amount is added to the fee as the miner will + // collect it. + txFee += changeAmt } // We'll default to using the current block height as locktime, if none @@ -270,12 +282,12 @@ func createSweepTx(inputs []input.Input, outputs []*wire.TxOut, // classes if fees are too low. btx := btcutil.NewTx(sweepTx) if err := blockchain.CheckTransactionSanity(btx); err != nil { - return nil, err + return nil, 0, err } prevInputFetcher, err := input.MultiPrevOutFetcher(inputs) if err != nil { - return nil, fmt.Errorf("error creating prev input fetcher "+ + return nil, 0, fmt.Errorf("error creating prev input fetcher "+ "for hash cache: %v", err) } hashCache := txscript.NewTxSigHashes(sweepTx, prevInputFetcher) @@ -293,7 +305,8 @@ func createSweepTx(inputs []input.Input, outputs []*wire.TxOut, sweepTx.TxIn[idx].Witness = inputScript.Witness if len(inputScript.SigScript) != 0 { - sweepTx.TxIn[idx].SignatureScript = inputScript.SigScript + sweepTx.TxIn[idx].SignatureScript = + inputScript.SigScript } return nil @@ -301,7 +314,7 @@ func createSweepTx(inputs []input.Input, outputs []*wire.TxOut, for idx, inp := range idxs { if err := addInputScript(idx, inp); err != nil { - return nil, err + return nil, 0, err } } @@ -315,7 +328,7 @@ func createSweepTx(inputs []input.Input, outputs []*wire.TxOut, estimator.parentsWeight, ) - return sweepTx, nil + return sweepTx, txFee, nil } // getWeightEstimate returns a weight estimate for the given inputs. diff --git a/sweep/walletsweep.go b/sweep/walletsweep.go index 831b0151e48..d3c5365c0a8 100644 --- a/sweep/walletsweep.go +++ b/sweep/walletsweep.go @@ -319,7 +319,7 @@ func CraftSweepAllTx(feeRate, maxFeeRate chainfee.SatPerKWeight, // Finally, we'll ask the sweeper to craft a sweep transaction which // respects our fee preference and targets all the UTXOs of the wallet. - sweepTx, err := createSweepTx( + sweepTx, _, err := createSweepTx( inputsToSweep, txOuts, changePkScript, blockHeight, feeRate, maxFeeRate, signer, )