Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions watchtower/wtclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func New(config *Config) (*TowerClient, error) {
c := &TowerClient{
cfg: cfg,
log: plog,
pipeline: newTaskPipeline(),
pipeline: newTaskPipeline(plog),
Comment thread
cfromknecht marked this conversation as resolved.
Outdated
candidateTowers: newTowerListIterator(candidateTowers...),
candidateSessions: candidateSessions,
activeSessions: make(sessionQueueSet),
Expand All @@ -339,6 +339,7 @@ func New(config *Config) (*TowerClient, error) {
Candidates: c.candidateTowers,
MinBackoff: cfg.MinBackoff,
MaxBackoff: cfg.MaxBackoff,
Log: plog,
})

// Reconstruct the highest commit height processed for each channel
Expand Down Expand Up @@ -468,7 +469,7 @@ func (c *TowerClient) Start() error {
c.wg.Add(1)
go c.backupDispatcher()

log.Infof("Watchtower client started successfully")
c.log.Infof("Watchtower client started successfully")
})
return err
}
Expand Down Expand Up @@ -1006,6 +1007,7 @@ func (c *TowerClient) newSessionQueue(s *wtdb.ClientSession) *sessionQueue {
DB: c.cfg.DB,
MinBackoff: c.cfg.MinBackoff,
MaxBackoff: c.cfg.MaxBackoff,
Log: c.log,
})
}

Expand Down
29 changes: 18 additions & 11 deletions watchtower/wtclient/session_negotiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/watchtower/blob"
Expand Down Expand Up @@ -85,6 +86,10 @@ type NegotiatorConfig struct {
// exponential backoff produces a timeout greater than this value, the
// backoff duration will be clamped to MaxBackoff.
MaxBackoff time.Duration

// Log specifies the desired log output, which should be prefixed by the
// client type, e.g. anchor or legacy.
Log btclog.Logger
}

