From e8310518fce11d426a2480802fb566539ff8d887 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 30 Apr 2024 17:13:57 +0800 Subject: [PATCH 01/12] sweep: add new state `TxFatal` for erroneous sweepings Also updated the loggings. This new state will be used in the following commit. --- sweep/fee_bumper.go | 59 ++++++++++++++++++++++++++++++---------- sweep/fee_bumper_test.go | 20 ++++++++++++++ sweep/sweeper.go | 5 +++- 3 files changed, 68 insertions(+), 16 deletions(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index fd3dfba9e19..4d42cdb836b 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -84,6 +84,11 @@ const ( // TxConfirmed is sent when the tx is confirmed. TxConfirmed + // TxFatal is sent when the inputs in this tx cannot be retried. Txns + // will end up in this state if they have encountered a non-fee related + // error, which means they cannot be retried with increased budget. + TxFatal + // sentinalEvent is used to check if an event is unknown. sentinalEvent ) @@ -99,6 +104,8 @@ func (e BumpEvent) String() string { return "Replaced" case TxConfirmed: return "Confirmed" + case TxFatal: + return "Fatal" default: return "Unknown" } @@ -246,10 +253,20 @@ type BumpResult struct { requestID uint64 } +// String returns a human-readable string for the result. +func (b *BumpResult) String() string { + desc := fmt.Sprintf("Event=%v", b.Event) + if b.Tx != nil { + desc += fmt.Sprintf(", Tx=%v", b.Tx.TxHash()) + } + + return fmt.Sprintf("[%s]", desc) +} + // Validate validates the BumpResult so it's safe to use. func (b *BumpResult) Validate() error { - // Every result must have a tx. - if b.Tx == nil { + // Every result must have a tx except the fatal or failed case. + if b.Tx == nil && b.Event != TxFatal { return fmt.Errorf("%w: nil tx", ErrInvalidBumpResult) } @@ -263,9 +280,11 @@ func (b *BumpResult) Validate() error { return fmt.Errorf("%w: nil replacing tx", ErrInvalidBumpResult) } - // If it's a failed event, it must have an error. - if b.Event == TxFailed && b.Err == nil { - return fmt.Errorf("%w: nil error", ErrInvalidBumpResult) + // If it's a failed or fatal event, it must have an error. + if b.Event == TxFatal || b.Event == TxFailed { + if b.Err == nil { + return fmt.Errorf("%w: nil error", ErrInvalidBumpResult) + } } // If it's a confirmed event, it must have a fee rate and fee. @@ -654,8 +673,7 @@ func (t *TxPublisher) notifyResult(result *BumpResult) { return } - log.Debugf("Sending result for requestID=%v, tx=%v", id, - result.Tx.TxHash()) + log.Debugf("Sending result %v for requestID=%v", result, id) select { // Send the result to the subscriber. @@ -673,20 +691,31 @@ func (t *TxPublisher) notifyResult(result *BumpResult) { func (t *TxPublisher) removeResult(result *BumpResult) { id := result.requestID - // Remove the record from the maps if there's an error. This means this - // tx has failed its broadcast and cannot be retried. There are two - // cases, - // - when the budget cannot cover the fee. - // - when a non-RBF related error occurs. + var txid chainhash.Hash + if result.Tx != nil { + txid = result.Tx.TxHash() + } + + // Remove the record from the maps if there's an error or the tx is + // confirmed. When there's an error, it means this tx has failed its + // broadcast and cannot be retried. There are two cases it may fail, + // - when the budget cannot cover the increased fee calculated by the + // fee function, hence the budget is used up. + // - when a non-fee related error returned from PublishTransaction. switch result.Event { case TxFailed: log.Errorf("Removing monitor record=%v, tx=%v, due to err: %v", - id, result.Tx.TxHash(), result.Err) + id, txid, result.Err) case TxConfirmed: - // Remove the record is the tx is confirmed. + // Remove the record if the tx is confirmed. log.Debugf("Removing confirmed monitor record=%v, tx=%v", id, - result.Tx.TxHash()) + txid) + + case TxFatal: + // Remove the record if there's an error. + log.Debugf("Removing monitor record=%v due to fatal err: %v", + id, result.Err) // Do nothing if it's neither failed or confirmed. default: diff --git a/sweep/fee_bumper_test.go b/sweep/fee_bumper_test.go index 53b38607f7d..807d1df595d 100644 --- a/sweep/fee_bumper_test.go +++ b/sweep/fee_bumper_test.go @@ -91,6 +91,19 @@ func TestBumpResultValidate(t *testing.T) { } require.ErrorIs(t, b.Validate(), ErrInvalidBumpResult) + // A failed event without a tx will give an error. + b = BumpResult{ + Event: TxFailed, + Err: errDummy, + } + require.ErrorIs(t, b.Validate(), ErrInvalidBumpResult) + + // A fatal event without a failure reason will give an error. + b = BumpResult{ + Event: TxFailed, + } + require.ErrorIs(t, b.Validate(), ErrInvalidBumpResult) + // A confirmed event without fee info will give an error. b = BumpResult{ Tx: &wire.MsgTx{}, @@ -104,6 +117,13 @@ func TestBumpResultValidate(t *testing.T) { Event: TxPublished, } require.NoError(t, b.Validate()) + + // Tx is allowed to be nil in a TxFatal event. + b = BumpResult{ + Event: TxFatal, + Err: errDummy, + } + require.NoError(t, b.Validate()) } // TestCalcSweepTxWeight checks that the weight of the sweep tx is calculated diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 6257faac1f5..fa5c921dfb4 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -1729,7 +1729,7 @@ func (s *UtxoSweeper) handleBumpEventTxPublished(r *BumpResult) error { // NOTE: TxConfirmed event is not handled, since we already subscribe to the // input's spending event, we don't need to do anything here. func (s *UtxoSweeper) handleBumpEvent(r *BumpResult) error { - log.Debugf("Received bump event [%v] for tx %v", r.Event, r.Tx.TxHash()) + log.Debugf("Received bump result %v", r) switch r.Event { // The tx has been published, we update the inputs' state and create a @@ -1745,6 +1745,9 @@ func (s *UtxoSweeper) handleBumpEvent(r *BumpResult) error { // with the new one. case TxReplaced: return s.handleBumpEventTxReplaced(r) + + case TxFatal: + // TODO(yy): create a method to remove this input. } return nil From 9a0d1ce5582da4d0b840284ec09719a27c2fb784 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 25 Oct 2024 15:27:26 +0800 Subject: [PATCH 02/12] sweep: add new error `ErrZeroFeeRateDelta` --- sweep/fee_function.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sweep/fee_function.go b/sweep/fee_function.go index cbf283e37d7..15d44ed6160 100644 --- a/sweep/fee_function.go +++ b/sweep/fee_function.go @@ -14,6 +14,9 @@ var ( // ErrMaxPosition is returned when trying to increase the position of // the fee function while it's already at its max. ErrMaxPosition = errors.New("position already at max") + + // ErrZeroFeeRateDelta is returned when the fee rate delta is zero. + ErrZeroFeeRateDelta = errors.New("fee rate delta is zero") ) // mSatPerKWeight represents a fee rate in msat/kw. @@ -169,7 +172,7 @@ func NewLinearFeeFunction(maxFeeRate chainfee.SatPerKWeight, "endingFeeRate=%v, width=%v, delta=%v", start, end, l.width, l.deltaFeeRate) - return nil, fmt.Errorf("fee rate delta is zero") + return nil, ErrZeroFeeRateDelta } // Attach the calculated values to the fee function. From 9b69e4e384c83f80d59fd0e1ee6ce4cbb8fafda0 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 25 Oct 2024 15:32:30 +0800 Subject: [PATCH 03/12] sweep: add new interface method `Immediate` This prepares the following commit where we now let the fee bumpr decides whether to broadcast immediately or not. --- sweep/fee_bumper.go | 4 ++++ sweep/mock_test.go | 7 +++++++ sweep/sweeper.go | 1 + sweep/sweeper_test.go | 2 ++ sweep/tx_input_set.go | 22 ++++++++++++++++++++++ 5 files changed, 36 insertions(+) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index 4d42cdb836b..39b6fba2d29 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -143,6 +143,10 @@ type BumpRequest struct { // ExtraTxOut tracks if this bump request has an optional set of extra // outputs to add to the transaction. ExtraTxOut fn.Option[SweepOutput] + + // Immediate is used to specify that the tx should be broadcast + // immediately. + Immediate bool } // MaxFeeRateAllowed returns the maximum fee rate allowed for the given diff --git a/sweep/mock_test.go b/sweep/mock_test.go index c623ca3c0b4..3179ed8837a 100644 --- a/sweep/mock_test.go +++ b/sweep/mock_test.go @@ -268,6 +268,13 @@ func (m *MockInputSet) StartingFeeRate() fn.Option[chainfee.SatPerKWeight] { return args.Get(0).(fn.Option[chainfee.SatPerKWeight]) } +// Immediate returns whether the inputs should be swept immediately. +func (m *MockInputSet) Immediate() bool { + args := m.Called() + + return args.Bool(0) +} + // MockBumper is a mock implementation of the interface Bumper. type MockBumper struct { mock.Mock diff --git a/sweep/sweeper.go b/sweep/sweeper.go index fa5c921dfb4..e845a7f1219 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -827,6 +827,7 @@ func (s *UtxoSweeper) sweep(set InputSet) error { DeliveryAddress: sweepAddr, MaxFeeRate: s.cfg.MaxFeeRate.FeePerKWeight(), StartingFeeRate: set.StartingFeeRate(), + Immediate: set.Immediate(), // TODO(yy): pass the strategy here. } diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 6d9c6c3d2e5..58e8caeff03 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -704,11 +704,13 @@ func TestSweepPendingInputs(t *testing.T) { setNeedWallet.On("Budget").Return(btcutil.Amount(1)).Once() setNeedWallet.On("StartingFeeRate").Return( fn.None[chainfee.SatPerKWeight]()).Once() + setNeedWallet.On("Immediate").Return(false).Once() normalSet.On("Inputs").Return(nil).Maybe() normalSet.On("DeadlineHeight").Return(testHeight).Once() normalSet.On("Budget").Return(btcutil.Amount(1)).Once() normalSet.On("StartingFeeRate").Return( fn.None[chainfee.SatPerKWeight]()).Once() + normalSet.On("Immediate").Return(false).Once() // Make pending inputs for testing. We don't need real values here as // the returned clusters are mocked. diff --git a/sweep/tx_input_set.go b/sweep/tx_input_set.go index 3a95fff2fb8..651691276e7 100644 --- a/sweep/tx_input_set.go +++ b/sweep/tx_input_set.go @@ -64,6 +64,13 @@ type InputSet interface { // StartingFeeRate returns the max starting fee rate found in the // inputs. StartingFeeRate() fn.Option[chainfee.SatPerKWeight] + + // Immediate returns a boolean to indicate whether the tx made from + // this input set should be published immediately. + // + // TODO(yy): create a new method `Params` to combine the informational + // methods DeadlineHeight, Budget, StartingFeeRate and Immediate. + Immediate() bool } // createWalletTxInput converts a wallet utxo into an object that can be added @@ -412,3 +419,18 @@ func (b *BudgetInputSet) StartingFeeRate() fn.Option[chainfee.SatPerKWeight] { return startingFeeRate } + +// Immediate returns whether the inputs should be swept immediately. +// +// NOTE: part of the InputSet interface. +func (b *BudgetInputSet) Immediate() bool { + for _, inp := range b.inputs { + // As long as one of the inputs is immediate, the whole set is + // immediate. + if inp.params.Immediate { + return true + } + } + + return false +} From bf3b58f13f702bfb4740eceaf6e1ebfaa6ce0723 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 25 Oct 2024 17:31:54 +0800 Subject: [PATCH 04/12] sweep: handle inputs locally instead of relying on the tx This commit changes how inputs are handled upon receiving a bump result. Previously the inputs are taken from the `BumpResult.Tx`, which is now instead being handled locally as we will remember the input set when sending the bump request, and handle this input set when a result is received. --- sweep/sweeper.go | 108 ++++++------ sweep/sweeper_test.go | 375 +++++++++++++++++++----------------------- 2 files changed, 232 insertions(+), 251 deletions(-) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index e845a7f1219..586b76cf5b4 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -309,9 +309,9 @@ type UtxoSweeper struct { // updated whenever a new block epoch is received. currentHeight int32 - // bumpResultChan is a channel that receives broadcast results from the + // bumpRespChan is a channel that receives broadcast results from the // TxPublisher. - bumpResultChan chan *BumpResult + bumpRespChan chan *bumpResp } // UtxoSweeperConfig contains dependencies of UtxoSweeper. @@ -395,7 +395,7 @@ func New(cfg *UtxoSweeperConfig) *UtxoSweeper { pendingSweepsReqs: make(chan *pendingSweepsReq), quit: make(chan struct{}), inputs: make(InputsMap), - bumpResultChan: make(chan *BumpResult, 100), + bumpRespChan: make(chan *bumpResp, 100), } } @@ -681,9 +681,9 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { s.sweepPendingInputs(inputs) } - case result := <-s.bumpResultChan: + case resp := <-s.bumpRespChan: // Handle the bump event. - err := s.handleBumpEvent(result) + err := s.handleBumpEvent(resp) if err != nil { log.Errorf("Failed to handle bump event: %v", err) @@ -840,16 +840,11 @@ func (s *UtxoSweeper) sweep(set InputSet) error { // this publish result and future RBF attempt. resp, err := s.cfg.Publisher.Broadcast(req) if err != nil { - outpoints := make([]wire.OutPoint, len(set.Inputs())) - for i, inp := range set.Inputs() { - outpoints[i] = inp.OutPoint() - } - log.Errorf("Initial broadcast failed: %v, inputs=\n%v", err, inputTypeSummary(set.Inputs())) // TODO(yy): find out which input is causing the failure. - s.markInputsPublishFailed(outpoints) + s.markInputsPublishFailed(set) return err } @@ -858,7 +853,7 @@ func (s *UtxoSweeper) sweep(set InputSet) error { // subscribing to the result chan and listen for future updates about // this tx. s.wg.Add(1) - go s.monitorFeeBumpResult(resp) + go s.monitorFeeBumpResult(set, resp) return nil } @@ -868,14 +863,14 @@ func (s *UtxoSweeper) sweep(set InputSet) error { func (s *UtxoSweeper) markInputsPendingPublish(set InputSet) { // Reschedule sweep. for _, input := range set.Inputs() { - pi, ok := s.inputs[input.OutPoint()] + op := input.OutPoint() + pi, ok := s.inputs[op] if !ok { // It could be that this input is an additional wallet // input that was attached. In that case there also // isn't a pending input to update. log.Tracef("Skipped marking input as pending "+ - "published: %v not found in pending inputs", - input.OutPoint()) + "published: %v not found in pending inputs", op) continue } @@ -886,8 +881,7 @@ func (s *UtxoSweeper) markInputsPendingPublish(set InputSet) { // publish. if pi.terminated() { log.Errorf("Expect input %v to not have terminated "+ - "state, instead it has %v", - input.OutPoint, pi.state) + "state, instead it has %v", op, pi.state) continue } @@ -902,9 +896,7 @@ func (s *UtxoSweeper) markInputsPendingPublish(set InputSet) { // markInputsPublished updates the sweeping tx in db and marks the list of // inputs as published. -func (s *UtxoSweeper) markInputsPublished(tr *TxRecord, - inputs []*wire.TxIn) error { - +func (s *UtxoSweeper) markInputsPublished(tr *TxRecord, set InputSet) error { // Mark this tx in db once successfully published. // // NOTE: this will behave as an overwrite, which is fine as the record @@ -916,15 +908,15 @@ func (s *UtxoSweeper) markInputsPublished(tr *TxRecord, } // Reschedule sweep. - for _, input := range inputs { - pi, ok := s.inputs[input.PreviousOutPoint] + for _, input := range set.Inputs() { + op := input.OutPoint() + pi, ok := s.inputs[op] if !ok { // It could be that this input is an additional wallet // input that was attached. In that case there also // isn't a pending input to update. log.Tracef("Skipped marking input as published: %v "+ - "not found in pending inputs", - input.PreviousOutPoint) + "not found in pending inputs", op) continue } @@ -933,8 +925,7 @@ func (s *UtxoSweeper) markInputsPublished(tr *TxRecord, if pi.state != PendingPublish { // We may get a Published if this is a replacement tx. log.Debugf("Expect input %v to have %v, instead it "+ - "has %v", input.PreviousOutPoint, - PendingPublish, pi.state) + "has %v", op, PendingPublish, pi.state) continue } @@ -950,9 +941,10 @@ func (s *UtxoSweeper) markInputsPublished(tr *TxRecord, } // markInputsPublishFailed marks the list of inputs as failed to be published. -func (s *UtxoSweeper) markInputsPublishFailed(outpoints []wire.OutPoint) { +func (s *UtxoSweeper) markInputsPublishFailed(set InputSet) { // Reschedule sweep. - for _, op := range outpoints { + for _, inp := range set.Inputs() { + op := inp.OutPoint() pi, ok := s.inputs[op] if !ok { // It could be that this input is an additional wallet @@ -1540,6 +1532,8 @@ func (s *UtxoSweeper) updateSweeperInputs() InputsMap { // sweepPendingInputs is called when the ticker fires. It will create clusters // and attempt to create and publish the sweeping transactions. func (s *UtxoSweeper) sweepPendingInputs(inputs InputsMap) { + log.Debugf("Sweeping %v inputs", len(inputs)) + // Cluster all of our inputs based on the specific Aggregator. sets := s.cfg.Aggregator.ClusterInputs(inputs) @@ -1581,11 +1575,24 @@ func (s *UtxoSweeper) sweepPendingInputs(inputs InputsMap) { } } +// bumpResp wraps the result of a bump attempt returned from the fee bumper and +// the inputs being used. +type bumpResp struct { + // result is the result of the bump attempt returned from the fee + // bumper. + result *BumpResult + + // set is the input set that was used in the bump attempt. + set InputSet +} + // monitorFeeBumpResult subscribes to the passed result chan to listen for // future updates about the sweeping tx. // // NOTE: must run as a goroutine. -func (s *UtxoSweeper) monitorFeeBumpResult(resultChan <-chan *BumpResult) { +func (s *UtxoSweeper) monitorFeeBumpResult(set InputSet, + resultChan <-chan *BumpResult) { + defer s.wg.Done() for { @@ -1597,9 +1604,14 @@ func (s *UtxoSweeper) monitorFeeBumpResult(resultChan <-chan *BumpResult) { continue } + resp := &bumpResp{ + result: r, + set: set, + } + // Send the result back to the main event loop. select { - case s.bumpResultChan <- r: + case s.bumpRespChan <- resp: case <-s.quit: log.Debug("Sweeper shutting down, skip " + "sending bump result") @@ -1635,25 +1647,25 @@ func (s *UtxoSweeper) monitorFeeBumpResult(resultChan <-chan *BumpResult) { // handleBumpEventTxFailed handles the case where the tx has been failed to // publish. -func (s *UtxoSweeper) handleBumpEventTxFailed(r *BumpResult) error { +func (s *UtxoSweeper) handleBumpEventTxFailed(resp *bumpResp) { + r := resp.result tx, err := r.Tx, r.Err log.Errorf("Fee bump attempt failed for tx=%v: %v", tx.TxHash(), err) - outpoints := make([]wire.OutPoint, 0, len(tx.TxIn)) - for _, inp := range tx.TxIn { - outpoints = append(outpoints, inp.PreviousOutPoint) - } - + // NOTE: When marking the inputs as failed, we are using the input set + // instead of the inputs found in the tx. This is fine for current + // version of the sweeper because we always create a tx using ALL of + // the inputs specified by the set. + // // TODO(yy): should we also remove the failed tx from db? - s.markInputsPublishFailed(outpoints) - - return err + s.markInputsPublishFailed(resp.set) } // handleBumpEventTxReplaced handles the case where the sweeping tx has been // replaced by a new one. -func (s *UtxoSweeper) handleBumpEventTxReplaced(r *BumpResult) error { +func (s *UtxoSweeper) handleBumpEventTxReplaced(resp *bumpResp) error { + r := resp.result oldTx := r.ReplacedTx newTx := r.Tx @@ -1693,12 +1705,13 @@ func (s *UtxoSweeper) handleBumpEventTxReplaced(r *BumpResult) error { } // Mark the inputs as published using the replacing tx. - return s.markInputsPublished(tr, r.Tx.TxIn) + return s.markInputsPublished(tr, resp.set) } // handleBumpEventTxPublished handles the case where the sweeping tx has been // successfully published. -func (s *UtxoSweeper) handleBumpEventTxPublished(r *BumpResult) error { +func (s *UtxoSweeper) handleBumpEventTxPublished(resp *bumpResp) error { + r := resp.result tx := r.Tx tr := &TxRecord{ Txid: tx.TxHash(), @@ -1708,7 +1721,7 @@ func (s *UtxoSweeper) handleBumpEventTxPublished(r *BumpResult) error { // Inputs have been successfully published so we update their // states. - err := s.markInputsPublished(tr, tx.TxIn) + err := s.markInputsPublished(tr, resp.set) if err != nil { return err } @@ -1729,10 +1742,10 @@ func (s *UtxoSweeper) handleBumpEventTxPublished(r *BumpResult) error { // // NOTE: TxConfirmed event is not handled, since we already subscribe to the // input's spending event, we don't need to do anything here. -func (s *UtxoSweeper) handleBumpEvent(r *BumpResult) error { - log.Debugf("Received bump result %v", r) +func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error { + log.Debugf("Received bump result %v", r.result) - switch r.Event { + switch r.result.Event { // The tx has been published, we update the inputs' state and create a // record to be stored in the sweeper db. case TxPublished: @@ -1740,7 +1753,8 @@ func (s *UtxoSweeper) handleBumpEvent(r *BumpResult) error { // The tx has failed, we update the inputs' state. case TxFailed: - return s.handleBumpEventTxFailed(r) + s.handleBumpEventTxFailed(r) + return nil // The tx has been replaced, we will remove the old tx and replace it // with the new one. diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 58e8caeff03..54bc0c21762 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -1,6 +1,7 @@ package sweep import ( + "crypto/rand" "errors" "testing" "time" @@ -12,6 +13,7 @@ import ( "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/input" + "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/chainfee" "github.com/stretchr/testify/mock" @@ -33,6 +35,41 @@ var ( }) ) +// createMockInput creates a mock input and saves it to the sweeper's inputs +// map. The created input has the specified state and a random outpoint. It +// will assert the method `OutPoint` is called at least once. +func createMockInput(t *testing.T, s *UtxoSweeper, + state SweepState) *input.MockInput { + + inp := &input.MockInput{} + t.Cleanup(func() { + inp.AssertExpectations(t) + }) + + randBuf := make([]byte, lntypes.HashSize) + _, err := rand.Read(randBuf) + require.NoError(t, err, "internal error, cannot generate random bytes") + + randHash, err := chainhash.NewHash(randBuf) + require.NoError(t, err) + + inp.On("OutPoint").Return(wire.OutPoint{ + Hash: *randHash, + Index: 0, + }) + + // We don't do branch switches based on the witness type here so we + // just mock it. + inp.On("WitnessType").Return(input.CommitmentTimeLock).Maybe() + + s.inputs[inp.OutPoint()] = &SweeperInput{ + Input: inp, + state: state, + } + + return inp +} + // TestMarkInputsPendingPublish checks that given a list of inputs with // different states, only the non-terminal state will be marked as `Published`. func TestMarkInputsPendingPublish(t *testing.T) { @@ -47,50 +84,21 @@ func TestMarkInputsPendingPublish(t *testing.T) { set := &MockInputSet{} defer set.AssertExpectations(t) - // Create three testing inputs. - // - // inputNotExist specifies an input that's not found in the sweeper's - // `pendingInputs` map. - inputNotExist := &input.MockInput{} - defer inputNotExist.AssertExpectations(t) - - inputNotExist.On("OutPoint").Return(wire.OutPoint{Index: 0}) - - // inputInit specifies a newly created input. - inputInit := &input.MockInput{} - defer inputInit.AssertExpectations(t) - - inputInit.On("OutPoint").Return(wire.OutPoint{Index: 1}) - - s.inputs[inputInit.OutPoint()] = &SweeperInput{ - state: Init, - } - - // inputPendingPublish specifies an input that's about to be published. - inputPendingPublish := &input.MockInput{} - defer inputPendingPublish.AssertExpectations(t) - - inputPendingPublish.On("OutPoint").Return(wire.OutPoint{Index: 2}) - - s.inputs[inputPendingPublish.OutPoint()] = &SweeperInput{ - state: PendingPublish, - } - - // inputTerminated specifies an input that's terminated. - inputTerminated := &input.MockInput{} - defer inputTerminated.AssertExpectations(t) - - inputTerminated.On("OutPoint").Return(wire.OutPoint{Index: 3}) - - s.inputs[inputTerminated.OutPoint()] = &SweeperInput{ - state: Excluded, - } + // Create three inputs with different states. + // - inputInit specifies a newly created input. + // - inputPendingPublish specifies an input about to be published. + // - inputTerminated specifies an input that's terminated. + var ( + inputInit = createMockInput(t, s, Init) + inputPendingPublish = createMockInput(t, s, PendingPublish) + inputTerminated = createMockInput(t, s, Excluded) + ) // Mark the test inputs. We expect the non-exist input and the // inputTerminated to be skipped, and the rest to be marked as pending // publish. set.On("Inputs").Return([]input.Input{ - inputNotExist, inputInit, inputPendingPublish, inputTerminated, + inputInit, inputPendingPublish, inputTerminated, }) s.markInputsPendingPublish(set) @@ -122,36 +130,22 @@ func TestMarkInputsPublished(t *testing.T) { dummyTR := &TxRecord{} dummyErr := errors.New("dummy error") + // Create a mock input set. + set := &MockInputSet{} + defer set.AssertExpectations(t) + // Create a test sweeper. s := New(&UtxoSweeperConfig{ Store: mockStore, }) - // Create three testing inputs. - // - // inputNotExist specifies an input that's not found in the sweeper's - // `inputs` map. - inputNotExist := &wire.TxIn{ - PreviousOutPoint: wire.OutPoint{Index: 1}, - } - - // inputInit specifies a newly created input. When marking this as - // published, we should see an error log as this input hasn't been - // published yet. - inputInit := &wire.TxIn{ - PreviousOutPoint: wire.OutPoint{Index: 2}, - } - s.inputs[inputInit.PreviousOutPoint] = &SweeperInput{ - state: Init, - } - - // inputPendingPublish specifies an input that's about to be published. - inputPendingPublish := &wire.TxIn{ - PreviousOutPoint: wire.OutPoint{Index: 3}, - } - s.inputs[inputPendingPublish.PreviousOutPoint] = &SweeperInput{ - state: PendingPublish, - } + // Create two inputs with different states. + // - inputInit specifies a newly created input. + // - inputPendingPublish specifies an input about to be published. + var ( + inputInit = createMockInput(t, s, Init) + inputPendingPublish = createMockInput(t, s, PendingPublish) + ) // First, check that when an error is returned from db, it's properly // returned here. @@ -171,9 +165,9 @@ func TestMarkInputsPublished(t *testing.T) { // Mark the test inputs. We expect the non-exist input and the // inputInit to be skipped, and the final input to be marked as // published. - err = s.markInputsPublished(dummyTR, []*wire.TxIn{ - inputNotExist, inputInit, inputPendingPublish, - }) + set.On("Inputs").Return([]input.Input{inputInit, inputPendingPublish}) + + err = s.markInputsPublished(dummyTR, set) require.NoError(err) // We expect unchanged number of pending inputs. @@ -181,11 +175,11 @@ func TestMarkInputsPublished(t *testing.T) { // We expect the init input's state to stay unchanged. require.Equal(Init, - s.inputs[inputInit.PreviousOutPoint].state) + s.inputs[inputInit.OutPoint()].state) // We expect the pending-publish input's is now marked as published. require.Equal(Published, - s.inputs[inputPendingPublish.PreviousOutPoint].state) + s.inputs[inputPendingPublish.OutPoint()].state) // Assert mocked statements are executed as expected. mockStore.AssertExpectations(t) @@ -202,117 +196,75 @@ func TestMarkInputsPublishFailed(t *testing.T) { // Create a mock sweeper store. mockStore := NewMockSweeperStore() + // Create a mock input set. + set := &MockInputSet{} + defer set.AssertExpectations(t) + // Create a test sweeper. s := New(&UtxoSweeperConfig{ Store: mockStore, }) - // Create testing inputs for each state. - // - // inputNotExist specifies an input that's not found in the sweeper's - // `inputs` map. - inputNotExist := &wire.TxIn{ - PreviousOutPoint: wire.OutPoint{Index: 1}, - } - - // inputInit specifies a newly created input. When marking this as - // published, we should see an error log as this input hasn't been - // published yet. - inputInit := &wire.TxIn{ - PreviousOutPoint: wire.OutPoint{Index: 2}, - } - s.inputs[inputInit.PreviousOutPoint] = &SweeperInput{ - state: Init, - } - - // inputPendingPublish specifies an input that's about to be published. - inputPendingPublish := &wire.TxIn{ - PreviousOutPoint: wire.OutPoint{Index: 3}, - } - s.inputs[inputPendingPublish.PreviousOutPoint] = &SweeperInput{ - state: PendingPublish, - } - - // inputPublished specifies an input that's published. - inputPublished := &wire.TxIn{ - PreviousOutPoint: wire.OutPoint{Index: 4}, - } - s.inputs[inputPublished.PreviousOutPoint] = &SweeperInput{ - state: Published, - } - - // inputPublishFailed specifies an input that's failed to be published. - inputPublishFailed := &wire.TxIn{ - PreviousOutPoint: wire.OutPoint{Index: 5}, - } - s.inputs[inputPublishFailed.PreviousOutPoint] = &SweeperInput{ - state: PublishFailed, - } - - // inputSwept specifies an input that's swept. - inputSwept := &wire.TxIn{ - PreviousOutPoint: wire.OutPoint{Index: 6}, - } - s.inputs[inputSwept.PreviousOutPoint] = &SweeperInput{ - state: Swept, - } - - // inputExcluded specifies an input that's excluded. - inputExcluded := &wire.TxIn{ - PreviousOutPoint: wire.OutPoint{Index: 7}, - } - s.inputs[inputExcluded.PreviousOutPoint] = &SweeperInput{ - state: Excluded, - } - - // inputFailed specifies an input that's failed. - inputFailed := &wire.TxIn{ - PreviousOutPoint: wire.OutPoint{Index: 8}, - } - s.inputs[inputFailed.PreviousOutPoint] = &SweeperInput{ - state: Failed, - } + // Create inputs with different states. + // - inputInit specifies a newly created input. When marking this as + // published, we should see an error log as this input hasn't been + // published yet. + // - inputPendingPublish specifies an input about to be published. + // - inputPublished specifies an input that's published. + // - inputPublishFailed specifies an input that's failed to be + // published. + // - inputSwept specifies an input that's swept. + // - inputExcluded specifies an input that's excluded. + // - inputFailed specifies an input that's failed. + var ( + inputInit = createMockInput(t, s, Init) + inputPendingPublish = createMockInput(t, s, PendingPublish) + inputPublished = createMockInput(t, s, Published) + inputPublishFailed = createMockInput(t, s, PublishFailed) + inputSwept = createMockInput(t, s, Swept) + inputExcluded = createMockInput(t, s, Excluded) + inputFailed = createMockInput(t, s, Failed) + ) - // Gather all inputs' outpoints. - pendingOps := make([]wire.OutPoint, 0, len(s.inputs)+1) - for op := range s.inputs { - pendingOps = append(pendingOps, op) - } - pendingOps = append(pendingOps, inputNotExist.PreviousOutPoint) + // Gather all inputs. + set.On("Inputs").Return([]input.Input{ + inputInit, inputPendingPublish, inputPublished, + inputPublishFailed, inputSwept, inputExcluded, inputFailed, + }) // Mark the test inputs. We expect the non-exist input and the // inputInit to be skipped, and the final input to be marked as // published. - s.markInputsPublishFailed(pendingOps) + s.markInputsPublishFailed(set) // We expect unchanged number of pending inputs. require.Len(s.inputs, 7) // We expect the init input's state to stay unchanged. require.Equal(Init, - s.inputs[inputInit.PreviousOutPoint].state) + s.inputs[inputInit.OutPoint()].state) // We expect the pending-publish input's is now marked as publish // failed. require.Equal(PublishFailed, - s.inputs[inputPendingPublish.PreviousOutPoint].state) + s.inputs[inputPendingPublish.OutPoint()].state) // We expect the published input's is now marked as publish failed. require.Equal(PublishFailed, - s.inputs[inputPublished.PreviousOutPoint].state) + s.inputs[inputPublished.OutPoint()].state) // We expect the publish failed input to stay unchanged. require.Equal(PublishFailed, - s.inputs[inputPublishFailed.PreviousOutPoint].state) + s.inputs[inputPublishFailed.OutPoint()].state) // We expect the swept input to stay unchanged. - require.Equal(Swept, s.inputs[inputSwept.PreviousOutPoint].state) + require.Equal(Swept, s.inputs[inputSwept.OutPoint()].state) // We expect the excluded input to stay unchanged. - require.Equal(Excluded, s.inputs[inputExcluded.PreviousOutPoint].state) + require.Equal(Excluded, s.inputs[inputExcluded.OutPoint()].state) // We expect the failed input to stay unchanged. - require.Equal(Failed, s.inputs[inputFailed.PreviousOutPoint].state) + require.Equal(Failed, s.inputs[inputFailed.OutPoint()].state) // Assert mocked statements are executed as expected. mockStore.AssertExpectations(t) @@ -738,33 +690,33 @@ func TestSweepPendingInputs(t *testing.T) { func TestHandleBumpEventTxFailed(t *testing.T) { t.Parallel() + // Create a mock input set. + set := &MockInputSet{} + defer set.AssertExpectations(t) + // Create a test sweeper. s := New(&UtxoSweeperConfig{}) - var ( - // Create four testing outpoints. - op1 = wire.OutPoint{Hash: chainhash.Hash{1}} - op2 = wire.OutPoint{Hash: chainhash.Hash{2}} - op3 = wire.OutPoint{Hash: chainhash.Hash{3}} - opNotExist = wire.OutPoint{Hash: chainhash.Hash{4}} - ) + // inputNotExist specifies an input that's not found in the sweeper's + // `pendingInputs` map. + inputNotExist := &input.MockInput{} + defer inputNotExist.AssertExpectations(t) + inputNotExist.On("OutPoint").Return(wire.OutPoint{Index: 0}) + opNotExist := inputNotExist.OutPoint() // Create three mock inputs. - input1 := &input.MockInput{} - defer input1.AssertExpectations(t) - - input2 := &input.MockInput{} - defer input2.AssertExpectations(t) + var ( + input1 = createMockInput(t, s, PendingPublish) + input2 = createMockInput(t, s, PendingPublish) + input3 = createMockInput(t, s, PendingPublish) + ) - input3 := &input.MockInput{} - defer input3.AssertExpectations(t) + op1 := input1.OutPoint() + op2 := input2.OutPoint() + op3 := input3.OutPoint() // Construct the initial state for the sweeper. - s.inputs = InputsMap{ - op1: &SweeperInput{Input: input1, state: PendingPublish}, - op2: &SweeperInput{Input: input2, state: PendingPublish}, - op3: &SweeperInput{Input: input3, state: PendingPublish}, - } + set.On("Inputs").Return([]input.Input{input1, input2, input3}) // Create a testing tx that spends the first two inputs. tx := &wire.MsgTx{ @@ -782,16 +734,26 @@ func TestHandleBumpEventTxFailed(t *testing.T) { Err: errDummy, } + // Create a testing bump response. + resp := &bumpResp{ + result: br, + set: set, + } + // Call the method under test. - err := s.handleBumpEvent(br) + err := s.handleBumpEvent(resp) require.ErrorIs(t, err, errDummy) // Assert the states of the first two inputs are updated. require.Equal(t, PublishFailed, s.inputs[op1].state) require.Equal(t, PublishFailed, s.inputs[op2].state) - // Assert the state of the third input is not updated. - require.Equal(t, PendingPublish, s.inputs[op3].state) + // Assert the state of the third input. + // + // NOTE: Although the tx doesn't spend it, we still mark this input as + // failed as we are treating the input set as the single source of + // truth. + require.Equal(t, PublishFailed, s.inputs[op3].state) // Assert the non-existing input is not added to the pending inputs. require.NotContains(t, s.inputs, opNotExist) @@ -810,23 +772,21 @@ func TestHandleBumpEventTxReplaced(t *testing.T) { wallet := &MockWallet{} defer wallet.AssertExpectations(t) + // Create a mock input set. + set := &MockInputSet{} + defer set.AssertExpectations(t) + // Create a test sweeper. s := New(&UtxoSweeperConfig{ Store: store, Wallet: wallet, }) - // Create a testing outpoint. - op := wire.OutPoint{Hash: chainhash.Hash{1}} - // Create a mock input. - inp := &input.MockInput{} - defer inp.AssertExpectations(t) + inp := createMockInput(t, s, PendingPublish) + set.On("Inputs").Return([]input.Input{inp}) - // Construct the initial state for the sweeper. - s.inputs = InputsMap{ - op: &SweeperInput{Input: inp, state: PendingPublish}, - } + op := inp.OutPoint() // Create a testing tx that spends the input. tx := &wire.MsgTx{ @@ -851,12 +811,18 @@ func TestHandleBumpEventTxReplaced(t *testing.T) { Event: TxReplaced, } + // Create a testing bump response. + resp := &bumpResp{ + result: br, + set: set, + } + // Mock the store to return an error. dummyErr := errors.New("dummy error") store.On("GetTx", tx.TxHash()).Return(nil, dummyErr).Once() // Call the method under test and assert the error is returned. - err := s.handleBumpEventTxReplaced(br) + err := s.handleBumpEventTxReplaced(resp) require.ErrorIs(t, err, dummyErr) // Mock the store to return the old tx record. @@ -871,7 +837,7 @@ func TestHandleBumpEventTxReplaced(t *testing.T) { store.On("DeleteTx", tx.TxHash()).Return(dummyErr).Once() // Call the method under test and assert the error is returned. - err = s.handleBumpEventTxReplaced(br) + err = s.handleBumpEventTxReplaced(resp) require.ErrorIs(t, err, dummyErr) // Mock the store to return the old tx record and delete it without @@ -891,7 +857,7 @@ func TestHandleBumpEventTxReplaced(t *testing.T) { wallet.On("CancelRebroadcast", tx.TxHash()).Once() // Call the method under test. - err = s.handleBumpEventTxReplaced(br) + err = s.handleBumpEventTxReplaced(resp) require.NoError(t, err) // Assert the state of the input is updated. @@ -907,22 +873,20 @@ func TestHandleBumpEventTxPublished(t *testing.T) { store := &MockSweeperStore{} defer store.AssertExpectations(t) + // Create a mock input set. + set := &MockInputSet{} + defer set.AssertExpectations(t) + // Create a test sweeper. s := New(&UtxoSweeperConfig{ Store: store, }) - // Create a testing outpoint. - op := wire.OutPoint{Hash: chainhash.Hash{1}} - // Create a mock input. - inp := &input.MockInput{} - defer inp.AssertExpectations(t) + inp := createMockInput(t, s, PendingPublish) + set.On("Inputs").Return([]input.Input{inp}) - // Construct the initial state for the sweeper. - s.inputs = InputsMap{ - op: &SweeperInput{Input: inp, state: PendingPublish}, - } + op := inp.OutPoint() // Create a testing tx that spends the input. tx := &wire.MsgTx{ @@ -938,6 +902,12 @@ func TestHandleBumpEventTxPublished(t *testing.T) { Event: TxPublished, } + // Create a testing bump response. + resp := &bumpResp{ + result: br, + set: set, + } + // Mock the store to save the new tx record. store.On("StoreTx", &TxRecord{ Txid: tx.TxHash(), @@ -945,7 +915,7 @@ func TestHandleBumpEventTxPublished(t *testing.T) { }).Return(nil).Once() // Call the method under test. - err := s.handleBumpEventTxPublished(br) + err := s.handleBumpEventTxPublished(resp) require.NoError(t, err) // Assert the state of the input is updated. @@ -963,25 +933,21 @@ func TestMonitorFeeBumpResult(t *testing.T) { wallet := &MockWallet{} defer wallet.AssertExpectations(t) + // Create a mock input set. + set := &MockInputSet{} + defer set.AssertExpectations(t) + // Create a test sweeper. s := New(&UtxoSweeperConfig{ Store: store, Wallet: wallet, }) - // Create a testing outpoint. - op := wire.OutPoint{Hash: chainhash.Hash{1}} - // Create a mock input. - inp := &input.MockInput{} - defer inp.AssertExpectations(t) - - // Construct the initial state for the sweeper. - s.inputs = InputsMap{ - op: &SweeperInput{Input: inp, state: PendingPublish}, - } + inp := createMockInput(t, s, PendingPublish) // Create a testing tx that spends the input. + op := inp.OutPoint() tx := &wire.MsgTx{ LockTime: 1, TxIn: []*wire.TxIn{ @@ -1060,7 +1026,8 @@ func TestMonitorFeeBumpResult(t *testing.T) { return resultChan }, shouldExit: false, - }, { + }, + { // When the sweeper is shutting down, the monitor loop // should exit. name: "exit on sweeper shutdown", @@ -1087,7 +1054,7 @@ func TestMonitorFeeBumpResult(t *testing.T) { s.wg.Add(1) go func() { - s.monitorFeeBumpResult(resultChan) + s.monitorFeeBumpResult(set, resultChan) close(done) }() From a143223f70772dac082cd207db153e867e028739 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 25 Oct 2024 18:31:46 +0800 Subject: [PATCH 05/12] sweep: add `handleInitialBroadcast` to handle initial broadcast This commit adds a new method `handleInitialBroadcast` to handle the initial broadcast. Previously we'd broadcast immediately inside `Broadcast`, which soon will not work after the `blockbeat` is implemented as the action to publish is now always triggered by a new block. Meanwhile, we still keep the option to bypass the block trigger so users can broadcast immediately by setting `Immediate` to true. --- sweep/fee_bumper.go | 170 ++++++++++++++----- sweep/fee_bumper_test.go | 356 ++++++++++++++++++++++++++------------- 2 files changed, 366 insertions(+), 160 deletions(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index 39b6fba2d29..645ecd6aa35 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -376,40 +376,52 @@ func (t *TxPublisher) isNeutrinoBackend() bool { return t.cfg.Wallet.BackEnd() == "neutrino" } -// Broadcast is used to publish the tx created from the given inputs. It will, -// 1. init a fee function based on the given strategy. -// 2. create an RBF-compliant tx and monitor it for confirmation. -// 3. notify the initial broadcast result back to the caller. -// The initial broadcast is guaranteed to be RBF-compliant unless the budget -// specified cannot cover the fee. +// Broadcast is used to publish the tx created from the given inputs. It will +// register the broadcast request and return a chan to the caller to subscribe +// the broadcast result. The initial broadcast is guaranteed to be +// RBF-compliant unless the budget specified cannot cover the fee. // // NOTE: part of the Bumper interface. func (t *TxPublisher) Broadcast(req *BumpRequest) (<-chan *BumpResult, error) { log.Tracef("Received broadcast request: %s", lnutils.SpewLogClosure( req)) - // Attempt an initial broadcast which is guaranteed to comply with the - // RBF rules. - result, err := t.initialBroadcast(req) - if err != nil { - log.Errorf("Initial broadcast failed: %v", err) - - return nil, err - } + // Store the request. + requestID, record := t.storeInitialRecord(req) // Create a chan to send the result to the caller. subscriber := make(chan *BumpResult, 1) - t.subscriberChans.Store(result.requestID, subscriber) + t.subscriberChans.Store(requestID, subscriber) - // Send the initial broadcast result to the caller. - t.handleResult(result) + // Publish the tx immediately if specified. + if req.Immediate { + t.handleInitialBroadcast(record, requestID) + } return subscriber, nil } +// storeInitialRecord initializes a monitor record and saves it in the map. +func (t *TxPublisher) storeInitialRecord(req *BumpRequest) ( + uint64, *monitorRecord) { + + // Increase the request counter. + // + // NOTE: this is the only place where we increase the counter. + requestID := t.requestCounter.Add(1) + + // Register the record. + record := &monitorRecord{req: req} + t.records.Store(requestID, record) + + return requestID, record +} + // initialBroadcast initializes a fee function, creates an RBF-compliant tx and // broadcasts it. -func (t *TxPublisher) initialBroadcast(req *BumpRequest) (*BumpResult, error) { +func (t *TxPublisher) initialBroadcast(requestID uint64, + req *BumpRequest) (*BumpResult, error) { + // Create a fee bumping algorithm to be used for future RBF. feeAlgo, err := t.initializeFeeFunction(req) if err != nil { @@ -418,7 +430,7 @@ func (t *TxPublisher) initialBroadcast(req *BumpRequest) (*BumpResult, error) { // Create the initial tx to be broadcasted. This tx is guaranteed to // comply with the RBF restrictions. - requestID, err := t.createRBFCompliantTx(req, feeAlgo) + err = t.createRBFCompliantTx(requestID, req, feeAlgo) if err != nil { return nil, fmt.Errorf("create RBF-compliant tx: %w", err) } @@ -465,8 +477,8 @@ func (t *TxPublisher) initializeFeeFunction( // so by creating a tx, validate it using `TestMempoolAccept`, and bump its fee // and redo the process until the tx is valid, or return an error when non-RBF // related errors occur or the budget has been used up. -func (t *TxPublisher) createRBFCompliantTx(req *BumpRequest, - f FeeFunction) (uint64, error) { +func (t *TxPublisher) createRBFCompliantTx(requestID uint64, req *BumpRequest, + f FeeFunction) error { for { // Create a new tx with the given fee rate and check its @@ -475,17 +487,18 @@ func (t *TxPublisher) createRBFCompliantTx(req *BumpRequest, switch { case err == nil: - // The tx is valid, return the request ID. - requestID := t.storeRecord( - sweepCtx.tx, req, f, sweepCtx.fee, + // The tx is valid, store it. + t.storeRecord( + requestID, sweepCtx.tx, req, f, sweepCtx.fee, ) - log.Infof("Created tx %v for %v inputs: feerate=%v, "+ - "fee=%v, inputs=%v", sweepCtx.tx.TxHash(), - len(req.Inputs), f.FeeRate(), sweepCtx.fee, + log.Infof("Created initial sweep tx=%v for %v inputs: "+ + "feerate=%v, fee=%v, inputs:\n%v", + sweepCtx.tx.TxHash(), len(req.Inputs), + f.FeeRate(), sweepCtx.fee, inputTypeSummary(req.Inputs)) - return requestID, nil + return nil // If the error indicates the fees paid is not enough, we will // ask the fee function to increase the fee rate and retry. @@ -516,7 +529,7 @@ func (t *TxPublisher) createRBFCompliantTx(req *BumpRequest, // cluster these inputs differetly. increased, err = f.Increment() if err != nil { - return 0, err + return err } } @@ -526,20 +539,14 @@ func (t *TxPublisher) createRBFCompliantTx(req *BumpRequest, // mempool acceptance. default: log.Debugf("Failed to create RBF-compliant tx: %v", err) - return 0, err + return err } } } // storeRecord stores the given record in the records map. -func (t *TxPublisher) storeRecord(tx *wire.MsgTx, req *BumpRequest, - f FeeFunction, fee btcutil.Amount) uint64 { - - // Increase the request counter. - // - // NOTE: this is the only place where we increase the - // counter. - requestID := t.requestCounter.Add(1) +func (t *TxPublisher) storeRecord(requestID uint64, tx *wire.MsgTx, + req *BumpRequest, f FeeFunction, fee btcutil.Amount) { // Register the record. t.records.Store(requestID, &monitorRecord{ @@ -548,8 +555,6 @@ func (t *TxPublisher) storeRecord(tx *wire.MsgTx, req *BumpRequest, feeFunction: f, fee: fee, }) - - return requestID } // createAndCheckTx creates a tx based on the given inputs, change output @@ -844,18 +849,27 @@ func (t *TxPublisher) processRecords() { // confirmed. confirmedRecords := make(map[uint64]*monitorRecord) - // feeBumpRecords stores a map of the records which need to be bumped. + // feeBumpRecords stores a map of records which need to be bumped. feeBumpRecords := make(map[uint64]*monitorRecord) - // failedRecords stores a map of the records which has inputs being - // spent by a third party. + // failedRecords stores a map of records which has inputs being spent + // by a third party. // // NOTE: this is only used for neutrino backend. failedRecords := make(map[uint64]*monitorRecord) + // initialRecords stores a map of records which are being created and + // published for the first time. + initialRecords := make(map[uint64]*monitorRecord) + // visitor is a helper closure that visits each record and divides them // into two groups. visitor := func(requestID uint64, r *monitorRecord) error { + if r.tx == nil { + initialRecords[requestID] = r + return nil + } + log.Tracef("Checking monitor recordID=%v for tx=%v", requestID, r.tx.TxHash()) @@ -883,9 +897,14 @@ func (t *TxPublisher) processRecords() { return nil } - // Iterate through all the records and divide them into two groups. + // Iterate through all the records and divide them into four groups. t.records.ForEach(visitor) + // Handle the initial broadcast. + for requestID, r := range initialRecords { + t.handleInitialBroadcast(r, requestID) + } + // For records that are confirmed, we'll notify the caller about this // result. for requestID, r := range confirmedRecords { @@ -941,6 +960,69 @@ func (t *TxPublisher) handleTxConfirmed(r *monitorRecord, requestID uint64) { t.handleResult(result) } +// handleInitialBroadcast is called when a new request is received. It will +// handle the initial tx creation and broadcast. In details, +// 1. init a fee function based on the given strategy. +// 2. create an RBF-compliant tx and monitor it for confirmation. +// 3. notify the initial broadcast result back to the caller. +func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord, + requestID uint64) { + + log.Debugf("Initial broadcast for requestID=%v", requestID) + + var ( + result *BumpResult + err error + ) + + // Attempt an initial broadcast which is guaranteed to comply with the + // RBF rules. + result, err = t.initialBroadcast(requestID, r.req) + if err != nil { + log.Errorf("Initial broadcast failed: %v", err) + + // We now decide what type of event to send. + var event BumpEvent + + switch { + // When the error is due to a dust output, we'll send a + // TxFailed so these inputs can be retried with a different + // group in the next block. + case errors.Is(err, ErrTxNoOutput): + event = TxFailed + + // When the error is due to budget being used up, we'll send a + // TxFailed so these inputs can be retried with a different + // group in the next block. + case errors.Is(err, ErrMaxPosition): + event = TxFailed + + // When the error is due to zero fee rate delta, we'll send a + // TxFailed so these inputs can be retried in the next block. + case errors.Is(err, ErrZeroFeeRateDelta): + event = TxFailed + + // Otherwise this is not a fee-related error and the tx cannot + // be retried. In that case we will fail ALL the inputs in this + // tx, which means they will be removed from the sweeper and + // never be tried again. + // + // TODO(yy): Find out which input is causing the failure and + // fail that one only. + default: + event = TxFatal + } + + result = &BumpResult{ + Event: event, + Err: err, + requestID: requestID, + } + } + + t.handleResult(result) +} + // handleFeeBumpTx checks if the tx needs to be bumped, and if so, it will // attempt to bump the fee of the tx. // diff --git a/sweep/fee_bumper_test.go b/sweep/fee_bumper_test.go index 807d1df595d..e179cf52e0e 100644 --- a/sweep/fee_bumper_test.go +++ b/sweep/fee_bumper_test.go @@ -344,13 +344,10 @@ func TestStoreRecord(t *testing.T) { initialCounter := tp.requestCounter.Load() // Call the method under test. - requestID := tp.storeRecord(tx, req, feeFunc, fee) - - // Check the request ID is as expected. - require.Equal(t, initialCounter+1, requestID) + tp.storeRecord(initialCounter, tx, req, feeFunc, fee) // Read the saved record and compare. - record, ok := tp.records.Load(requestID) + record, ok := tp.records.Load(initialCounter) require.True(t, ok) require.Equal(t, tx, record.tx) require.Equal(t, feeFunc, record.feeFunction) @@ -646,23 +643,19 @@ func TestCreateRBFCompliantTx(t *testing.T) { }, } + var requestCounter atomic.Uint64 for _, tc := range testCases { tc := tc + rid := requestCounter.Add(1) t.Run(tc.name, func(t *testing.T) { tc.setupMock() // Call the method under test. - id, err := tp.createRBFCompliantTx(req, m.feeFunc) + err := tp.createRBFCompliantTx(rid, req, m.feeFunc) // Check the result is as expected. require.ErrorIs(t, err, tc.expectedErr) - - // If there's an error, expect the requestID to be - // empty. - if tc.expectedErr != nil { - require.Zero(t, id) - } }) } } @@ -687,7 +680,8 @@ func TestTxPublisherBroadcast(t *testing.T) { // Create a testing record and put it in the map. fee := btcutil.Amount(1000) - requestID := tp.storeRecord(tx, req, m.feeFunc, fee) + requestID := uint64(1) + tp.storeRecord(requestID, tx, req, m.feeFunc, fee) // Quickly check when the requestID cannot be found, an error is // returned. @@ -774,6 +768,9 @@ func TestRemoveResult(t *testing.T) { // Create a testing record and put it in the map. fee := btcutil.Amount(1000) + // Create a test request ID counter. + requestCounter := atomic.Uint64{} + testCases := []struct { name string setupRecord func() uint64 @@ -785,10 +782,11 @@ func TestRemoveResult(t *testing.T) { // removed. name: "remove on TxConfirmed", setupRecord: func() uint64 { - id := tp.storeRecord(tx, req, m.feeFunc, fee) - tp.subscriberChans.Store(id, nil) + rid := requestCounter.Add(1) + tp.storeRecord(rid, tx, req, m.feeFunc, fee) + tp.subscriberChans.Store(rid, nil) - return id + return rid }, result: &BumpResult{ Event: TxConfirmed, @@ -800,10 +798,11 @@ func TestRemoveResult(t *testing.T) { // When the tx is failed, the records will be removed. name: "remove on TxFailed", setupRecord: func() uint64 { - id := tp.storeRecord(tx, req, m.feeFunc, fee) - tp.subscriberChans.Store(id, nil) + rid := requestCounter.Add(1) + tp.storeRecord(rid, tx, req, m.feeFunc, fee) + tp.subscriberChans.Store(rid, nil) - return id + return rid }, result: &BumpResult{ Event: TxFailed, @@ -816,10 +815,11 @@ func TestRemoveResult(t *testing.T) { // Noop when the tx is neither confirmed or failed. name: "noop when tx is not confirmed or failed", setupRecord: func() uint64 { - id := tp.storeRecord(tx, req, m.feeFunc, fee) - tp.subscriberChans.Store(id, nil) + rid := requestCounter.Add(1) + tp.storeRecord(rid, tx, req, m.feeFunc, fee) + tp.subscriberChans.Store(rid, nil) - return id + return rid }, result: &BumpResult{ Event: TxPublished, @@ -866,7 +866,8 @@ func TestNotifyResult(t *testing.T) { // Create a testing record and put it in the map. fee := btcutil.Amount(1000) - requestID := tp.storeRecord(tx, req, m.feeFunc, fee) + requestID := uint64(1) + tp.storeRecord(requestID, tx, req, m.feeFunc, fee) // Create a subscription to the event. subscriber := make(chan *BumpResult, 1) @@ -914,41 +915,17 @@ func TestNotifyResult(t *testing.T) { } } -// TestBroadcastSuccess checks the public `Broadcast` method can successfully -// broadcast a tx based on the request. -func TestBroadcastSuccess(t *testing.T) { +// TestBroadcast checks the public `Broadcast` method can successfully register +// a broadcast request. +func TestBroadcast(t *testing.T) { t.Parallel() // Create a publisher using the mocks. - tp, m := createTestPublisher(t) + tp, _ := createTestPublisher(t) // Create a test feerate. feerate := chainfee.SatPerKWeight(1000) - // Mock the fee estimator to return the testing fee rate. - // - // We are not testing `NewLinearFeeFunction` here, so the actual params - // used are irrelevant. - m.estimator.On("EstimateFeePerKW", mock.Anything).Return( - feerate, nil).Once() - m.estimator.On("RelayFeePerKW").Return(chainfee.FeePerKwFloor).Once() - - // Mock the signer to always return a valid script. - // - // NOTE: we are not testing the utility of creating valid txes here, so - // this is fine to be mocked. This behaves essentially as skipping the - // Signer check and alaways assume the tx has a valid sig. - script := &input.Script{} - m.signer.On("ComputeInputScript", mock.Anything, - mock.Anything).Return(script, nil) - - // Mock the testmempoolaccept to pass. - m.wallet.On("CheckMempoolAcceptance", mock.Anything).Return(nil).Once() - - // Mock the wallet to publish successfully. - m.wallet.On("PublishTransaction", - mock.Anything, mock.Anything).Return(nil).Once() - // Create a test request. inp := createTestInput(1000, input.WitnessKeyHash) @@ -964,25 +941,23 @@ func TestBroadcastSuccess(t *testing.T) { // Send the req and expect no error. resultChan, err := tp.Broadcast(req) require.NoError(t, err) - - // Check the result is sent back. - select { - case <-time.After(time.Second): - t.Fatal("timeout waiting for subscriber to receive result") - - case result := <-resultChan: - // We expect the first result to be TxPublished. - require.Equal(t, TxPublished, result.Event) - } + require.NotNil(t, resultChan) // Validate the record was stored. require.Equal(t, 1, tp.records.Len()) require.Equal(t, 1, tp.subscriberChans.Len()) + + // Validate the record. + rid := tp.requestCounter.Load() + record, found := tp.records.Load(rid) + require.True(t, found) + require.Equal(t, req, record.req) } -// TestBroadcastFail checks the public `Broadcast` returns the error or a -// failed result when the broadcast fails. -func TestBroadcastFail(t *testing.T) { +// TestBroadcastImmediate checks the public `Broadcast` method can successfully +// register a broadcast request and publish the tx when `Immediate` flag is +// set. +func TestBroadcastImmediate(t *testing.T) { t.Parallel() // Create a publisher using the mocks. @@ -1001,64 +976,28 @@ func TestBroadcastFail(t *testing.T) { Budget: btcutil.Amount(1000), MaxFeeRate: feerate * 10, DeadlineHeight: 10, + Immediate: true, } - // Mock the fee estimator to return the testing fee rate. + // Mock the fee estimator to return an error. // - // We are not testing `NewLinearFeeFunction` here, so the actual params - // used are irrelevant. + // NOTE: We are not testing `handleInitialBroadcast` here, but only + // interested in checking that this method is indeed called when + // `Immediate` is true. Thus we mock the method to return an error to + // quickly abort. As long as this mocked method is called, we know the + // `Immediate` flag works. m.estimator.On("EstimateFeePerKW", mock.Anything).Return( - feerate, nil).Twice() - m.estimator.On("RelayFeePerKW").Return(chainfee.FeePerKwFloor).Twice() - - // Mock the signer to always return a valid script. - // - // NOTE: we are not testing the utility of creating valid txes here, so - // this is fine to be mocked. This behaves essentially as skipping the - // Signer check and alaways assume the tx has a valid sig. - script := &input.Script{} - m.signer.On("ComputeInputScript", mock.Anything, - mock.Anything).Return(script, nil) - - // Mock the testmempoolaccept to return an error. - m.wallet.On("CheckMempoolAcceptance", - mock.Anything).Return(errDummy).Once() + chainfee.SatPerKWeight(0), errDummy).Once() - // Send the req and expect an error returned. + // Send the req and expect no error. resultChan, err := tp.Broadcast(req) - require.ErrorIs(t, err, errDummy) - require.Nil(t, resultChan) - - // Validate the record was NOT stored. - require.Equal(t, 0, tp.records.Len()) - require.Equal(t, 0, tp.subscriberChans.Len()) - - // Mock the testmempoolaccept again, this time it passes. - m.wallet.On("CheckMempoolAcceptance", mock.Anything).Return(nil).Once() - - // Mock the wallet to fail on publish. - m.wallet.On("PublishTransaction", - mock.Anything, mock.Anything).Return(errDummy).Once() - - // Send the req and expect no error returned. - resultChan, err = tp.Broadcast(req) require.NoError(t, err) + require.NotNil(t, resultChan) - // Check the result is sent back. - select { - case <-time.After(time.Second): - t.Fatal("timeout waiting for subscriber to receive result") - - case result := <-resultChan: - // We expect the result to be TxFailed and the error is set in - // the result. - require.Equal(t, TxFailed, result.Event) - require.ErrorIs(t, result.Err, errDummy) - } - - // Validate the record was removed. - require.Equal(t, 0, tp.records.Len()) - require.Equal(t, 0, tp.subscriberChans.Len()) + // Validate the record was removed due to an error returned in initial + // broadcast. + require.Empty(t, tp.records.Len()) + require.Empty(t, tp.subscriberChans.Len()) } // TestCreateAnPublishFail checks all the error cases are handled properly in @@ -1223,7 +1162,8 @@ func TestHandleTxConfirmed(t *testing.T) { // Create a testing record and put it in the map. fee := btcutil.Amount(1000) - requestID := tp.storeRecord(tx, req, m.feeFunc, fee) + requestID := uint64(1) + tp.storeRecord(requestID, tx, req, m.feeFunc, fee) record, ok := tp.records.Load(requestID) require.True(t, ok) @@ -1295,7 +1235,8 @@ func TestHandleFeeBumpTx(t *testing.T) { // Create a testing record and put it in the map. fee := btcutil.Amount(1000) - requestID := tp.storeRecord(tx, req, m.feeFunc, fee) + requestID := uint64(1) + tp.storeRecord(requestID, tx, req, m.feeFunc, fee) // Create a subscription to the event. subscriber := make(chan *BumpResult, 1) @@ -1496,3 +1437,186 @@ func TestProcessRecords(t *testing.T) { require.Equal(t, requestID2, result.requestID) } } + +// TestHandleInitialBroadcastSuccess checks `handleInitialBroadcast` method can +// successfully broadcast a tx based on the request. +func TestHandleInitialBroadcastSuccess(t *testing.T) { + t.Parallel() + + // Create a publisher using the mocks. + tp, m := createTestPublisher(t) + + // Create a test feerate. + feerate := chainfee.SatPerKWeight(1000) + + // Mock the fee estimator to return the testing fee rate. + // + // We are not testing `NewLinearFeeFunction` here, so the actual params + // used are irrelevant. + m.estimator.On("EstimateFeePerKW", mock.Anything).Return( + feerate, nil).Once() + m.estimator.On("RelayFeePerKW").Return(chainfee.FeePerKwFloor).Once() + + // Mock the signer to always return a valid script. + // + // NOTE: we are not testing the utility of creating valid txes here, so + // this is fine to be mocked. This behaves essentially as skipping the + // Signer check and alaways assume the tx has a valid sig. + script := &input.Script{} + m.signer.On("ComputeInputScript", mock.Anything, + mock.Anything).Return(script, nil) + + // Mock the testmempoolaccept to pass. + m.wallet.On("CheckMempoolAcceptance", mock.Anything).Return(nil).Once() + + // Mock the wallet to publish successfully. + m.wallet.On("PublishTransaction", + mock.Anything, mock.Anything).Return(nil).Once() + + // Create a test request. + inp := createTestInput(1000, input.WitnessKeyHash) + + // Create a testing bump request. + req := &BumpRequest{ + DeliveryAddress: changePkScript, + Inputs: []input.Input{&inp}, + Budget: btcutil.Amount(1000), + MaxFeeRate: feerate * 10, + DeadlineHeight: 10, + } + + // Register the testing record use `Broadcast`. + resultChan, err := tp.Broadcast(req) + require.NoError(t, err) + + // Grab the monitor record from the map. + rid := tp.requestCounter.Load() + rec, ok := tp.records.Load(rid) + require.True(t, ok) + + // Call the method under test. + tp.wg.Add(1) + tp.handleInitialBroadcast(rec, rid) + + // Check the result is sent back. + select { + case <-time.After(time.Second): + t.Fatal("timeout waiting for subscriber to receive result") + + case result := <-resultChan: + // We expect the first result to be TxPublished. + require.Equal(t, TxPublished, result.Event) + } + + // Validate the record was stored. + require.Equal(t, 1, tp.records.Len()) + require.Equal(t, 1, tp.subscriberChans.Len()) +} + +// TestHandleInitialBroadcastFail checks `handleInitialBroadcast` returns the +// error or a failed result when the broadcast fails. +func TestHandleInitialBroadcastFail(t *testing.T) { + t.Parallel() + + // Create a publisher using the mocks. + tp, m := createTestPublisher(t) + + // Create a test feerate. + feerate := chainfee.SatPerKWeight(1000) + + // Create a test request. + inp := createTestInput(1000, input.WitnessKeyHash) + + // Create a testing bump request. + req := &BumpRequest{ + DeliveryAddress: changePkScript, + Inputs: []input.Input{&inp}, + Budget: btcutil.Amount(1000), + MaxFeeRate: feerate * 10, + DeadlineHeight: 10, + } + + // Mock the fee estimator to return the testing fee rate. + // + // We are not testing `NewLinearFeeFunction` here, so the actual params + // used are irrelevant. + m.estimator.On("EstimateFeePerKW", mock.Anything).Return( + feerate, nil).Twice() + m.estimator.On("RelayFeePerKW").Return(chainfee.FeePerKwFloor).Twice() + + // Mock the signer to always return a valid script. + // + // NOTE: we are not testing the utility of creating valid txes here, so + // this is fine to be mocked. This behaves essentially as skipping the + // Signer check and alaways assume the tx has a valid sig. + script := &input.Script{} + m.signer.On("ComputeInputScript", mock.Anything, + mock.Anything).Return(script, nil) + + // Mock the testmempoolaccept to return an error. + m.wallet.On("CheckMempoolAcceptance", + mock.Anything).Return(errDummy).Once() + + // Register the testing record use `Broadcast`. + resultChan, err := tp.Broadcast(req) + require.NoError(t, err) + + // Grab the monitor record from the map. + rid := tp.requestCounter.Load() + rec, ok := tp.records.Load(rid) + require.True(t, ok) + + // Call the method under test and expect an error returned. + tp.wg.Add(1) + tp.handleInitialBroadcast(rec, rid) + + // Check the result is sent back. + select { + case <-time.After(time.Second): + t.Fatal("timeout waiting for subscriber to receive result") + + case result := <-resultChan: + // We expect the first result to be TxFatal. + require.Equal(t, TxFatal, result.Event) + } + + // Validate the record was NOT stored. + require.Equal(t, 0, tp.records.Len()) + require.Equal(t, 0, tp.subscriberChans.Len()) + + // Mock the testmempoolaccept again, this time it passes. + m.wallet.On("CheckMempoolAcceptance", mock.Anything).Return(nil).Once() + + // Mock the wallet to fail on publish. + m.wallet.On("PublishTransaction", + mock.Anything, mock.Anything).Return(errDummy).Once() + + // Register the testing record use `Broadcast`. + resultChan, err = tp.Broadcast(req) + require.NoError(t, err) + + // Grab the monitor record from the map. + rid = tp.requestCounter.Load() + rec, ok = tp.records.Load(rid) + require.True(t, ok) + + // Call the method under test. + tp.wg.Add(1) + tp.handleInitialBroadcast(rec, rid) + + // Check the result is sent back. + select { + case <-time.After(time.Second): + t.Fatal("timeout waiting for subscriber to receive result") + + case result := <-resultChan: + // We expect the result to be TxFailed and the error is set in + // the result. + require.Equal(t, TxFailed, result.Event) + require.ErrorIs(t, result.Err, errDummy) + } + + // Validate the record was removed. + require.Equal(t, 0, tp.records.Len()) + require.Equal(t, 0, tp.subscriberChans.Len()) +} From e1fe1531a81726369dceed23f16e8f87345f53d3 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 30 Apr 2024 17:39:45 +0800 Subject: [PATCH 06/12] sweep: remove redundant error from `Broadcast` --- sweep/fee_bumper.go | 10 +++++----- sweep/fee_bumper_test.go | 15 +++++---------- sweep/mock_test.go | 6 +++--- sweep/sweeper.go | 11 +---------- sweep/sweeper_test.go | 9 ++------- 5 files changed, 16 insertions(+), 35 deletions(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index 645ecd6aa35..17748184ac5 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -65,7 +65,7 @@ type Bumper interface { // and monitors its confirmation status for potential fee bumping. It // returns a chan that the caller can use to receive updates about the // broadcast result and potential RBF attempts. - Broadcast(req *BumpRequest) (<-chan *BumpResult, error) + Broadcast(req *BumpRequest) <-chan *BumpResult } // BumpEvent represents the event of a fee bumping attempt. @@ -382,9 +382,9 @@ func (t *TxPublisher) isNeutrinoBackend() bool { // RBF-compliant unless the budget specified cannot cover the fee. // // NOTE: part of the Bumper interface. -func (t *TxPublisher) Broadcast(req *BumpRequest) (<-chan *BumpResult, error) { - log.Tracef("Received broadcast request: %s", lnutils.SpewLogClosure( - req)) +func (t *TxPublisher) Broadcast(req *BumpRequest) <-chan *BumpResult { + log.Tracef("Received broadcast request: %s", + lnutils.SpewLogClosure(req)) // Store the request. requestID, record := t.storeInitialRecord(req) @@ -398,7 +398,7 @@ func (t *TxPublisher) Broadcast(req *BumpRequest) (<-chan *BumpResult, error) { t.handleInitialBroadcast(record, requestID) } - return subscriber, nil + return subscriber } // storeInitialRecord initializes a monitor record and saves it in the map. diff --git a/sweep/fee_bumper_test.go b/sweep/fee_bumper_test.go index e179cf52e0e..76a2a17d11d 100644 --- a/sweep/fee_bumper_test.go +++ b/sweep/fee_bumper_test.go @@ -939,8 +939,7 @@ func TestBroadcast(t *testing.T) { } // Send the req and expect no error. - resultChan, err := tp.Broadcast(req) - require.NoError(t, err) + resultChan := tp.Broadcast(req) require.NotNil(t, resultChan) // Validate the record was stored. @@ -990,8 +989,7 @@ func TestBroadcastImmediate(t *testing.T) { chainfee.SatPerKWeight(0), errDummy).Once() // Send the req and expect no error. - resultChan, err := tp.Broadcast(req) - require.NoError(t, err) + resultChan := tp.Broadcast(req) require.NotNil(t, resultChan) // Validate the record was removed due to an error returned in initial @@ -1486,8 +1484,7 @@ func TestHandleInitialBroadcastSuccess(t *testing.T) { } // Register the testing record use `Broadcast`. - resultChan, err := tp.Broadcast(req) - require.NoError(t, err) + resultChan := tp.Broadcast(req) // Grab the monitor record from the map. rid := tp.requestCounter.Load() @@ -1558,8 +1555,7 @@ func TestHandleInitialBroadcastFail(t *testing.T) { mock.Anything).Return(errDummy).Once() // Register the testing record use `Broadcast`. - resultChan, err := tp.Broadcast(req) - require.NoError(t, err) + resultChan := tp.Broadcast(req) // Grab the monitor record from the map. rid := tp.requestCounter.Load() @@ -1592,8 +1588,7 @@ func TestHandleInitialBroadcastFail(t *testing.T) { mock.Anything, mock.Anything).Return(errDummy).Once() // Register the testing record use `Broadcast`. - resultChan, err = tp.Broadcast(req) - require.NoError(t, err) + resultChan = tp.Broadcast(req) // Grab the monitor record from the map. rid = tp.requestCounter.Load() diff --git a/sweep/mock_test.go b/sweep/mock_test.go index 3179ed8837a..f65be46f7f4 100644 --- a/sweep/mock_test.go +++ b/sweep/mock_test.go @@ -284,14 +284,14 @@ type MockBumper struct { var _ Bumper = (*MockBumper)(nil) // Broadcast broadcasts the transaction to the network. -func (m *MockBumper) Broadcast(req *BumpRequest) (<-chan *BumpResult, error) { +func (m *MockBumper) Broadcast(req *BumpRequest) <-chan *BumpResult { args := m.Called(req) if args.Get(0) == nil { - return nil, args.Error(1) + return nil } - return args.Get(0).(chan *BumpResult), args.Error(1) + return args.Get(0).(chan *BumpResult) } // MockFeeFunction is a mock implementation of the FeeFunction interface. diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 586b76cf5b4..2ddcaee1b8f 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -838,16 +838,7 @@ func (s *UtxoSweeper) sweep(set InputSet) error { // Broadcast will return a read-only chan that we will listen to for // this publish result and future RBF attempt. - resp, err := s.cfg.Publisher.Broadcast(req) - if err != nil { - log.Errorf("Initial broadcast failed: %v, inputs=\n%v", err, - inputTypeSummary(set.Inputs())) - - // TODO(yy): find out which input is causing the failure. - s.markInputsPublishFailed(set) - - return err - } + resp := s.cfg.Publisher.Broadcast(req) // Successfully sent the broadcast attempt, we now handle the result by // subscribing to the result chan and listen for future updates about diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 54bc0c21762..4c7eea0d318 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -673,13 +673,8 @@ func TestSweepPendingInputs(t *testing.T) { setNeedWallet, normalSet, }) - // Mock `Broadcast` to return an error. This should cause the - // `createSweepTx` inside `sweep` to fail. This is done so we can - // terminate the method early as we are only interested in testing the - // workflow in `sweepPendingInputs`. We don't need to test `sweep` here - // as it should be tested in its own unit test. - dummyErr := errors.New("dummy error") - publisher.On("Broadcast", mock.Anything).Return(nil, dummyErr).Twice() + // Mock `Broadcast` to return a result. + publisher.On("Broadcast", mock.Anything).Return(nil).Twice() // Call the method under test. s.sweepPendingInputs(pis) From 7a8be07fe8090db2e64e05cdf4a3666e385d1aa8 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 30 Apr 2024 19:25:06 +0800 Subject: [PATCH 07/12] sweep: add method `handleBumpEventError` and fix `markInputFailed` Previously in `markInputFailed`, we'd remove all inputs under the same group via `removeExclusiveGroup`. This is wrong as when the current sweep fails for this input, it shouldn't affect other inputs. --- sweep/sweeper.go | 65 +++++++++++++++++++--- sweep/sweeper_test.go | 122 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 181 insertions(+), 6 deletions(-) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 2ddcaee1b8f..175e70558f0 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -1441,11 +1441,6 @@ func (s *UtxoSweeper) markInputFailed(pi *SweeperInput, err error) { pi.state = Failed - // Remove all other inputs in this exclusive group. - if pi.params.ExclusiveGroup != nil { - s.removeExclusiveGroup(*pi.params.ExclusiveGroup) - } - s.signalResult(pi, Result{Err: err}) } @@ -1728,6 +1723,62 @@ func (s *UtxoSweeper) handleBumpEventTxPublished(resp *bumpResp) error { return nil } +// handleBumpEventTxFatal handles the case where there's an unexpected error +// when creating or publishing the sweeping tx. In this case, the tx will be +// removed from the sweeper store and the inputs will be marked as `Failed`, +// which means they will not be retried. +func (s *UtxoSweeper) handleBumpEventTxFatal(resp *bumpResp) error { + r := resp.result + + // Remove the tx from the sweeper store if there is one. Since this is + // a broadcast error, it's likely there isn't a tx here. + if r.Tx != nil { + txid := r.Tx.TxHash() + log.Infof("Tx=%v failed with unexpected error: %v", txid, r.Err) + + // Remove the tx from the sweeper db if it exists. + if err := s.cfg.Store.DeleteTx(txid); err != nil { + return fmt.Errorf("delete tx record for %v: %w", txid, + err) + } + } + + // Mark the inputs as failed. + s.markInputsFailed(resp.set, r.Err) + + return nil +} + +// markInputsFailed marks all inputs found in the tx as failed. It will also +// notify all the subscribers of these inputs. +func (s *UtxoSweeper) markInputsFailed(set InputSet, err error) { + for _, inp := range set.Inputs() { + outpoint := inp.OutPoint() + + input, ok := s.inputs[outpoint] + if !ok { + // It's very likely that a spending tx contains inputs + // that we don't know. + log.Tracef("Skipped marking input as failed: %v not "+ + "found in pending inputs", outpoint) + + continue + } + + // If the input is already in a terminal state, we don't want + // to rewrite it, which also indicates an error as we only get + // an error event during the initial broadcast. + if input.terminated() { + log.Errorf("Skipped marking input=%v as failed due to "+ + "unexpected state=%v", outpoint, input.state) + + continue + } + + s.markInputFailed(input, err) + } +} + // handleBumpEvent handles the result sent from the bumper based on its event // type. // @@ -1752,8 +1803,10 @@ func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error { case TxReplaced: return s.handleBumpEventTxReplaced(r) + // There's a fatal error in creating the tx, we will remove the tx from + // the sweeper db and mark the inputs as failed. case TxFatal: - // TODO(yy): create a method to remove this input. + return s.handleBumpEventTxFatal(r) } return nil diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 4c7eea0d318..741deefd3a2 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -1075,3 +1075,125 @@ func TestMonitorFeeBumpResult(t *testing.T) { }) } } + +// TestMarkInputsFailed checks that given a list of inputs with different +// states, the method `markInputsFailed` correctly marks the inputs as failed. +func TestMarkInputsFailed(t *testing.T) { + t.Parallel() + + require := require.New(t) + + // Create a mock input set. + set := &MockInputSet{} + defer set.AssertExpectations(t) + + // Create a test sweeper. + s := New(&UtxoSweeperConfig{}) + + // Create testing inputs for each state. + // - inputInit specifies a newly created input. When marking this as + // published, we should see an error log as this input hasn't been + // published yet. + // - inputPendingPublish specifies an input about to be published. + // - inputPublished specifies an input that's published. + // - inputPublishFailed specifies an input that's failed to be + // published. + // - inputSwept specifies an input that's swept. + // - inputExcluded specifies an input that's excluded. + // - inputFailed specifies an input that's failed. + var ( + inputInit = createMockInput(t, s, Init) + inputPendingPublish = createMockInput(t, s, PendingPublish) + inputPublished = createMockInput(t, s, Published) + inputPublishFailed = createMockInput(t, s, PublishFailed) + inputSwept = createMockInput(t, s, Swept) + inputExcluded = createMockInput(t, s, Excluded) + inputFailed = createMockInput(t, s, Failed) + ) + + // Gather all inputs. + set.On("Inputs").Return([]input.Input{ + inputInit, inputPendingPublish, inputPublished, + inputPublishFailed, inputSwept, inputExcluded, inputFailed, + }) + + // Mark the test inputs. We expect the non-exist input and + // inputSwept/inputExcluded/inputFailed to be skipped. + s.markInputsFailed(set, errDummy) + + // We expect unchanged number of pending inputs. + require.Len(s.inputs, 7) + + // We expect the init input's to be marked as failed. + require.Equal(Failed, s.inputs[inputInit.OutPoint()].state) + + // We expect the pending-publish input to be marked as failed. + require.Equal(Failed, s.inputs[inputPendingPublish.OutPoint()].state) + + // We expect the published input to be marked as failed. + require.Equal(Failed, s.inputs[inputPublished.OutPoint()].state) + + // We expect the publish failed input to be markd as failed. + require.Equal(Failed, s.inputs[inputPublishFailed.OutPoint()].state) + + // We expect the swept input to stay unchanged. + require.Equal(Swept, s.inputs[inputSwept.OutPoint()].state) + + // We expect the excluded input to stay unchanged. + require.Equal(Excluded, s.inputs[inputExcluded.OutPoint()].state) + + // We expect the failed input to stay unchanged. + require.Equal(Failed, s.inputs[inputFailed.OutPoint()].state) +} + +// TestHandleBumpEventTxFatal checks that `handleBumpEventTxFatal` correctly +// handles a `TxFatal` event. +func TestHandleBumpEventTxFatal(t *testing.T) { + t.Parallel() + + rt := require.New(t) + + // Create a mock store. + store := &MockSweeperStore{} + defer store.AssertExpectations(t) + + // Create a mock input set. We are not testing `markInputFailed` here, + // so the actual set doesn't matter. + set := &MockInputSet{} + defer set.AssertExpectations(t) + set.On("Inputs").Return(nil) + + // Create a test sweeper. + s := New(&UtxoSweeperConfig{ + Store: store, + }) + + // Create a dummy tx. + tx := &wire.MsgTx{ + LockTime: 1, + } + + // Create a testing bump response. + result := &BumpResult{ + Err: errDummy, + Tx: tx, + } + resp := &bumpResp{ + result: result, + set: set, + } + + // Mock the store to return an error. + store.On("DeleteTx", mock.Anything).Return(errDummy).Once() + + // Call the method under test and assert the error is returned. + err := s.handleBumpEventTxFatal(resp) + rt.ErrorIs(err, errDummy) + + // Mock the store to return nil. + store.On("DeleteTx", mock.Anything).Return(nil).Once() + + // Call the method under test and assert no error is returned. + err = s.handleBumpEventTxFatal(resp) + rt.NoError(err) +} From 24abb947bd81592dc736aea57e6d15651cc2afba Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 30 Apr 2024 19:28:01 +0800 Subject: [PATCH 08/12] sweep: add method `isMature` on `SweeperInput` Also updated `handlePendingSweepsReq` to skip immature inputs so the returned results are the same as those in pre-0.18.0. --- sweep/sweeper.go | 51 +++++++++++++++++++++++++++++++------------ sweep/sweeper_test.go | 2 ++ 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 175e70558f0..6abbf789904 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -222,6 +222,34 @@ func (p *SweeperInput) terminated() bool { } } +// isMature returns a boolean indicating whether the input has a timelock that +// has been reached or not. The locktime found is also returned. +func (p *SweeperInput) isMature(currentHeight uint32) (bool, uint32) { + locktime, _ := p.RequiredLockTime() + if currentHeight < locktime { + log.Debugf("Input %v has locktime=%v, current height is %v", + p.OutPoint(), locktime, currentHeight) + + return false, locktime + } + + // If the input has a CSV that's not yet reached, we will skip + // this input and wait for the expiry. + // + // NOTE: We need to consider whether this input can be included in the + // next block or not, which means the CSV will be checked against the + // currentHeight plus one. + locktime = p.BlocksToMaturity() + p.HeightHint() + if currentHeight+1 < locktime { + log.Debugf("Input %v has CSV expiry=%v, current height is %v", + p.OutPoint(), locktime, currentHeight) + + return false, locktime + } + + return true, locktime +} + // InputsMap is a type alias for a set of pending inputs. type InputsMap = map[wire.OutPoint]*SweeperInput @@ -1038,6 +1066,12 @@ func (s *UtxoSweeper) handlePendingSweepsReq( resps := make(map[wire.OutPoint]*PendingInputResponse, len(s.inputs)) for _, inp := range s.inputs { + // Skip immature inputs for compatibility. + mature, _ := inp.isMature(uint32(s.currentHeight)) + if !mature { + continue + } + // Only the exported fields are set, as we expect the response // to only be consumed externally. op := inp.OutPoint() @@ -1485,20 +1519,9 @@ func (s *UtxoSweeper) updateSweeperInputs() InputsMap { // If the input has a locktime that's not yet reached, we will // skip this input and wait for the locktime to be reached. - locktime, _ := input.RequiredLockTime() - if uint32(s.currentHeight) < locktime { - log.Warnf("Skipping input %v due to locktime=%v not "+ - "reached, current height is %v", op, locktime, - s.currentHeight) - - continue - } - - // If the input has a CSV that's not yet reached, we will skip - // this input and wait for the expiry. - locktime = input.BlocksToMaturity() + input.HeightHint() - if s.currentHeight < int32(locktime)-1 { - log.Infof("Skipping input %v due to CSV expiry=%v not "+ + mature, locktime := input.isMature(uint32(s.currentHeight)) + if !mature { + log.Infof("Skipping input %v due to locktime=%v not "+ "reached, current height is %v", op, locktime, s.currentHeight) diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 741deefd3a2..381676826e8 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -443,6 +443,7 @@ func TestUpdateSweeperInputs(t *testing.T) { // returned. inp2.On("RequiredLockTime").Return( uint32(s.currentHeight+1), true).Once() + inp2.On("OutPoint").Return(wire.OutPoint{Index: 2}).Maybe() input7 := &SweeperInput{state: Init, Input: inp2} // Mock the input to have a CSV expiry in the future so it will NOT be @@ -451,6 +452,7 @@ func TestUpdateSweeperInputs(t *testing.T) { uint32(s.currentHeight), false).Once() inp3.On("BlocksToMaturity").Return(uint32(2)).Once() inp3.On("HeightHint").Return(uint32(s.currentHeight)).Once() + inp3.On("OutPoint").Return(wire.OutPoint{Index: 3}).Maybe() input8 := &SweeperInput{state: Init, Input: inp3} // Add the inputs to the sweeper. After the update, we should see the From b7360e746fb0bc55726787b5a9d77c8bf71413da Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 1 May 2024 02:58:27 +0800 Subject: [PATCH 09/12] sweep: make sure defaultDeadline is derived from the mature height --- sweep/sweeper.go | 54 +++++++++++++++++++++++++++++++++++-------- sweep/sweeper_test.go | 2 +- 2 files changed, 45 insertions(+), 11 deletions(-) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 6abbf789904..de611eba04f 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -530,7 +530,7 @@ func (s *UtxoSweeper) SweepInput(inp input.Input, } absoluteTimeLock, _ := inp.RequiredLockTime() - log.Infof("Sweep request received: out_point=%v, witness_type=%v, "+ + log.Debugf("Sweep request received: out_point=%v, witness_type=%v, "+ "relative_time_lock=%v, absolute_time_lock=%v, amount=%v, "+ "parent=(%v), params=(%v)", inp.OutPoint(), inp.WitnessType(), inp.BlocksToMaturity(), absoluteTimeLock, @@ -736,7 +736,18 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { inputs := s.updateSweeperInputs() log.Debugf("Received new block: height=%v, attempt "+ - "sweeping %d inputs", epoch.Height, len(inputs)) + "sweeping %d inputs:\n%s", + epoch.Height, len(inputs), + lnutils.NewLogClosure(func() string { + inps := make( + []input.Input, 0, len(inputs), + ) + for _, in := range inputs { + inps = append(inps, in) + } + + return inputTypeSummary(inps) + })) // Attempt to sweep any pending inputs. s.sweepPendingInputs(inputs) @@ -1207,13 +1218,29 @@ func (s *UtxoSweeper) mempoolLookup(op wire.OutPoint) fn.Option[wire.MsgTx] { return s.cfg.Mempool.LookupInputMempoolSpend(op) } -// handleNewInput processes a new input by registering spend notification and -// scheduling sweeping for it. -func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error { +// calculateDefaultDeadline calculates the default deadline height for a sweep +// request that has no deadline height specified. +func (s *UtxoSweeper) calculateDefaultDeadline(pi *SweeperInput) int32 { // Create a default deadline height, which will be used when there's no // DeadlineHeight specified for a given input. defaultDeadline := s.currentHeight + int32(s.cfg.NoDeadlineConfTarget) + // If the input is immature and has a locktime, we'll use the locktime + // height as the starting height. + matured, locktime := pi.isMature(uint32(s.currentHeight)) + if !matured { + defaultDeadline = int32(locktime + s.cfg.NoDeadlineConfTarget) + log.Debugf("Input %v is immature, using locktime=%v instead "+ + "of current height=%d", pi.OutPoint(), locktime, + s.currentHeight) + } + + return defaultDeadline +} + +// handleNewInput processes a new input by registering spend notification and +// scheduling sweeping for it. +func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error { outpoint := input.input.OutPoint() pi, pending := s.inputs[outpoint] if pending { @@ -1238,15 +1265,22 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error { Input: input.input, params: input.params, rbf: rbfInfo, - // Set the acutal deadline height. - DeadlineHeight: input.params.DeadlineHeight.UnwrapOr( - defaultDeadline, - ), } + // Set the acutal deadline height. + pi.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr( + s.calculateDefaultDeadline(pi), + ) + s.inputs[outpoint] = pi log.Tracef("input %v, state=%v, added to inputs", outpoint, pi.state) + log.Infof("Registered sweep request at block %d: out_point=%v, "+ + "witness_type=%v, amount=%v, deadline=%d, params=(%v)", + s.currentHeight, pi.OutPoint(), pi.WitnessType(), + btcutil.Amount(pi.SignDesc().Output.Value), pi.DeadlineHeight, + pi.params) + // Start watching for spend of this input, either by us or the remote // party. cancel, err := s.monitorSpend( @@ -1660,7 +1694,7 @@ func (s *UtxoSweeper) handleBumpEventTxFailed(resp *bumpResp) { r := resp.result tx, err := r.Tx, r.Err - log.Errorf("Fee bump attempt failed for tx=%v: %v", tx.TxHash(), err) + log.Warnf("Fee bump attempt failed for tx=%v: %v", tx.TxHash(), err) // NOTE: When marking the inputs as failed, we are using the input set // instead of the inputs found in the tx. This is fine for current diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 381676826e8..415c8240cec 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -739,7 +739,7 @@ func TestHandleBumpEventTxFailed(t *testing.T) { // Call the method under test. err := s.handleBumpEvent(resp) - require.ErrorIs(t, err, errDummy) + require.NoError(t, err) // Assert the states of the first two inputs are updated. require.Equal(t, PublishFailed, s.inputs[op1].state) From fd9c0019aefa5d952b18e1fd1926673e95995db3 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 25 Oct 2024 15:17:41 +0800 Subject: [PATCH 10/12] sweep: remove redundant loopvar assign --- sweep/fee_bumper.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index 17748184ac5..2494865ec81 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -908,11 +908,9 @@ func (t *TxPublisher) processRecords() { // For records that are confirmed, we'll notify the caller about this // result. for requestID, r := range confirmedRecords { - rec := r - log.Debugf("Tx=%v is confirmed", r.tx.TxHash()) t.wg.Add(1) - go t.handleTxConfirmed(rec, requestID) + go t.handleTxConfirmed(r, requestID) } // Get the current height to be used in the following goroutines. @@ -920,22 +918,18 @@ func (t *TxPublisher) processRecords() { // For records that are not confirmed, we perform a fee bump if needed. for requestID, r := range feeBumpRecords { - rec := r - log.Debugf("Attempting to fee bump Tx=%v", r.tx.TxHash()) t.wg.Add(1) - go t.handleFeeBumpTx(requestID, rec, currentHeight) + go t.handleFeeBumpTx(requestID, r, currentHeight) } // For records that are failed, we'll notify the caller about this // result. for requestID, r := range failedRecords { - rec := r - log.Debugf("Tx=%v has inputs been spent by a third party, "+ "failing it now", r.tx.TxHash()) t.wg.Add(1) - go t.handleThirdPartySpent(rec, requestID) + go t.handleThirdPartySpent(r, requestID) } } From 184c1911867d95d484e04a2fcb5b6226386fceee Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 7 Nov 2024 15:07:26 +0800 Subject: [PATCH 11/12] sweep: break `initialBroadcast` into two steps With the combination of the following commit we can have a more granular control over the bump result when handling it in the sweeper. --- sweep/fee_bumper.go | 111 ++++++++++++++++++++++++++------------------ 1 file changed, 65 insertions(+), 46 deletions(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index 2494865ec81..a6e669554e8 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -417,31 +417,23 @@ func (t *TxPublisher) storeInitialRecord(req *BumpRequest) ( return requestID, record } -// initialBroadcast initializes a fee function, creates an RBF-compliant tx and -// broadcasts it. -func (t *TxPublisher) initialBroadcast(requestID uint64, - req *BumpRequest) (*BumpResult, error) { - +// initializeTx initializes a fee function and creates an RBF-compliant tx. If +// succeeded, the initial tx is stored in the records map. +func (t *TxPublisher) initializeTx(requestID uint64, req *BumpRequest) error { // Create a fee bumping algorithm to be used for future RBF. feeAlgo, err := t.initializeFeeFunction(req) if err != nil { - return nil, fmt.Errorf("init fee function: %w", err) + return fmt.Errorf("init fee function: %w", err) } // Create the initial tx to be broadcasted. This tx is guaranteed to // comply with the RBF restrictions. err = t.createRBFCompliantTx(requestID, req, feeAlgo) if err != nil { - return nil, fmt.Errorf("create RBF-compliant tx: %w", err) - } - - // Broadcast the tx and return the monitored record. - result, err := t.broadcast(requestID) - if err != nil { - return nil, fmt.Errorf("broadcast sweep tx: %w", err) + return fmt.Errorf("create RBF-compliant tx: %w", err) } - return result, nil + return nil } // initializeFeeFunction initializes a fee function to be used for this request @@ -954,6 +946,50 @@ func (t *TxPublisher) handleTxConfirmed(r *monitorRecord, requestID uint64) { t.handleResult(result) } +// handleInitialTxError takes the error from `initializeTx` and decides the +// bump event. It will construct a BumpResult and handles it. +func (t *TxPublisher) handleInitialTxError(requestID uint64, err error) { + // We now decide what type of event to send. + var event BumpEvent + + switch { + // When the error is due to a dust output, we'll send a TxFailed so + // these inputs can be retried with a different group in the next + // block. + case errors.Is(err, ErrTxNoOutput): + event = TxFailed + + // When the error is due to budget being used up, we'll send a TxFailed + // so these inputs can be retried with a different group in the next + // block. + case errors.Is(err, ErrMaxPosition): + event = TxFailed + + // When the error is due to zero fee rate delta, we'll send a TxFailed + // so these inputs can be retried in the next block. + case errors.Is(err, ErrZeroFeeRateDelta): + event = TxFailed + + // Otherwise this is not a fee-related error and the tx cannot be + // retried. In that case we will fail ALL the inputs in this tx, which + // means they will be removed from the sweeper and never be tried + // again. + // + // TODO(yy): Find out which input is causing the failure and fail that + // one only. + default: + event = TxFatal + } + + result := &BumpResult{ + Event: event, + Err: err, + requestID: requestID, + } + + t.handleResult(result) +} + // handleInitialBroadcast is called when a new request is received. It will // handle the initial tx creation and broadcast. In details, // 1. init a fee function based on the given strategy. @@ -971,44 +1007,27 @@ func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord, // Attempt an initial broadcast which is guaranteed to comply with the // RBF rules. - result, err = t.initialBroadcast(requestID, r.req) + // + // Create the initial tx to be broadcasted. + err = t.initializeTx(requestID, r.req) if err != nil { log.Errorf("Initial broadcast failed: %v", err) - // We now decide what type of event to send. - var event BumpEvent + // We now handle the initialization error and exit. + t.handleInitialTxError(requestID, err) - switch { - // When the error is due to a dust output, we'll send a - // TxFailed so these inputs can be retried with a different - // group in the next block. - case errors.Is(err, ErrTxNoOutput): - event = TxFailed - - // When the error is due to budget being used up, we'll send a - // TxFailed so these inputs can be retried with a different - // group in the next block. - case errors.Is(err, ErrMaxPosition): - event = TxFailed - - // When the error is due to zero fee rate delta, we'll send a - // TxFailed so these inputs can be retried in the next block. - case errors.Is(err, ErrZeroFeeRateDelta): - event = TxFailed - - // Otherwise this is not a fee-related error and the tx cannot - // be retried. In that case we will fail ALL the inputs in this - // tx, which means they will be removed from the sweeper and - // never be tried again. - // - // TODO(yy): Find out which input is causing the failure and - // fail that one only. - default: - event = TxFatal - } + return + } + // Successfully created the first tx, now broadcast it. + result, err = t.broadcast(requestID) + if err != nil { + // The broadcast failed, which can only happen if the tx record + // cannot be found or the aux sweeper returns an error. In + // either case, we will send back a TxFail event so these + // inputs can be retried. result = &BumpResult{ - Event: event, + Event: TxFailed, Err: err, requestID: requestID, } From 9333ba468449ead3b16d9a2d8528fde147920171 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 7 Nov 2024 20:28:50 +0800 Subject: [PATCH 12/12] sweep: make sure nil tx is handled After previous commit, it should be clear that the tx may be failed to created in a `TxFailed` event. We now make sure to catch it to avoid panic. --- sweep/fee_bumper.go | 22 ++++++++++++++++------ sweep/fee_bumper_test.go | 14 +++++++------- sweep/sweeper.go | 13 ++++++++++++- 3 files changed, 35 insertions(+), 14 deletions(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index a6e669554e8..2348992f184 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -75,7 +75,17 @@ const ( // TxPublished is sent when the broadcast attempt is finished. TxPublished BumpEvent = iota - // TxFailed is sent when the broadcast attempt fails. + // TxFailed is sent when the tx has encountered a fee-related error + // during its creation or broadcast, or an internal error from the fee + // bumper. In either case the inputs in this tx should be retried with + // either a different grouping strategy or an increased budget. + // + // NOTE: We also send this event when there's a third party spend + // event, and the sweeper will handle cleaning this up once it's + // confirmed. + // + // TODO(yy): Remove the above usage once we remove sweeping non-CPFP + // anchors. TxFailed // TxReplaced is sent when the original tx is replaced by a new one. @@ -269,8 +279,10 @@ func (b *BumpResult) String() string { // Validate validates the BumpResult so it's safe to use. func (b *BumpResult) Validate() error { + isFailureEvent := b.Event == TxFailed || b.Event == TxFatal + // Every result must have a tx except the fatal or failed case. - if b.Tx == nil && b.Event != TxFatal { + if b.Tx == nil && !isFailureEvent { return fmt.Errorf("%w: nil tx", ErrInvalidBumpResult) } @@ -285,10 +297,8 @@ func (b *BumpResult) Validate() error { } // If it's a failed or fatal event, it must have an error. - if b.Event == TxFatal || b.Event == TxFailed { - if b.Err == nil { - return fmt.Errorf("%w: nil error", ErrInvalidBumpResult) - } + if isFailureEvent && b.Err == nil { + return fmt.Errorf("%w: nil error", ErrInvalidBumpResult) } // If it's a confirmed event, it must have a fee rate and fee. diff --git a/sweep/fee_bumper_test.go b/sweep/fee_bumper_test.go index 76a2a17d11d..72b2da34ee3 100644 --- a/sweep/fee_bumper_test.go +++ b/sweep/fee_bumper_test.go @@ -91,13 +91,6 @@ func TestBumpResultValidate(t *testing.T) { } require.ErrorIs(t, b.Validate(), ErrInvalidBumpResult) - // A failed event without a tx will give an error. - b = BumpResult{ - Event: TxFailed, - Err: errDummy, - } - require.ErrorIs(t, b.Validate(), ErrInvalidBumpResult) - // A fatal event without a failure reason will give an error. b = BumpResult{ Event: TxFailed, @@ -118,6 +111,13 @@ func TestBumpResultValidate(t *testing.T) { } require.NoError(t, b.Validate()) + // Tx is allowed to be nil in a TxFailed event. + b = BumpResult{ + Event: TxFailed, + Err: errDummy, + } + require.NoError(t, b.Validate()) + // Tx is allowed to be nil in a TxFatal event. b = BumpResult{ Event: TxFatal, diff --git a/sweep/sweeper.go b/sweep/sweeper.go index de611eba04f..16fb81dedbd 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -1669,6 +1669,14 @@ func (s *UtxoSweeper) monitorFeeBumpResult(set InputSet, // in sweeper and rely solely on this event to mark // inputs as Swept? if r.Event == TxConfirmed || r.Event == TxFailed { + // Exit if the tx is failed to be created. + if r.Tx == nil { + log.Debugf("Received %v for nil tx, "+ + "exit monitor", r.Event) + + return + } + log.Debugf("Received %v for sweep tx %v, exit "+ "fee bump monitor", r.Event, r.Tx.TxHash()) @@ -1694,7 +1702,10 @@ func (s *UtxoSweeper) handleBumpEventTxFailed(resp *bumpResp) { r := resp.result tx, err := r.Tx, r.Err - log.Warnf("Fee bump attempt failed for tx=%v: %v", tx.TxHash(), err) + if tx != nil { + log.Warnf("Fee bump attempt failed for tx=%v: %v", tx.TxHash(), + err) + } // NOTE: When marking the inputs as failed, we are using the input set // instead of the inputs found in the tx. This is fine for current