Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions chainntnfs/txnotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1757,10 +1757,6 @@ func (n *TxNotifier) NotifyHeight(height uint32) error {
for ntfn := range n.ntfnsByConfirmHeight[height] {
confSet := n.confNotifications[ntfn.ConfRequest]

Log.Debugf("Dispatching %v confirmation notification for "+
"conf_id=%v, %v", ntfn.NumConfirmations, ntfn.ConfID,
ntfn.ConfRequest)

// The default notification we assigned above includes the
// block along with the rest of the details. However not all
// clients want the block, so we make a copy here w/o the block
Expand All @@ -1770,6 +1766,20 @@ func (n *TxNotifier) NotifyHeight(height uint32) error {
confDetails.Block = nil
}

// If the `confDetails` has already been sent before, we'll
// skip it and continue processing the next one.
if ntfn.dispatched {
Comment thread
Roasbeef marked this conversation as resolved.
Log.Debugf("Skipped dispatched conf details for "+
"request %v conf_id=%v", ntfn.ConfRequest,
ntfn.ConfID)

continue
}

Log.Debugf("Dispatching %v confirmation notification for "+
"conf_id=%v, %v", ntfn.NumConfirmations, ntfn.ConfID,
ntfn.ConfRequest)

select {
case ntfn.Event.Confirmed <- &confDetails:
ntfn.dispatched = true
Expand Down
13 changes: 12 additions & 1 deletion chanrestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
Expand Down Expand Up @@ -286,6 +287,9 @@ func (c *chanDBRestorer) RestoreChansFromSingles(backups ...chanbackup.Single) e

ltndLog.Infof("Informing chain watchers of new restored channels")

// Create a slice of channel points.
chanPoints := make([]wire.OutPoint, 0, len(channelShells))

// Finally, we'll need to inform the chain arbitrator of these new
// channels so we'll properly watch for their ultimate closure on chain
// and sweep them via the DLP.
Expand All @@ -294,8 +298,15 @@ func (c *chanDBRestorer) RestoreChansFromSingles(backups ...chanbackup.Single) e
if err != nil {
return err
}

chanPoints = append(
chanPoints, restoredChannel.Chan.FundingOutpoint,
)
}

// With all the channels restored, we'll now re-send the blockbeat.
c.chainArb.RedispatchBlockbeat(chanPoints)

Comment thread
ziggie1984 marked this conversation as resolved.
return nil
}

Expand All @@ -314,7 +325,7 @@ func (s *server) ConnectPeer(nodePub *btcec.PublicKey, addrs []net.Addr) error {
// to ensure the new connection is created after this new link/channel
// is known.
if err := s.DisconnectPeer(nodePub); err != nil {
ltndLog.Infof("Peer(%v) is already connected, proceeding "+
ltndLog.Infof("Peer(%x) is already connected, proceeding "+
"with chan restore", nodePub.SerializeCompressed())
}

Expand Down
9 changes: 9 additions & 0 deletions contractcourt/briefcase.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,15 @@ func (a ArbitratorState) String() string {
}
}

// IsContractClosed returns a bool to indicate whether the closing/breaching tx
// has been confirmed onchain. If the state is StateContractClosed,
// StateWaitingFullResolution, or StateFullyResolved, it means the contract has
// been closed and all related contracts have been launched.
func (a ArbitratorState) IsContractClosed() bool {
return a == StateContractClosed || a == StateWaitingFullResolution ||
a == StateFullyResolved
}

// resolverType is an enum that enumerates the various types of resolvers. When
// writing resolvers to disk, we prepend this to the raw bytes stored. This
// allows us to properly decode the resolver into the proper type.
Expand Down
69 changes: 63 additions & 6 deletions contractcourt/chain_arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,9 +578,6 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
// Set the current beat.
c.beat = beat

log.Infof("ChainArbitrator starting at height %d with budget=[%v]",
&c.cfg.Budget, c.beat.Height())

// First, we'll fetch all the channels that are still open, in order to
// collect them within our set of active contracts.
if err := c.loadOpenChannels(); err != nil {
Expand Down Expand Up @@ -690,6 +687,11 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
c.dispatchBlocks()
}()

log.Infof("ChainArbitrator starting at height %d with %d chain "+
"watchers, %d channel arbitrators, and budget config=[%v]",
c.beat.Height(), len(c.activeWatchers), len(c.activeChannels),
&c.cfg.Budget)

// TODO(roasbeef): eventually move all breach watching here

return nil
Expand Down Expand Up @@ -727,19 +729,34 @@ func (c *ChainArbitrator) handleBlockbeat(beat chainio.Blockbeat) {

// Create a slice to record active channel arbitrator.
channels := make([]chainio.Consumer, 0, len(c.activeChannels))
watchers := make([]chainio.Consumer, 0, len(c.activeWatchers))

// Copy the active channels to the slice.
for _, channel := range c.activeChannels {
channels = append(channels, channel)
}

for _, watcher := range c.activeWatchers {
watchers = append(watchers, watcher)
}

c.Unlock()

// Iterate all the copied watchers and send the blockbeat to them.
err := chainio.DispatchConcurrent(beat, watchers)
if err != nil {
log.Errorf("Notify blockbeat for chainWatcher failed: %v", err)
}

// Iterate all the copied channels and send the blockbeat to them.
//
// NOTE: This method will timeout if the processing of blocks of the
// subsystems is too long (60s).
err := chainio.DispatchConcurrent(beat, channels)
err = chainio.DispatchConcurrent(beat, channels)
if err != nil {
log.Errorf("Notify blockbeat for ChannelArbitrator failed: %v",
err)
}

// Notify the chain arbitrator has processed the block.
c.NotifyBlockProcessed(beat, err)
Expand Down Expand Up @@ -1046,8 +1063,8 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error

chanPoint := newChan.FundingOutpoint

log.Infof("Creating new ChannelArbitrator for ChannelPoint(%v)",
chanPoint)
log.Infof("Creating new chainWatcher and ChannelArbitrator for "+
"ChannelPoint(%v)", chanPoint)

// If we're already watching this channel, then we'll ignore this
// request.
Expand Down Expand Up @@ -1356,3 +1373,43 @@ func (c *ChainArbitrator) loadPendingCloseChannels() error {

return nil
}

// RedispatchBlockbeat resends the current blockbeat to the channels specified
// by the chanPoints. It is used when a channel is added to the chain
// arbitrator after it has been started, e.g., during the channel restore
// process.
func (c *ChainArbitrator) RedispatchBlockbeat(chanPoints []wire.OutPoint) {
// Get the current blockbeat.
beat := c.beat

// Prepare two sets of consumers.
channels := make([]chainio.Consumer, 0, len(chanPoints))
watchers := make([]chainio.Consumer, 0, len(chanPoints))

// Read the active channels in a lock.
c.Lock()
for _, op := range chanPoints {
if channel, ok := c.activeChannels[op]; ok {
channels = append(channels, channel)
}

if watcher, ok := c.activeWatchers[op]; ok {
watchers = append(watchers, watcher)
}
}
c.Unlock()

// Iterate all the copied watchers and send the blockbeat to them.
err := chainio.DispatchConcurrent(beat, watchers)
if err != nil {
log.Errorf("Notify blockbeat for chainWatcher failed: %v", err)
}

// Iterate all the copied channels and send the blockbeat to them.
err = chainio.DispatchConcurrent(beat, channels)
if err != nil {
// Shutdown lnd if there's an error processing the block.
log.Errorf("Notify blockbeat for ChannelArbitrator failed: %v",
err)
}
}
Loading