Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions cmd/commands/cmd_open_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,87 @@ func batchOpenChannel(ctx *cli.Context) error {
return nil
}

var cancelPsbtCommand = cli.Command{
Name: "cancelPsbtFlow",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we normally don't use camel case for command names.

Category: "Channels",
Usage: "Cancels a specific PSBT funding flow identified by its " +
"pending channel ID",
Description: `
When opening a channel with the PSBT funding flow, the user can cancel the
funding flow at any time by sending an interrupt signal (Ctrl+C). This
will lead to the funding intent lingering around for us and our peer. Moreover
this prevents us from trying to open another channel because the peer has
likely a limit of pending channels in place.
This command allows the user to cancel a specific PSBT funding flow. This will
send also an error msg to the peer for this particular channel which will
trigger the peer to delete the pending channel intent.
`,
ArgsUsage: "pending_chan_id",
Flags: []cli.Flag{
cli.StringFlag{
Name: "pending_chan_id",
Usage: "the corresponding pending channel ID of the " +
"PSBT funding flow to cancel",
},
},
Action: actionDecorator(cancelPsbtFlow),
}

// cancelPsbtFlow cancels a PSBT funding flow when aborted by the user.
func cancelPsbtFlow(ctx *cli.Context) error {
ctxc := getContext()
args := ctx.Args()

var pendingChanID []byte
switch {
case ctx.IsSet("pending_chan_id"):
chanID, err := hex.DecodeString(ctx.String("pending_chan_id"))
if err != nil {
return fmt.Errorf("unable to decode pending chan id: "+
"%v", err)
}
pendingChanID = chanID

case args.Present():
chanID, err := hex.DecodeString(args.First())
if err != nil {
return fmt.Errorf("unable to decode pending chan id: "+
"%v", err)
}
args = args.Tail()
pendingChanID = chanID
default:
return fmt.Errorf("pending_chan_id argument missing")
}

if len(pendingChanID) != 32 {
return fmt.Errorf("invalid pending channel ID length: "+
"got %d, want 32", len(pendingChanID))
}

cancelMsg := &lnrpc.FundingTransitionMsg{
Trigger: &lnrpc.FundingTransitionMsg_ShimCancel{
ShimCancel: &lnrpc.FundingShimCancel{
PendingChanId: pendingChanID,
},
},
}

err := sendFundingState(ctxc, ctx, cancelMsg)
if err != nil {
return err
}

printJSON(struct {
Status string `json:"status"`
}{
Status: fmt.Sprintf("successfully canceled psbt opening: "+
"pendingChannelID(%x)", pendingChanID),
})

return nil
}