// sessionNegotiator is concrete SessionNegotiator that is able to request new
Expand All @@ -97,6 +102,7 @@ type sessionNegotiator struct {
localInit *wtwire.Init

cfg *NegotiatorConfig
log btclog.Logger

dispatcher chan struct{}
newSessions chan *wtdb.ClientSession
Expand Down Expand Up @@ -130,6 +136,7 @@ func newSessionNegotiator(cfg *NegotiatorConfig) *sessionNegotiator {

return &sessionNegotiator{
cfg: cfg,
log: cfg.Log,
localInit: localInit,
dispatcher: make(chan struct{}, 1),
newSessions: make(chan *wtdb.ClientSession),
Expand All @@ -141,7 +148,7 @@ func newSessionNegotiator(cfg *NegotiatorConfig) *sessionNegotiator {
// Start safely starts up the sessionNegotiator.
func (n *sessionNegotiator) Start() error {
n.started.Do(func() {
log.Debugf("Starting session negotiator")
n.log.Debugf("Starting session negotiator")

n.wg.Add(1)
go n.negotiationDispatcher()
Expand All @@ -153,7 +160,7 @@ func (n *sessionNegotiator) Start() error {
// Stop safely shutsdown the sessionNegotiator.
func (n *sessionNegotiator) Stop() error {
n.stopped.Do(func() {
log.Debugf("Stopping session negotiator")
n.log.Debugf("Stopping session negotiator")

close(n.quit)
n.wg.Wait()
Expand Down Expand Up @@ -191,15 +198,15 @@ func (n *sessionNegotiator) negotiationDispatcher() {
pendingNegotiations++

if pendingNegotiations > 1 {
log.Debugf("Already negotiating session, " +
n.log.Debugf("Already negotiating session, " +
"waiting for existing negotiation to " +
"complete")
continue
}

// TODO(conner): consider reusing good towers

log.Debugf("Dispatching session negotiation")
n.log.Debugf("Dispatching session negotiation")

n.wg.Add(1)
go n.negotiate()
Expand All @@ -213,7 +220,7 @@ func (n *sessionNegotiator) negotiationDispatcher() {
}

if pendingNegotiations > 0 {
log.Debugf("Dispatching pending session " +
n.log.Debugf("Dispatching pending session " +
"negotiation")

n.wg.Add(1)
Expand Down Expand Up @@ -278,7 +285,7 @@ retryWithBackoff:
// We've run out of addresses, update our backoff.
updateBackoff()

log.Debugf("Unable to get new tower candidate, "+
n.log.Debugf("Unable to get new tower candidate, "+
"retrying after %v -- reason: %v", backoff, err)

// Only reset the iterator once we've exhausted all
Expand All @@ -292,7 +299,7 @@ retryWithBackoff:
}

towerPub := tower.IdentityKey.SerializeCompressed()
log.Debugf("Attempting session negotiation with tower=%x",
n.log.Debugf("Attempting session negotiation with tower=%x",
towerPub)

// Before proceeding, we will reserve a session key index to use
Expand All @@ -302,7 +309,7 @@ retryWithBackoff:
tower.ID, n.cfg.Policy.BlobType,
)
if err != nil {
log.Debugf("Unable to reserve session key index "+
n.log.Debugf("Unable to reserve session key index "+
"for tower=%x: %v", towerPub, err)
continue
}
Expand All @@ -314,7 +321,7 @@ retryWithBackoff:
// An unexpected error occurred, updpate our backoff.
updateBackoff()

log.Debugf("Session negotiation with tower=%x "+
n.log.Debugf("Session negotiation with tower=%x "+
"failed, trying again -- reason: %v",
tower.IdentityKey.SerializeCompressed(), err)

Expand Down Expand Up @@ -360,7 +367,7 @@ func (n *sessionNegotiator) createSession(tower *wtdb.Tower,
fallthrough

case err != nil:
log.Debugf("Request for session negotiation with "+
n.log.Debugf("Request for session negotiation with "+
"tower=%s failed, trying again -- reason: "+
"%v", lnAddr, err)
continue
Expand Down Expand Up @@ -467,7 +474,7 @@ func (n *sessionNegotiator) tryAddress(sessionKey keychain.SingleKeyECDH,
err)
}

log.Debugf("New session negotiated with %s, policy: %s",
n.log.Debugf("New session negotiated with %s, policy: %s",
lnAddr, clientSession.Policy)

// We have a newly negotiated session, return it to the
Expand Down
39 changes: 23 additions & 16 deletions watchtower/wtclient/session_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwire"
Expand Down Expand Up @@ -70,6 +71,10 @@ type sessionQueueConfig struct {
// timeout greater than this value, the backoff duration will be clamped
// to MaxBackoff.
MaxBackoff time.Duration

// Log specifies the desired log output, which should be prefixed by the
// client type, e.g. anchor or legacy.
Log btclog.Logger
}

// sessionQueue implements a reliable queue that will encrypt and send accepted
Expand All @@ -84,6 +89,7 @@ type sessionQueue struct {
forced sync.Once

cfg *sessionQueueConfig
log btclog.Logger

commitQueue *list.List
pendingQueue *list.List
Expand Down Expand Up @@ -116,6 +122,7 @@ func newSessionQueue(cfg *sessionQueueConfig) *sessionQueue {

sq := &sessionQueue{
cfg: cfg,
log: cfg.Log,
commitQueue: list.New(),
pendingQueue: list.New(),
localInit: localInit,
Expand Down Expand Up @@ -149,7 +156,7 @@ func (q *sessionQueue) Start() {
// will clear all pending tasks in the queue before returning to the caller.
func (q *sessionQueue) Stop() {
q.stopped.Do(func() {
log.Debugf("SessionQueue(%s) stopping ...", q.ID())
q.log.Debugf("SessionQueue(%s) stopping ...", q.ID())

close(q.quit)
q.signalUntilShutdown()
Expand All @@ -161,20 +168,20 @@ func (q *sessionQueue) Stop() {
default:
}

log.Debugf("SessionQueue(%s) stopped", q.ID())
q.log.Debugf("SessionQueue(%s) stopped", q.ID())
})
}

// ForceQuit idempotently aborts any clean shutdown in progress and returns to
// he caller after all lingering goroutines have spun down.
func (q *sessionQueue) ForceQuit() {
q.forced.Do(func() {
log.Infof("SessionQueue(%s) force quitting...", q.ID())
q.log.Infof("SessionQueue(%s) force quitting...", q.ID())

close(q.forceQuit)
q.signalUntilShutdown()

log.Infof("SessionQueue(%s) force quit", q.ID())
q.log.Infof("SessionQueue(%s) force quit", q.ID())
})
}

Expand All @@ -192,7 +199,7 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) {

numPending := uint32(q.pendingQueue.Len())
maxUpdates := q.cfg.ClientSession.Policy.MaxUpdates
log.Debugf("SessionQueue(%s) deciding to accept %v seqnum=%d "+
q.log.Debugf("SessionQueue(%s) deciding to accept %v seqnum=%d "+
"pending=%d max-updates=%d",
q.ID(), task.id, q.seqNum, numPending, maxUpdates)

Expand All @@ -218,7 +225,7 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) {
err := task.bindSession(&q.cfg.ClientSession.ClientSessionBody)
if err != nil {
q.queueCond.L.Unlock()
log.Debugf("SessionQueue(%s) rejected %v: %v ",
q.log.Debugf("SessionQueue(%s) rejected %v: %v ",
q.ID(), task.id, err)
return curStatus, false
}
Expand Down Expand Up @@ -287,7 +294,7 @@ func (q *sessionQueue) drainBackups() {
// First, check that we are able to dial this session's tower.
conn, err := q.cfg.Dial(q.cfg.ClientSession.SessionKeyECDH, q.towerAddr)
if err != nil {
log.Errorf("SessionQueue(%s) unable to dial tower at %v: %v",
q.log.Errorf("SessionQueue(%s) unable to dial tower at %v: %v",
q.ID(), q.towerAddr, err)

q.increaseBackoff()
Expand All @@ -309,7 +316,7 @@ func (q *sessionQueue) drainBackups() {
// before attempting to dequeue any pending updates.
stateUpdate, isPending, backupID, err := q.nextStateUpdate()
if err != nil {
log.Errorf("SessionQueue(%v) unable to get next state "+
q.log.Errorf("SessionQueue(%v) unable to get next state "+
"update: %v", q.ID(), err)
return
}
Expand All @@ -319,7 +326,7 @@ func (q *sessionQueue) drainBackups() {
conn, stateUpdate, q.localInit, sendInit, isPending,
)
if err != nil {
log.Errorf("SessionQueue(%s) unable to send state "+
q.log.Errorf("SessionQueue(%s) unable to send state "+
"update: %v", q.ID(), err)

q.increaseBackoff()
Expand All @@ -330,7 +337,7 @@ func (q *sessionQueue) drainBackups() {
return
}

log.Infof("SessionQueue(%s) uploaded %v seqnum=%d",
q.log.Infof("SessionQueue(%s) uploaded %v seqnum=%d",
q.ID(), backupID, stateUpdate.SeqNum)

// If the last task was backed up successfully, we'll exit and
Expand Down Expand Up @@ -388,7 +395,7 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool,
isLast = q.commitQueue.Len() == 1 && q.pendingQueue.Len() == 0
q.queueCond.L.Unlock()

log.Debugf("SessionQueue(%s) reprocessing committed state "+
q.log.Debugf("SessionQueue(%s) reprocessing committed state "+
"update for %v seqnum=%d",
q.ID(), update.BackupID, seqNum)

Expand Down Expand Up @@ -429,7 +436,7 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool,
},
}

log.Debugf("SessionQueue(%s) committing state update "+
q.log.Debugf("SessionQueue(%s) committing state update "+
"%v seqnum=%d", q.ID(), update.BackupID, seqNum)
}

Expand Down Expand Up @@ -538,7 +545,7 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
err := fmt.Errorf("received error code %v in "+
"StateUpdateReply for seqnum=%d",
stateUpdateReply.Code, stateUpdate.SeqNum)
log.Warnf("SessionQueue(%s) unable to upload state update to "+
q.log.Warnf("SessionQueue(%s) unable to upload state update to "+
"tower=%s: %v", q.ID(), q.towerAddr, err)
return err
}
Expand All @@ -550,21 +557,21 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
// TODO(conner): borked watchtower
err = fmt.Errorf("unable to ack seqnum=%d: %v",
stateUpdate.SeqNum, err)
log.Errorf("SessionQueue(%v) failed to ack update: %v", q.ID(), err)
q.log.Errorf("SessionQueue(%v) failed to ack update: %v", q.ID(), err)
return err

case err == wtdb.ErrLastAppliedReversion:
// TODO(conner): borked watchtower
err = fmt.Errorf("unable to ack seqnum=%d: %v",
stateUpdate.SeqNum, err)
log.Errorf("SessionQueue(%s) failed to ack update: %v",
q.log.Errorf("SessionQueue(%s) failed to ack update: %v",
q.ID(), err)
return err

case err != nil:
err = fmt.Errorf("unable to ack seqnum=%d: %v",
stateUpdate.SeqNum, err)
log.Errorf("SessionQueue(%s) failed to ack update: %v",
q.log.Errorf("SessionQueue(%s) failed to ack update: %v",
q.ID(), err)
return err
}
Expand Down
Loading