Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions chancloser.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,14 @@ func (c *channelCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message, b
"close: %v", c.chanPoint, err)
}

// Before publishing the closing tx, we persist it to the
// database, such that it can be republished if something goes
// wrong.
err = c.cfg.channel.MarkCommitmentBroadcasted(closeTx)
if err != nil {
return nil, false, err
}

// With the closing transaction crafted, we'll now broadcast it
// to the network.
peerLog.Infof("Broadcasting cooperative close tx: %v",
Expand All @@ -444,9 +452,6 @@ func (c *channelCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message, b
if err := c.cfg.broadcastTx(closeTx); err != nil {
return nil, false, err
}
if err := c.cfg.channel.MarkCommitmentBroadcasted(); err != nil {
return nil, false, err
}

// Finally, we'll transition to the closeFinished state, and
// also return the final close signed message we sent.
Expand Down
208 changes: 156 additions & 52 deletions channeldb/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/shachain"
Expand Down Expand Up @@ -58,6 +59,10 @@ var (
// remote peer during a channel sync in case we have lost channel state.
dataLossCommitPointKey = []byte("data-loss-commit-point-key")

// closingTxKey points to a the closing tx that we broadcasted when
// moving the channel to state CommitBroadcasted.
closingTxKey = []byte("closing-tx-key")

// commitDiffKey stores the current pending commitment state we've
// extended to the remote party (if any). Each time we propose a new
// state, we store the information necessary to reconstruct this state
Expand Down Expand Up @@ -103,6 +108,10 @@ var (
// in the database.
ErrNoCommitPoint = fmt.Errorf("no commit point found")

// ErrNoCloseTx is returned when no closing tx is found for a channel
// in the state CommitBroadcasted.
ErrNoCloseTx = fmt.Errorf("no closing tx found")

// ErrNoRestoredChannelMutation is returned when a caller attempts to
// mutate a channel that's been recovered.
ErrNoRestoredChannelMutation = fmt.Errorf("cannot mutate restored " +
Expand Down Expand Up @@ -514,16 +523,6 @@ type OpenChannel struct {
sync.RWMutex
}

// FullSync serializes, and writes to disk the *full* channel state, using
// both the active channel bucket to store the prefixed column fields, and the
// remote node's ID to store the remainder of the channel state.
func (c *OpenChannel) FullSync() error {
c.Lock()
defer c.Unlock()

return c.Db.Update(c.fullSync)
}

// ShortChanID returns the current ShortChannelID of this channel.
func (c *OpenChannel) ShortChanID() lnwire.ShortChannelID {
c.RLock()
Expand Down Expand Up @@ -648,9 +647,8 @@ func fetchChanBucket(tx *bbolt.Tx, nodeKey *btcec.PublicKey,
return chanBucket, nil
}

// fullSync is an internal version of the FullSync method which allows callers
// to sync the contents of an OpenChannel while re-using an existing database
// transaction.
// fullSync syncs the contents of an OpenChannel while re-using an existing
// database transaction.
func (c *OpenChannel) fullSync(tx *bbolt.Tx) error {
// First fetch the top level bucket which stores all data related to
// current, active channels.
Expand Down Expand Up @@ -736,44 +734,16 @@ func (c *OpenChannel) MarkDataLoss(commitPoint *btcec.PublicKey) error {
c.Lock()
defer c.Unlock()

var status ChannelStatus
if err := c.Db.Update(func(tx *bbolt.Tx) error {
chanBucket, err := fetchChanBucket(
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
)
if err != nil {
return err
}

channel, err := fetchOpenChannel(chanBucket, &c.FundingOutpoint)
if err != nil {
return err
}

// Add status LocalDataLoss to the existing bitvector found in
// the DB.
status = channel.chanStatus | ChanStatusLocalDataLoss
channel.chanStatus = status

var b bytes.Buffer
if err := WriteElement(&b, commitPoint); err != nil {
return err
}

err = chanBucket.Put(dataLossCommitPointKey, b.Bytes())
if err != nil {
return err
}

return putOpenChannel(chanBucket, channel)
}); err != nil {
var b bytes.Buffer
if err := WriteElement(&b, commitPoint); err != nil {
return err
}

// Update the in-memory representation to keep it in sync with the DB.
c.chanStatus = status
putCommitPoint := func(chanBucket *bbolt.Bucket) error {
return chanBucket.Put(dataLossCommitPointKey, b.Bytes())
}

return nil
return c.putChanStatus(ChanStatusLocalDataLoss, putCommitPoint)
}

// DataLossCommitPoint retrieves the stored commit point set during
Expand Down Expand Up @@ -821,6 +791,82 @@ func (c *OpenChannel) MarkBorked() error {
return c.putChanStatus(ChanStatusBorked)
}

// ChanSyncMsg returns the ChannelReestablish message that should be sent upon
// reconnection with the remote peer that we're maintaining this channel with.
// The information contained within this message is necessary to re-sync our
// commitment chains in the case of a last or only partially processed message.
// When the remote party receiver this message one of three things may happen:
//
// 1. We're fully synced and no messages need to be sent.
// 2. We didn't get the last CommitSig message they sent, to they'll re-send
// it.
// 3. We didn't get the last RevokeAndAck message they sent, so they'll
// re-send it.
//
// If this is a restored channel, having status ChanStatusRestored, then we'll
// modify our typical chan sync message to ensure they force close even if
// we're on the very first state.
func (c *OpenChannel) ChanSyncMsg() (*lnwire.ChannelReestablish, error) {
c.Lock()
defer c.Unlock()

// The remote commitment height that we'll send in the
// ChannelReestablish message is our current commitment height plus
// one. If the receiver thinks that our commitment height is actually
// *equal* to this value, then they'll re-send the last commitment that
// they sent but we never fully processed.
localHeight := c.LocalCommitment.CommitHeight
nextLocalCommitHeight := localHeight + 1

// The second value we'll send is the height of the remote commitment
// from our PoV. If the receiver thinks that their height is actually
// *one plus* this value, then they'll re-send their last revocation.
remoteChainTipHeight := c.RemoteCommitment.CommitHeight

// If this channel has undergone a commitment update, then in order to
// prove to the remote party our knowledge of their prior commitment
// state, we'll also send over the last commitment secret that the
// remote party sent.
var lastCommitSecret [32]byte
if remoteChainTipHeight != 0 {
remoteSecret, err := c.RevocationStore.LookUp(
remoteChainTipHeight - 1,
)
if err != nil {
return nil, err
}
lastCommitSecret = [32]byte(*remoteSecret)
}

// Additionally, we'll send over the current unrevoked commitment on
// our local commitment transaction.
currentCommitSecret, err := c.RevocationProducer.AtIndex(
localHeight,
)
if err != nil {
return nil, err
}

// If we've restored this channel, then we'll purposefully give them an
// invalid LocalUnrevokedCommitPoint so they'll force close the channel
// allowing us to sweep our funds.
if c.hasChanStatus(ChanStatusRestored) {
currentCommitSecret[0] ^= 1
}

return &lnwire.ChannelReestablish{
ChanID: lnwire.NewChanIDFromOutPoint(
&c.FundingOutpoint,
),
NextLocalCommitHeight: nextLocalCommitHeight,
RemoteCommitTailHeight: remoteChainTipHeight,
LastRemoteCommitSecret: lastCommitSecret,
LocalUnrevokedCommitPoint: input.ComputeCommitmentPoint(
currentCommitSecret[:],
),
}, nil
}

// isBorked returns true if the channel has been marked as borked in the
// database. This requires an existing database transaction to already be
// active.
Expand All @@ -837,15 +883,63 @@ func (c *OpenChannel) isBorked(chanBucket *bbolt.Bucket) (bool, error) {

// MarkCommitmentBroadcasted marks the channel as a commitment transaction has
// been broadcast, either our own or the remote, and we should watch the chain
// for it to confirm before taking any further action.
func (c *OpenChannel) MarkCommitmentBroadcasted() error {
// for it to confirm before taking any further action. It takes as argument the
// closing tx _we believe_ will appear in the chain. This is only used to
// republish this tx at startup to ensure propagation, and we should still
// handle the case where a different tx actually hits the chain.
func (c *OpenChannel) MarkCommitmentBroadcasted(closeTx *wire.MsgTx) error {
c.Lock()
defer c.Unlock()

return c.putChanStatus(ChanStatusCommitBroadcasted)
var b bytes.Buffer
if err := WriteElement(&b, closeTx); err != nil {
return err
}

putClosingTx := func(chanBucket *bbolt.Bucket) error {
return chanBucket.Put(closingTxKey, b.Bytes())
}

return c.putChanStatus(ChanStatusCommitBroadcasted, putClosingTx)
}

// BroadcastedCommitment retrieves the stored closing tx set during
// MarkCommitmentBroadcasted. If not found ErrNoCloseTx is returned.
func (c *OpenChannel) BroadcastedCommitment() (*wire.MsgTx, error) {
var closeTx *wire.MsgTx

err := c.Db.View(func(tx *bbolt.Tx) error {
chanBucket, err := fetchChanBucket(
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
)
switch err {
case nil:
case ErrNoChanDBExists, ErrNoActiveChannels, ErrChannelNotFound:
return ErrNoCloseTx
default:
return err
}

bs := chanBucket.Get(closingTxKey)
if bs == nil {
return ErrNoCloseTx
}
r := bytes.NewReader(bs)
return ReadElement(r, &closeTx)
})
if err != nil {
return nil, err
}

return closeTx, nil
}

func (c *OpenChannel) putChanStatus(status ChannelStatus) error {
// putChanStatus appends the given status to the channel. fs is an optional
// list of closures that are given the chanBucket in order to atomically add
// extra information together with the new status.
func (c *OpenChannel) putChanStatus(status ChannelStatus,
fs ...func(*bbolt.Bucket) error) error {

if err := c.Db.Update(func(tx *bbolt.Tx) error {
chanBucket, err := fetchChanBucket(
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
Expand All @@ -863,7 +957,17 @@ func (c *OpenChannel) putChanStatus(status ChannelStatus) error {
status = channel.chanStatus | status
channel.chanStatus = status

return putOpenChannel(chanBucket, channel)
if err := putOpenChannel(chanBucket, channel); err != nil {
return err
}

for _, f := range fs {
if err := f(chanBucket); err != nil {
return err
}
}

return nil
}); err != nil {
return err
}
Expand Down
35 changes: 32 additions & 3 deletions channeldb/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,12 @@ func TestOpenChannelPutGetDelete(t *testing.T) {
OnionBlob: []byte("onionblob"),
},
}
if err := state.FullSync(); err != nil {

addr := &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 18556,
}
if err := state.SyncPending(addr, 101); err != nil {
t.Fatalf("unable to save and serialize channel state: %v", err)
}

Expand Down Expand Up @@ -363,7 +368,12 @@ func TestChannelStateTransition(t *testing.T) {
if err != nil {
t.Fatalf("unable to create channel state: %v", err)
}
if err := channel.FullSync(); err != nil {

addr := &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 18556,
}
if err := channel.SyncPending(addr, 101); err != nil {
t.Fatalf("unable to save and serialize channel state: %v", err)
}

Expand Down Expand Up @@ -881,7 +891,13 @@ func TestFetchWaitingCloseChannels(t *testing.T) {
// This would happen in the event of a force close and should make the
// channels enter a state of waiting close.
for _, channel := range channels {
if err := channel.MarkCommitmentBroadcasted(); err != nil {
closeTx := wire.NewMsgTx(2)
closeTx.AddTxIn(
&wire.TxIn{
PreviousOutPoint: channel.FundingOutpoint,
},
)
if err := channel.MarkCommitmentBroadcasted(closeTx); err != nil {
t.Fatalf("unable to mark commitment broadcast: %v", err)
}
}
Expand All @@ -906,6 +922,19 @@ func TestFetchWaitingCloseChannels(t *testing.T) {
t.Fatalf("expected channel %v to be waiting close",
channel.FundingOutpoint)
}

// Finally, make sure we can retrieve the closing tx for the
// channel.
closeTx, err := channel.BroadcastedCommitment()
if err != nil {
t.Fatalf("Unable to retrieve commitment: %v", err)
}

if closeTx.TxIn[0].PreviousOutPoint != channel.FundingOutpoint {
t.Fatalf("expected outpoint %v, got %v",
channel.FundingOutpoint,
closeTx.TxIn[0].PreviousOutPoint)
}
}
}

Expand Down
7 changes: 6 additions & 1 deletion channeldb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,12 @@ func TestFetchClosedChannelForID(t *testing.T) {
for i := uint32(0); i < numChans; i++ {
// Save the open channel to disk.
state.FundingOutpoint.Index = i
if err := state.FullSync(); err != nil {

addr := &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 18556,
}
if err := state.SyncPending(addr, 101); err != nil {
t.Fatalf("unable to save and serialize channel "+
"state: %v", err)
}
Expand Down
Loading