// printChanOpen prints the channel point of the channel open message.
func printChanOpen(update *lnrpc.OpenStatusUpdate_ChanOpen) error {
channelPoint := update.ChanOpen.ChannelPoint
Expand Down
1 change: 1 addition & 0 deletions cmd/commands/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ func Main() {
disconnectCommand,
openChannelCommand,
batchOpenChannelCommand,
cancelPsbtCommand,
closeChannelCommand,
closeAllChannelsCommand,
abandonChannelCommand,
Expand Down
61 changes: 49 additions & 12 deletions funding/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"time"

"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/btcutil/psbt"
Expand All @@ -20,6 +21,14 @@ import (
"golang.org/x/sync/errgroup"
)

const (
// DefaultBatchFundingTimeout is the default timeout for a batch channel
// funding operation. If the whole operation takes longer than this
// timeout, it will be aborted. During the timeout period the user can
// send Ctrl+C to the process to abort the funding process manually.
DefaultBatchFundingTimeout = 60 * time.Second
)

var (
// errShuttingDown is the error that is returned if a signal on the
// quit channel is received which means the whole server is shutting
Expand Down Expand Up @@ -293,7 +302,13 @@ func (b *Batcher) BatchFund(ctx context.Context,
// channel funding negotiation with the peers. Because we specified a
// PSBT assembler, we'll get a special response in the channel once the
// funding output script is known (which we need to craft the TX).
eg := &errgroup.Group{}
// In case one or more of the peers fail to respond within a timeout
// period, we'll abort the whole process. During the timeout period, the
// user can abort the funding process manually by sending Ctrl+C to the
// process which called the rpc command.

// Kick off the channel funding negotiation with each of the peers.
eg, egCtx := errgroup.WithContext(ctx)
for _, channel := range b.channels {
channel.updateChan, channel.errChan = b.cfg.ChannelOpener(
channel.fundingReq,
Expand All @@ -303,7 +318,7 @@ func (b *Batcher) BatchFund(ctx context.Context,
// either the update or error chan.
channel := channel
eg.Go(func() error {
return b.waitForUpdate(channel, true)
return b.waitForUpdate(egCtx, channel, true)
})
}

Expand Down Expand Up @@ -411,13 +426,13 @@ func (b *Batcher) BatchFund(ctx context.Context,
// Now every channel should be ready for the funding transaction to be
// broadcast. Let's wait for the updates that actually confirm this
// state.
eg = &errgroup.Group{}
eg, egCtx = errgroup.WithContext(ctx)
for _, channel := range b.channels {
// Launch another goroutine that waits for the channel pending
// response on the update chan.
channel := channel
eg.Go(func() error {
return b.waitForUpdate(channel, false)
return b.waitForUpdate(egCtx, channel, false)
})
}

Expand Down Expand Up @@ -447,17 +462,27 @@ func (b *Batcher) BatchFund(ctx context.Context,
}

// waitForUpdate waits for an incoming channel update (or error) for a single
// channel.
// channel. If no updates are received within the DefaultBatchFundingTimeout
// period, the method is aborted and an error is returned.
//
// NOTE: Must be called in a goroutine as this blocks until an update or error
// is received.
func (b *Batcher) waitForUpdate(channel *batchChannel, firstUpdate bool) error {
func (b *Batcher) waitForUpdate(ctx context.Context, channel *batchChannel,
firstUpdate bool) error {

Comment thread
hieblmi marked this conversation as resolved.
Outdated
// Create the timeout context.
deadlineCtx, cancel := context.WithTimeout(
ctx, DefaultBatchFundingTimeout,
)
defer cancel()

targetPubKey := channel.fundingReq.TargetPubkey.SerializeCompressed()

select {
// If an error occurs then immediately return the error to the client.
case err := <-channel.errChan:
log.Errorf("unable to open channel to NodeKey(%x): %v",
channel.fundingReq.TargetPubkey.SerializeCompressed(),
err)
targetPubKey, err)
return err

// Otherwise, wait for the next channel update. The first update sent
Expand All @@ -478,6 +503,19 @@ func (b *Batcher) waitForUpdate(channel *batchChannel, firstUpdate bool) error {

case <-b.cfg.Quit:
return errShuttingDown

case <-deadlineCtx.Done():
// If we haven't received any updates within the timeout period,
// we'll return an error indicating the peer that failed to
// respond in time.
log.Infof("Cancelled batch channel opening for peer %x due to "+
"timeout. The peer didn't provide any funding updates"+
" within the timeout period.", targetPubKey)

return fmt.Errorf("cancelled batch channel opening for peer "+
"%x due to timeout. The peer didn't provide any "+
"funding updates within the timeout period, %w",
targetPubKey, deadlineCtx.Err())
}
}

Expand Down Expand Up @@ -534,10 +572,9 @@ func (b *Batcher) cleanup(ctx context.Context) {
// And finally clean up the funding shim for each channel that didn't
// make it into a pending state.
for _, channel := range b.channels {
if channel.isPending {
continue
}

// We need to cancel all funding intents that we created even
// though the channel might not be in the pending state. The
// peer might still be waiting for us to send an error msg.
err := b.cfg.Wallet.CancelFundingIntent(channel.pendingChanID)
if err != nil {
log.Debugf(errMsgTpl, "cancel funding shim", err)
Expand Down
33 changes: 31 additions & 2 deletions funding/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"sync"
"testing"

"github.com/btcsuite/btcd/btcec/v2"
Expand Down Expand Up @@ -60,6 +61,10 @@ type testHarness struct {
pendingTx *wire.MsgTx

txPublished bool

// mu protects the harness state because we do the funding process
// concurrently.
mu sync.RWMutex
}

func newTestHarness(t *testing.T, failUpdate1, failUpdate2,
Expand Down Expand Up @@ -125,6 +130,9 @@ func (h *testHarness) parseRequest(
func (h *testHarness) openChannel(
req *InitFundingMsg) (chan *lnrpc.OpenStatusUpdate, chan error) {

h.mu.Lock()
defer h.mu.Unlock()

updateChan := make(chan *lnrpc.OpenStatusUpdate, 2)
errChan := make(chan error, 1)

Expand Down Expand Up @@ -164,6 +172,9 @@ func (h *testHarness) openChannel(
}

func (h *testHarness) abandonChannel(op *wire.OutPoint) error {
h.mu.Lock()
defer h.mu.Unlock()

h.abandonedChannels[*op] = struct{}{}

return nil
Expand All @@ -172,6 +183,9 @@ func (h *testHarness) abandonChannel(op *wire.OutPoint) error {
func (h *testHarness) FundPsbt(context.Context,
*walletrpc.FundPsbtRequest) (*walletrpc.FundPsbtResponse, error) {

h.mu.Lock()
defer h.mu.Unlock()

packet, err := psbt.NewFromUnsignedTx(h.pendingTx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -199,6 +213,9 @@ func (h *testHarness) FinalizePsbt(context.Context,
*walletrpc.FinalizePsbtRequest) (*walletrpc.FinalizePsbtResponse,
error) {

h.mu.Lock()
defer h.mu.Unlock()

var psbtBuf bytes.Buffer
if err := h.pendingPacket.Serialize(&psbtBuf); err != nil {
return nil, err
Expand All @@ -219,6 +236,9 @@ func (h *testHarness) ReleaseOutput(_ context.Context,
r *walletrpc.ReleaseOutputRequest) (*walletrpc.ReleaseOutputResponse,
error) {

h.mu.Lock()
defer h.mu.Unlock()

hash, err := chainhash.NewHash(r.Outpoint.TxidBytes)
if err != nil {
return nil, err
Expand All @@ -240,6 +260,9 @@ func (h *testHarness) PsbtFundingVerify([32]byte, *psbt.Packet, bool) error {
func (h *testHarness) PsbtFundingFinalize(pid [32]byte, _ *psbt.Packet,
_ *wire.MsgTx) error {

h.mu.Lock()
defer h.mu.Unlock()

// During the finalize phase we can now prepare the next update to send.
// For this we first need to find the intent that has the channels we
// need to send on.
Expand Down Expand Up @@ -271,6 +294,9 @@ func (h *testHarness) PsbtFundingFinalize(pid [32]byte, _ *psbt.Packet,
}

func (h *testHarness) PublishTransaction(*wire.MsgTx, string) error {
h.mu.Lock()
defer h.mu.Unlock()

if h.failPublish {
return errFundingFailed
}
Expand All @@ -281,6 +307,9 @@ func (h *testHarness) PublishTransaction(*wire.MsgTx, string) error {
}

func (h *testHarness) CancelFundingIntent(pid [32]byte) error {
h.mu.Lock()
defer h.mu.Unlock()

h.intentsCanceled[pid] = struct{}{}

return nil
Expand Down Expand Up @@ -388,7 +417,7 @@ func TestBatchFund(t *testing.T) {
// If we fail on update 2 we do so on the second
// channel so one will be pending and one not
// yet.
require.Len(t, h.intentsCanceled, 1)
require.Len(t, h.intentsCanceled, 2)
require.Len(t, h.abandonedChannels, 1)
require.Contains(
t, h.abandonedChannels, wire.OutPoint{
Expand All @@ -402,7 +431,7 @@ func TestBatchFund(t *testing.T) {
require.Len(t, h.releasedUTXOs, 1)
require.Len(t, h.intentsCreated, 2)

require.Len(t, h.intentsCanceled, 0)
require.Len(t, h.intentsCanceled, 2)
require.Len(t, h.abandonedChannels, 2)
require.Contains(
t, h.abandonedChannels, wire.OutPoint{
Expand Down
4 changes: 2 additions & 2 deletions funding/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2268,7 +2268,7 @@ func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent,
peerKey := resCtx.peer.IdentityKey()
failFlow := func(errMsg string, cause error) {
log.Errorf("Unable to handle funding accept message "+
"for peer_key=%x, pending_chan_id=%x: %s: %v",
"for peer_key=%x, pending_chan_id=%v: %s: %v",
peerKey.SerializeCompressed(), cid.tempChanID, errMsg,
cause)
f.failFundingFlow(resCtx.peer, cid, cause)
Expand Down Expand Up @@ -2344,7 +2344,7 @@ func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent,
// survive a restart as it's in memory only.
case <-f.quit:
log.Errorf("Unable to handle funding accept message "+
"for peer_key=%x, pending_chan_id=%x: funding manager "+
"for peer_key=%x, pending_chan_id=%v: funding manager "+
"shutting down", peerKey.SerializeCompressed(),
cid.tempChanID)
return
Expand Down