diff --git a/cmd/commands/cmd_open_channel.go b/cmd/commands/cmd_open_channel.go index b4fe83f20b1..10954c4d4fc 100644 --- a/cmd/commands/cmd_open_channel.go +++ b/cmd/commands/cmd_open_channel.go @@ -918,6 +918,87 @@ func batchOpenChannel(ctx *cli.Context) error { return nil } +var cancelPsbtCommand = cli.Command{ + Name: "cancelPsbtFlow", + Category: "Channels", + Usage: "Cancels a specific PSBT funding flow identified by its " + + "pending channel ID", + Description: ` +When opening a channel with the PSBT funding flow, the user can cancel the +funding flow at any time by sending an interrupt signal (Ctrl+C). This +will lead to the funding intent lingering around for us and our peer. Moreover +this prevents us from trying to open another channel because the peer has +likely a limit of pending channels in place. +This command allows the user to cancel a specific PSBT funding flow. This will +send also an error msg to the peer for this particular channel which will +trigger the peer to delete the pending channel intent. +`, + ArgsUsage: "pending_chan_id", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "pending_chan_id", + Usage: "the corresponding pending channel ID of the " + + "PSBT funding flow to cancel", + }, + }, + Action: actionDecorator(cancelPsbtFlow), +} + +// cancelPsbtFlow cancels a PSBT funding flow when aborted by the user. +func cancelPsbtFlow(ctx *cli.Context) error { + ctxc := getContext() + args := ctx.Args() + + var pendingChanID []byte + switch { + case ctx.IsSet("pending_chan_id"): + chanID, err := hex.DecodeString(ctx.String("pending_chan_id")) + if err != nil { + return fmt.Errorf("unable to decode pending chan id: "+ + "%v", err) + } + pendingChanID = chanID + + case args.Present(): + chanID, err := hex.DecodeString(args.First()) + if err != nil { + return fmt.Errorf("unable to decode pending chan id: "+ + "%v", err) + } + args = args.Tail() + pendingChanID = chanID + default: + return fmt.Errorf("pending_chan_id argument missing") + } + + if len(pendingChanID) != 32 { + return fmt.Errorf("invalid pending channel ID length: "+ + "got %d, want 32", len(pendingChanID)) + } + + cancelMsg := &lnrpc.FundingTransitionMsg{ + Trigger: &lnrpc.FundingTransitionMsg_ShimCancel{ + ShimCancel: &lnrpc.FundingShimCancel{ + PendingChanId: pendingChanID, + }, + }, + } + + err := sendFundingState(ctxc, ctx, cancelMsg) + if err != nil { + return err + } + + printJSON(struct { + Status string `json:"status"` + }{ + Status: fmt.Sprintf("successfully canceled psbt opening: "+ + "pendingChannelID(%x)", pendingChanID), + }) + + return nil +} + // printChanOpen prints the channel point of the channel open message. func printChanOpen(update *lnrpc.OpenStatusUpdate_ChanOpen) error { channelPoint := update.ChanOpen.ChannelPoint diff --git a/cmd/commands/main.go b/cmd/commands/main.go index 71a8531d9af..db4c04dfc5c 100644 --- a/cmd/commands/main.go +++ b/cmd/commands/main.go @@ -459,6 +459,7 @@ func Main() { disconnectCommand, openChannelCommand, batchOpenChannelCommand, + cancelPsbtCommand, closeChannelCommand, closeAllChannelsCommand, abandonChannelCommand, diff --git a/funding/batch.go b/funding/batch.go index fc050cdf53e..464ced83abd 100644 --- a/funding/batch.go +++ b/funding/batch.go @@ -7,6 +7,7 @@ import ( "encoding/base64" "errors" "fmt" + "time" "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/btcutil/psbt" @@ -20,6 +21,14 @@ import ( "golang.org/x/sync/errgroup" ) +const ( + // DefaultBatchFundingTimeout is the default timeout for a batch channel + // funding operation. If the whole operation takes longer than this + // timeout, it will be aborted. During the timeout period the user can + // send Ctrl+C to the process to abort the funding process manually. + DefaultBatchFundingTimeout = 60 * time.Second +) + var ( // errShuttingDown is the error that is returned if a signal on the // quit channel is received which means the whole server is shutting @@ -293,7 +302,13 @@ func (b *Batcher) BatchFund(ctx context.Context, // channel funding negotiation with the peers. Because we specified a // PSBT assembler, we'll get a special response in the channel once the // funding output script is known (which we need to craft the TX). - eg := &errgroup.Group{} + // In case one or more of the peers fail to respond within a timeout + // period, we'll abort the whole process. During the timeout period, the + // user can abort the funding process manually by sending Ctrl+C to the + // process which called the rpc command. + + // Kick off the channel funding negotiation with each of the peers. + eg, egCtx := errgroup.WithContext(ctx) for _, channel := range b.channels { channel.updateChan, channel.errChan = b.cfg.ChannelOpener( channel.fundingReq, @@ -303,7 +318,7 @@ func (b *Batcher) BatchFund(ctx context.Context, // either the update or error chan. channel := channel eg.Go(func() error { - return b.waitForUpdate(channel, true) + return b.waitForUpdate(egCtx, channel, true) }) } @@ -411,13 +426,13 @@ func (b *Batcher) BatchFund(ctx context.Context, // Now every channel should be ready for the funding transaction to be // broadcast. Let's wait for the updates that actually confirm this // state. - eg = &errgroup.Group{} + eg, egCtx = errgroup.WithContext(ctx) for _, channel := range b.channels { // Launch another goroutine that waits for the channel pending // response on the update chan. channel := channel eg.Go(func() error { - return b.waitForUpdate(channel, false) + return b.waitForUpdate(egCtx, channel, false) }) } @@ -447,17 +462,27 @@ func (b *Batcher) BatchFund(ctx context.Context, } // waitForUpdate waits for an incoming channel update (or error) for a single -// channel. +// channel. If no updates are received within the DefaultBatchFundingTimeout +// period, the method is aborted and an error is returned. // // NOTE: Must be called in a goroutine as this blocks until an update or error // is received. -func (b *Batcher) waitForUpdate(channel *batchChannel, firstUpdate bool) error { +func (b *Batcher) waitForUpdate(ctx context.Context, channel *batchChannel, + firstUpdate bool) error { + + // Create the timeout context. + deadlineCtx, cancel := context.WithTimeout( + ctx, DefaultBatchFundingTimeout, + ) + defer cancel() + + targetPubKey := channel.fundingReq.TargetPubkey.SerializeCompressed() + select { // If an error occurs then immediately return the error to the client. case err := <-channel.errChan: log.Errorf("unable to open channel to NodeKey(%x): %v", - channel.fundingReq.TargetPubkey.SerializeCompressed(), - err) + targetPubKey, err) return err // Otherwise, wait for the next channel update. The first update sent @@ -478,6 +503,19 @@ func (b *Batcher) waitForUpdate(channel *batchChannel, firstUpdate bool) error { case <-b.cfg.Quit: return errShuttingDown + + case <-deadlineCtx.Done(): + // If we haven't received any updates within the timeout period, + // we'll return an error indicating the peer that failed to + // respond in time. + log.Infof("Cancelled batch channel opening for peer %x due to "+ + "timeout. The peer didn't provide any funding updates"+ + " within the timeout period.", targetPubKey) + + return fmt.Errorf("cancelled batch channel opening for peer "+ + "%x due to timeout. The peer didn't provide any "+ + "funding updates within the timeout period, %w", + targetPubKey, deadlineCtx.Err()) } } @@ -534,10 +572,9 @@ func (b *Batcher) cleanup(ctx context.Context) { // And finally clean up the funding shim for each channel that didn't // make it into a pending state. for _, channel := range b.channels { - if channel.isPending { - continue - } - + // We need to cancel all funding intents that we created even + // though the channel might not be in the pending state. The + // peer might still be waiting for us to send an error msg. err := b.cfg.Wallet.CancelFundingIntent(channel.pendingChanID) if err != nil { log.Debugf(errMsgTpl, "cancel funding shim", err) diff --git a/funding/batch_test.go b/funding/batch_test.go index 9ca522b0301..f4e39b3e3b5 100644 --- a/funding/batch_test.go +++ b/funding/batch_test.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "fmt" + "sync" "testing" "github.com/btcsuite/btcd/btcec/v2" @@ -60,6 +61,10 @@ type testHarness struct { pendingTx *wire.MsgTx txPublished bool + + // mu protects the harness state because we do the funding process + // concurrently. + mu sync.RWMutex } func newTestHarness(t *testing.T, failUpdate1, failUpdate2, @@ -125,6 +130,9 @@ func (h *testHarness) parseRequest( func (h *testHarness) openChannel( req *InitFundingMsg) (chan *lnrpc.OpenStatusUpdate, chan error) { + h.mu.Lock() + defer h.mu.Unlock() + updateChan := make(chan *lnrpc.OpenStatusUpdate, 2) errChan := make(chan error, 1) @@ -164,6 +172,9 @@ func (h *testHarness) openChannel( } func (h *testHarness) abandonChannel(op *wire.OutPoint) error { + h.mu.Lock() + defer h.mu.Unlock() + h.abandonedChannels[*op] = struct{}{} return nil @@ -172,6 +183,9 @@ func (h *testHarness) abandonChannel(op *wire.OutPoint) error { func (h *testHarness) FundPsbt(context.Context, *walletrpc.FundPsbtRequest) (*walletrpc.FundPsbtResponse, error) { + h.mu.Lock() + defer h.mu.Unlock() + packet, err := psbt.NewFromUnsignedTx(h.pendingTx) if err != nil { return nil, err @@ -199,6 +213,9 @@ func (h *testHarness) FinalizePsbt(context.Context, *walletrpc.FinalizePsbtRequest) (*walletrpc.FinalizePsbtResponse, error) { + h.mu.Lock() + defer h.mu.Unlock() + var psbtBuf bytes.Buffer if err := h.pendingPacket.Serialize(&psbtBuf); err != nil { return nil, err @@ -219,6 +236,9 @@ func (h *testHarness) ReleaseOutput(_ context.Context, r *walletrpc.ReleaseOutputRequest) (*walletrpc.ReleaseOutputResponse, error) { + h.mu.Lock() + defer h.mu.Unlock() + hash, err := chainhash.NewHash(r.Outpoint.TxidBytes) if err != nil { return nil, err @@ -240,6 +260,9 @@ func (h *testHarness) PsbtFundingVerify([32]byte, *psbt.Packet, bool) error { func (h *testHarness) PsbtFundingFinalize(pid [32]byte, _ *psbt.Packet, _ *wire.MsgTx) error { + h.mu.Lock() + defer h.mu.Unlock() + // During the finalize phase we can now prepare the next update to send. // For this we first need to find the intent that has the channels we // need to send on. @@ -271,6 +294,9 @@ func (h *testHarness) PsbtFundingFinalize(pid [32]byte, _ *psbt.Packet, } func (h *testHarness) PublishTransaction(*wire.MsgTx, string) error { + h.mu.Lock() + defer h.mu.Unlock() + if h.failPublish { return errFundingFailed } @@ -281,6 +307,9 @@ func (h *testHarness) PublishTransaction(*wire.MsgTx, string) error { } func (h *testHarness) CancelFundingIntent(pid [32]byte) error { + h.mu.Lock() + defer h.mu.Unlock() + h.intentsCanceled[pid] = struct{}{} return nil @@ -388,7 +417,7 @@ func TestBatchFund(t *testing.T) { // If we fail on update 2 we do so on the second // channel so one will be pending and one not // yet. - require.Len(t, h.intentsCanceled, 1) + require.Len(t, h.intentsCanceled, 2) require.Len(t, h.abandonedChannels, 1) require.Contains( t, h.abandonedChannels, wire.OutPoint{ @@ -402,7 +431,7 @@ func TestBatchFund(t *testing.T) { require.Len(t, h.releasedUTXOs, 1) require.Len(t, h.intentsCreated, 2) - require.Len(t, h.intentsCanceled, 0) + require.Len(t, h.intentsCanceled, 2) require.Len(t, h.abandonedChannels, 2) require.Contains( t, h.abandonedChannels, wire.OutPoint{ diff --git a/funding/manager.go b/funding/manager.go index 1fa90c6932b..de37cf7f9b1 100644 --- a/funding/manager.go +++ b/funding/manager.go @@ -2268,7 +2268,7 @@ func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent, peerKey := resCtx.peer.IdentityKey() failFlow := func(errMsg string, cause error) { log.Errorf("Unable to handle funding accept message "+ - "for peer_key=%x, pending_chan_id=%x: %s: %v", + "for peer_key=%x, pending_chan_id=%v: %s: %v", peerKey.SerializeCompressed(), cid.tempChanID, errMsg, cause) f.failFundingFlow(resCtx.peer, cid, cause) @@ -2344,7 +2344,7 @@ func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent, // survive a restart as it's in memory only. case <-f.quit: log.Errorf("Unable to handle funding accept message "+ - "for peer_key=%x, pending_chan_id=%x: funding manager "+ + "for peer_key=%x, pending_chan_id=%v: funding manager "+ "shutting down", peerKey.SerializeCompressed(), cid.tempChanID) return