Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 92 additions & 93 deletions htlcswitch/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ func (l *channelLink) resolveFwdPkgs() error {

// If any of our reprocessing steps require an update to the commitment
// txn, we initiate a state transition to capture all relevant changes.
if l.channel.PendingLocalUpdateCount() > 0 {
if l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) > 0 {
return l.updateCommitTx()
}

Expand Down Expand Up @@ -1086,6 +1086,83 @@ func (l *channelLink) loadAndRemove() error {
return l.channel.RemoveFwdPkgs(removeHeights...)
}

// handleChanSyncErr performs the error handling logic in the case where we
// could not successfully syncChanStates with our channel peer.
func (l *channelLink) handleChanSyncErr(err error) {
l.log.Warnf("error when syncing channel states: %v", err)

var errDataLoss *lnwallet.ErrCommitSyncLocalDataLoss

switch {
case errors.Is(err, ErrLinkShuttingDown):
l.log.Debugf("unable to sync channel states, link is " +
"shutting down")
return

// We failed syncing the commit chains, probably because the remote has
// lost state. We should force close the channel.
case errors.Is(err, lnwallet.ErrCommitSyncRemoteDataLoss):
fallthrough

// The remote sent us an invalid last commit secret, we should force
// close the channel.
// TODO(halseth): and permanently ban the peer?
case errors.Is(err, lnwallet.ErrInvalidLastCommitSecret):
fallthrough

// The remote sent us a commit point different from what they sent us
// before.
// TODO(halseth): ban peer?
case errors.Is(err, lnwallet.ErrInvalidLocalUnrevokedCommitPoint):
// We'll fail the link and tell the peer to force close the
// channel. Note that the database state is not updated here,
// but will be updated when the close transaction is ready to
// avoid that we go down before storing the transaction in the
// db.
l.failf(
LinkFailureError{
code: ErrSyncError,
FailureAction: LinkFailureForceClose,
},
"unable to synchronize channel states: %v", err,
)

// We have lost state and cannot safely force close the channel. Fail
// the channel and wait for the remote to hopefully force close it. The
// remote has sent us its latest unrevoked commitment point, and we'll
// store it in the database, such that we can attempt to recover the
// funds if the remote force closes the channel.
case errors.As(err, &errDataLoss):
err := l.channel.MarkDataLoss(
errDataLoss.CommitPoint,
)
if err != nil {
l.log.Errorf("unable to mark channel data loss: %v",
err)
}

// We determined the commit chains were not possible to sync. We
// cautiously fail the channel, but don't force close.
// TODO(halseth): can we safely force close in any cases where this
// error is returned?
case errors.Is(err, lnwallet.ErrCannotSyncCommitChains):
if err := l.channel.MarkBorked(); err != nil {
l.log.Errorf("unable to mark channel borked: %v", err)
}

// Other, unspecified error.
default:
}

l.failf(
LinkFailureError{
code: ErrRecoveryError,
FailureAction: LinkFailureForceNone,
},
"unable to synchronize channel states: %v", err,
)
}

// htlcManager is the primary goroutine which drives a channel's commitment
// update state-machine in response to messages received via several channels.
// This goroutine reads messages from the upstream (remote) peer, and also from
Expand Down Expand Up @@ -1121,89 +1198,7 @@ func (l *channelLink) htlcManager() {
if l.cfg.SyncStates {
err := l.syncChanStates()
if err != nil {
l.log.Warnf("error when syncing channel states: %v", err)

errDataLoss, localDataLoss :=
err.(*lnwallet.ErrCommitSyncLocalDataLoss)

switch {
case err == ErrLinkShuttingDown:
l.log.Debugf("unable to sync channel states, " +
"link is shutting down")
return

// We failed syncing the commit chains, probably
// because the remote has lost state. We should force
// close the channel.
case err == lnwallet.ErrCommitSyncRemoteDataLoss:
fallthrough

// The remote sent us an invalid last commit secret, we
// should force close the channel.
// TODO(halseth): and permanently ban the peer?
case err == lnwallet.ErrInvalidLastCommitSecret:
fallthrough

// The remote sent us a commit point different from
// what they sent us before.
// TODO(halseth): ban peer?
case err == lnwallet.ErrInvalidLocalUnrevokedCommitPoint:
// We'll fail the link and tell the peer to
// force close the channel. Note that the
// database state is not updated here, but will
// be updated when the close transaction is
// ready to avoid that we go down before
// storing the transaction in the db.
l.failf(
//nolint:lll
LinkFailureError{
code: ErrSyncError,
FailureAction: LinkFailureForceClose,
},
"unable to synchronize channel "+
"states: %v", err,
)
return

// We have lost state and cannot safely force close the
// channel. Fail the channel and wait for the remote to
// hopefully force close it. The remote has sent us its
// latest unrevoked commitment point, and we'll store
// it in the database, such that we can attempt to
// recover the funds if the remote force closes the
// channel.
case localDataLoss:
err := l.channel.MarkDataLoss(
errDataLoss.CommitPoint,
)
if err != nil {
l.log.Errorf("unable to mark channel "+
"data loss: %v", err)
}

// We determined the commit chains were not possible to
// sync. We cautiously fail the channel, but don't
// force close.
// TODO(halseth): can we safely force close in any
// cases where this error is returned?
case err == lnwallet.ErrCannotSyncCommitChains:
if err := l.channel.MarkBorked(); err != nil {
l.log.Errorf("unable to mark channel "+
"borked: %v", err)
}

// Other, unspecified error.
default:
}

l.failf(
LinkFailureError{
code: ErrRecoveryError,
FailureAction: LinkFailureForceNone,
},
"unable to synchronize channel "+
"states: %v", err,
)
l.handleChanSyncErr(err)
return
}
}
Expand Down Expand Up @@ -1291,15 +1286,19 @@ func (l *channelLink) htlcManager() {
// the batch ticker so that it can be cleared. Otherwise pause
// the ticker to prevent waking up the htlcManager while the
// batch is empty.
if l.channel.PendingLocalUpdateCount() > 0 {
numUpdates := l.channel.NumPendingUpdates(
lntypes.Local, lntypes.Remote,
)
if numUpdates > 0 {
l.cfg.BatchTicker.Resume()
l.log.Tracef("BatchTicker resumed, "+
"PendingLocalUpdateCount=%d",
l.channel.PendingLocalUpdateCount())
"NumPendingUpdates(Local, Remote)=%d",
numUpdates,
)
} else {
l.cfg.BatchTicker.Pause()
l.log.Trace("BatchTicker paused due to zero " +
"PendingLocalUpdateCount")
"NumPendingUpdates(Local, Remote)")
}

select {
Expand Down Expand Up @@ -1657,7 +1656,7 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error {
l.log.Tracef("received downstream htlc: payment_hash=%x, "+
"local_log_index=%v, pend_updates=%v",
htlc.PaymentHash[:], index,
l.channel.PendingLocalUpdateCount())
l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote))

pkt.outgoingChanID = l.ShortChanID()
pkt.outgoingHTLCID = index
Expand Down Expand Up @@ -1863,7 +1862,8 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
// tryBatchUpdateCommitTx updates the commitment transaction if the batch is
// full.
func (l *channelLink) tryBatchUpdateCommitTx() {
if l.channel.PendingLocalUpdateCount() < uint64(l.cfg.BatchSize) {
pending := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
if pending < uint64(l.cfg.BatchSize) {
return
}

Expand Down Expand Up @@ -1939,7 +1939,6 @@ func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) {
// direct channel with, updating our respective commitment chains.
func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
switch msg := msg.(type) {

case *lnwire.UpdateAddHTLC:
if l.IsFlushing(Incoming) {
// This is forbidden by the protocol specification.
Expand Down Expand Up @@ -2597,9 +2596,9 @@ func (l *channelLink) updateCommitTx() error {
l.cfg.PendingCommitTicker.Resume()
l.log.Trace("PendingCommitTicker resumed")

n := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
l.log.Tracef("revocation window exhausted, unable to send: "+
"%v, pend_updates=%v, dangling_closes%v",
l.channel.PendingLocalUpdateCount(),
"%v, pend_updates=%v, dangling_closes%v", n,
lnutils.SpewLogClosure(l.openedCircuits),
lnutils.SpewLogClosure(l.closedCircuits))

Expand Down
Loading