From ec98074cbd2f1dee75f892d45e1080f86b9ca2d9 Mon Sep 17 00:00:00 2001 From: Slyghtning Date: Wed, 10 Jan 2024 14:06:01 +0100 Subject: [PATCH 1/4] funding: add timeout to channel batch funding --- funding/batch.go | 54 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 46 insertions(+), 8 deletions(-) diff --git a/funding/batch.go b/funding/batch.go index fc050cdf53e..223ac1d081e 100644 --- a/funding/batch.go +++ b/funding/batch.go @@ -7,6 +7,7 @@ import ( "encoding/base64" "errors" "fmt" + "time" "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/btcutil/psbt" @@ -20,6 +21,14 @@ import ( "golang.org/x/sync/errgroup" ) +const ( + // DefaultBatchFundingTimeout is the default timeout for a batch channel + // funding operation. If the whole operation takes longer than this + // timeout, it will be aborted. During the timeout period the user can + // send Ctrl+C to the process to abort the funding process manually. + DefaultBatchFundingTimeout = 60 * time.Second +) + var ( // errShuttingDown is the error that is returned if a signal on the // quit channel is received which means the whole server is shutting @@ -293,7 +302,13 @@ func (b *Batcher) BatchFund(ctx context.Context, // channel funding negotiation with the peers. Because we specified a // PSBT assembler, we'll get a special response in the channel once the // funding output script is known (which we need to craft the TX). - eg := &errgroup.Group{} + // In case one or more of the peers fail to respond within a timeout + // period, we'll abort the whole process. During the timeout period, the + // user can abort the funding process manually by sending Ctrl+C to the + // process which called the rpc command. + + // Kick off the channel funding negotiation with each of the peers. + eg, egCtx := errgroup.WithContext(ctx) for _, channel := range b.channels { channel.updateChan, channel.errChan = b.cfg.ChannelOpener( channel.fundingReq, @@ -303,7 +318,7 @@ func (b *Batcher) BatchFund(ctx context.Context, // either the update or error chan. channel := channel eg.Go(func() error { - return b.waitForUpdate(channel, true) + return b.waitForUpdate(egCtx, channel, true) }) } @@ -411,13 +426,13 @@ func (b *Batcher) BatchFund(ctx context.Context, // Now every channel should be ready for the funding transaction to be // broadcast. Let's wait for the updates that actually confirm this // state. - eg = &errgroup.Group{} + eg, egCtx = errgroup.WithContext(ctx) for _, channel := range b.channels { // Launch another goroutine that waits for the channel pending // response on the update chan. channel := channel eg.Go(func() error { - return b.waitForUpdate(channel, false) + return b.waitForUpdate(egCtx, channel, false) }) } @@ -447,17 +462,27 @@ func (b *Batcher) BatchFund(ctx context.Context, } // waitForUpdate waits for an incoming channel update (or error) for a single -// channel. +// channel. If no updates are received within the DefaultBatchFundingTimeout +// period, the method is aborted and an error is returned. // // NOTE: Must be called in a goroutine as this blocks until an update or error // is received. -func (b *Batcher) waitForUpdate(channel *batchChannel, firstUpdate bool) error { +func (b *Batcher) waitForUpdate(ctx context.Context, channel *batchChannel, + firstUpdate bool) error { + + // Create the timeout context. + deadlineCtx, cancel := context.WithTimeout( + ctx, DefaultBatchFundingTimeout, + ) + defer cancel() + + targetPubKey := channel.fundingReq.TargetPubkey.SerializeCompressed() + select { // If an error occurs then immediately return the error to the client. case err := <-channel.errChan: log.Errorf("unable to open channel to NodeKey(%x): %v", - channel.fundingReq.TargetPubkey.SerializeCompressed(), - err) + targetPubKey, err) return err // Otherwise, wait for the next channel update. The first update sent @@ -478,6 +503,19 @@ func (b *Batcher) waitForUpdate(channel *batchChannel, firstUpdate bool) error { case <-b.cfg.Quit: return errShuttingDown + + case <-deadlineCtx.Done(): + // If we haven't received any updates within the timeout period, + // we'll return an error indicating the peer that failed to + // respond in time. + log.Infof("Cancelled batch channel opening for peer %x due to "+ + "timeout. The peer didn't provide any funding updates"+ + " within the timeout period.", targetPubKey) + + return fmt.Errorf("cancelled batch channel opening for peer "+ + "%x due to timeout. The peer didn't provide any "+ + "funding updates within the timeout period, %w", + targetPubKey, deadlineCtx.Err()) } } From 215516d78a4d17c26904c2e741dcafb62b5ef4a2 Mon Sep 17 00:00:00 2001 From: ziggie Date: Sun, 10 Nov 2024 15:48:55 +0100 Subject: [PATCH 2/4] funding: fix logs output. --- funding/manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/funding/manager.go b/funding/manager.go index 1fa90c6932b..de37cf7f9b1 100644 --- a/funding/manager.go +++ b/funding/manager.go @@ -2268,7 +2268,7 @@ func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent, peerKey := resCtx.peer.IdentityKey() failFlow := func(errMsg string, cause error) { log.Errorf("Unable to handle funding accept message "+ - "for peer_key=%x, pending_chan_id=%x: %s: %v", + "for peer_key=%x, pending_chan_id=%v: %s: %v", peerKey.SerializeCompressed(), cid.tempChanID, errMsg, cause) f.failFundingFlow(resCtx.peer, cid, cause) @@ -2344,7 +2344,7 @@ func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent, // survive a restart as it's in memory only. case <-f.quit: log.Errorf("Unable to handle funding accept message "+ - "for peer_key=%x, pending_chan_id=%x: funding manager "+ + "for peer_key=%x, pending_chan_id=%v: funding manager "+ "shutting down", peerKey.SerializeCompressed(), cid.tempChanID) return From 25e5885584aa2c924510e408068fc39355d86807 Mon Sep 17 00:00:00 2001 From: ziggie Date: Sun, 10 Nov 2024 22:17:56 +0100 Subject: [PATCH 3/4] lncli: add cancelPsbtFlow command When the user aborts the psbt channel funding flow, the user normally has to wait up to 10 min for the other node to cancel the lingering funding attempt. This cmd provides the possiblity to cancel a failed funding flow faster hence signaling to the peer that he can also remove the attempt from his mappings. --- cmd/commands/cmd_open_channel.go | 81 ++++++++++++++++++++++++++++++++ cmd/commands/main.go | 1 + 2 files changed, 82 insertions(+) diff --git a/cmd/commands/cmd_open_channel.go b/cmd/commands/cmd_open_channel.go index b4fe83f20b1..10954c4d4fc 100644 --- a/cmd/commands/cmd_open_channel.go +++ b/cmd/commands/cmd_open_channel.go @@ -918,6 +918,87 @@ func batchOpenChannel(ctx *cli.Context) error { return nil } +var cancelPsbtCommand = cli.Command{ + Name: "cancelPsbtFlow", + Category: "Channels", + Usage: "Cancels a specific PSBT funding flow identified by its " + + "pending channel ID", + Description: ` +When opening a channel with the PSBT funding flow, the user can cancel the +funding flow at any time by sending an interrupt signal (Ctrl+C). This +will lead to the funding intent lingering around for us and our peer. Moreover +this prevents us from trying to open another channel because the peer has +likely a limit of pending channels in place. +This command allows the user to cancel a specific PSBT funding flow. This will +send also an error msg to the peer for this particular channel which will +trigger the peer to delete the pending channel intent. +`, + ArgsUsage: "pending_chan_id", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "pending_chan_id", + Usage: "the corresponding pending channel ID of the " + + "PSBT funding flow to cancel", + }, + }, + Action: actionDecorator(cancelPsbtFlow), +} + +// cancelPsbtFlow cancels a PSBT funding flow when aborted by the user. +func cancelPsbtFlow(ctx *cli.Context) error { + ctxc := getContext() + args := ctx.Args() + + var pendingChanID []byte + switch { + case ctx.IsSet("pending_chan_id"): + chanID, err := hex.DecodeString(ctx.String("pending_chan_id")) + if err != nil { + return fmt.Errorf("unable to decode pending chan id: "+ + "%v", err) + } + pendingChanID = chanID + + case args.Present(): + chanID, err := hex.DecodeString(args.First()) + if err != nil { + return fmt.Errorf("unable to decode pending chan id: "+ + "%v", err) + } + args = args.Tail() + pendingChanID = chanID + default: + return fmt.Errorf("pending_chan_id argument missing") + } + + if len(pendingChanID) != 32 { + return fmt.Errorf("invalid pending channel ID length: "+ + "got %d, want 32", len(pendingChanID)) + } + + cancelMsg := &lnrpc.FundingTransitionMsg{ + Trigger: &lnrpc.FundingTransitionMsg_ShimCancel{ + ShimCancel: &lnrpc.FundingShimCancel{ + PendingChanId: pendingChanID, + }, + }, + } + + err := sendFundingState(ctxc, ctx, cancelMsg) + if err != nil { + return err + } + + printJSON(struct { + Status string `json:"status"` + }{ + Status: fmt.Sprintf("successfully canceled psbt opening: "+ + "pendingChannelID(%x)", pendingChanID), + }) + + return nil +} + // printChanOpen prints the channel point of the channel open message. func printChanOpen(update *lnrpc.OpenStatusUpdate_ChanOpen) error { channelPoint := update.ChanOpen.ChannelPoint diff --git a/cmd/commands/main.go b/cmd/commands/main.go index 71a8531d9af..db4c04dfc5c 100644 --- a/cmd/commands/main.go +++ b/cmd/commands/main.go @@ -459,6 +459,7 @@ func Main() { disconnectCommand, openChannelCommand, batchOpenChannelCommand, + cancelPsbtCommand, closeChannelCommand, closeAllChannelsCommand, abandonChannelCommand, From fc6facdafa7caf0244dbb87aaa631c352ce4396c Mon Sep 17 00:00:00 2001 From: ziggie Date: Mon, 11 Nov 2024 11:34:51 +0100 Subject: [PATCH 4/4] funding: fix unit and itests. --- funding/batch.go | 7 +++---- funding/batch_test.go | 33 +++++++++++++++++++++++++++++++-- 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/funding/batch.go b/funding/batch.go index 223ac1d081e..464ced83abd 100644 --- a/funding/batch.go +++ b/funding/batch.go @@ -572,10 +572,9 @@ func (b *Batcher) cleanup(ctx context.Context) { // And finally clean up the funding shim for each channel that didn't // make it into a pending state. for _, channel := range b.channels { - if channel.isPending { - continue - } - + // We need to cancel all funding intents that we created even + // though the channel might not be in the pending state. The + // peer might still be waiting for us to send an error msg. err := b.cfg.Wallet.CancelFundingIntent(channel.pendingChanID) if err != nil { log.Debugf(errMsgTpl, "cancel funding shim", err) diff --git a/funding/batch_test.go b/funding/batch_test.go index 9ca522b0301..f4e39b3e3b5 100644 --- a/funding/batch_test.go +++ b/funding/batch_test.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "fmt" + "sync" "testing" "github.com/btcsuite/btcd/btcec/v2" @@ -60,6 +61,10 @@ type testHarness struct { pendingTx *wire.MsgTx txPublished bool + + // mu protects the harness state because we do the funding process + // concurrently. + mu sync.RWMutex } func newTestHarness(t *testing.T, failUpdate1, failUpdate2, @@ -125,6 +130,9 @@ func (h *testHarness) parseRequest( func (h *testHarness) openChannel( req *InitFundingMsg) (chan *lnrpc.OpenStatusUpdate, chan error) { + h.mu.Lock() + defer h.mu.Unlock() + updateChan := make(chan *lnrpc.OpenStatusUpdate, 2) errChan := make(chan error, 1) @@ -164,6 +172,9 @@ func (h *testHarness) openChannel( } func (h *testHarness) abandonChannel(op *wire.OutPoint) error { + h.mu.Lock() + defer h.mu.Unlock() + h.abandonedChannels[*op] = struct{}{} return nil @@ -172,6 +183,9 @@ func (h *testHarness) abandonChannel(op *wire.OutPoint) error { func (h *testHarness) FundPsbt(context.Context, *walletrpc.FundPsbtRequest) (*walletrpc.FundPsbtResponse, error) { + h.mu.Lock() + defer h.mu.Unlock() + packet, err := psbt.NewFromUnsignedTx(h.pendingTx) if err != nil { return nil, err @@ -199,6 +213,9 @@ func (h *testHarness) FinalizePsbt(context.Context, *walletrpc.FinalizePsbtRequest) (*walletrpc.FinalizePsbtResponse, error) { + h.mu.Lock() + defer h.mu.Unlock() + var psbtBuf bytes.Buffer if err := h.pendingPacket.Serialize(&psbtBuf); err != nil { return nil, err @@ -219,6 +236,9 @@ func (h *testHarness) ReleaseOutput(_ context.Context, r *walletrpc.ReleaseOutputRequest) (*walletrpc.ReleaseOutputResponse, error) { + h.mu.Lock() + defer h.mu.Unlock() + hash, err := chainhash.NewHash(r.Outpoint.TxidBytes) if err != nil { return nil, err @@ -240,6 +260,9 @@ func (h *testHarness) PsbtFundingVerify([32]byte, *psbt.Packet, bool) error { func (h *testHarness) PsbtFundingFinalize(pid [32]byte, _ *psbt.Packet, _ *wire.MsgTx) error { + h.mu.Lock() + defer h.mu.Unlock() + // During the finalize phase we can now prepare the next update to send. // For this we first need to find the intent that has the channels we // need to send on. @@ -271,6 +294,9 @@ func (h *testHarness) PsbtFundingFinalize(pid [32]byte, _ *psbt.Packet, } func (h *testHarness) PublishTransaction(*wire.MsgTx, string) error { + h.mu.Lock() + defer h.mu.Unlock() + if h.failPublish { return errFundingFailed } @@ -281,6 +307,9 @@ func (h *testHarness) PublishTransaction(*wire.MsgTx, string) error { } func (h *testHarness) CancelFundingIntent(pid [32]byte) error { + h.mu.Lock() + defer h.mu.Unlock() + h.intentsCanceled[pid] = struct{}{} return nil @@ -388,7 +417,7 @@ func TestBatchFund(t *testing.T) { // If we fail on update 2 we do so on the second // channel so one will be pending and one not // yet. - require.Len(t, h.intentsCanceled, 1) + require.Len(t, h.intentsCanceled, 2) require.Len(t, h.abandonedChannels, 1) require.Contains( t, h.abandonedChannels, wire.OutPoint{ @@ -402,7 +431,7 @@ func TestBatchFund(t *testing.T) { require.Len(t, h.releasedUTXOs, 1) require.Len(t, h.intentsCreated, 2) - require.Len(t, h.intentsCanceled, 0) + require.Len(t, h.intentsCanceled, 2) require.Len(t, h.abandonedChannels, 2) require.Contains( t, h.abandonedChannels, wire.OutPoint{