Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/lightningnetwork/lnd/chainreg"
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/funding"
"github.com/lightningnetwork/lnd/htlcswitch"
Expand Down Expand Up @@ -405,6 +406,8 @@ type Config struct {

ChannelCommitBatchSize uint32 `long:"channel-commit-batch-size" description:"The maximum number of channel state updates that is accumulated before signing a new commitment."`

ChannelArbStartTimeout time.Duration `long:"channel-arb-start-timeout" hidden:"true" description:"The maximum time that is allowed to pass while the chain arbitrator starts the individual channel arbitrators. This value might need to be increased if lnd is running with aux components that depend on all subsystems to be fully started."`

KeepFailedPaymentAttempts bool `long:"keep-failed-payment-attempts" description:"Keeps persistent record of all failed payment attempts for successfully settled payments."`

StoreFinalHtlcResolutions bool `long:"store-final-htlc-resolutions" description:"Persistently store the final resolution of incoming htlcs."`
Expand Down Expand Up @@ -720,6 +723,7 @@ func DefaultConfig() Config {
ChannelCommitInterval: defaultChannelCommitInterval,
PendingCommitInterval: defaultPendingCommitInterval,
ChannelCommitBatchSize: defaultChannelCommitBatchSize,
ChannelArbStartTimeout: contractcourt.DefaultChainArbTimeout,
CoinSelectionStrategy: defaultCoinSelectionStrategy,
KeepFailedPaymentAttempts: defaultKeepFailedPaymentAttempts,
RemoteSigner: &lncfg.RemoteSigner{
Expand Down Expand Up @@ -1668,6 +1672,14 @@ func ValidateConfig(cfg Config, interceptor signal.Interceptor, fileParser,
maxPendingCommitInterval)
}

// Limit the chain arbitrator startup timeout to a reasonable minimum
// value. Too short might run into restart loops.
if cfg.ChannelArbStartTimeout < contractcourt.DefaultChainArbTimeout {
return nil, mkErr("channel-arb-start-timeout (%v) must be at "+
"least %v", cfg.ChannelArbStartTimeout,
contractcourt.DefaultChainArbTimeout)
}

if err := cfg.Gossip.Parse(); err != nil {
return nil, mkErr("error parsing gossip syncer: %v", err)
}
Expand Down
76 changes: 72 additions & 4 deletions contractcourt/chain_arbitrator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package contractcourt

import (
"context"
"errors"
"fmt"
"sync"
Expand All @@ -22,11 +23,21 @@ import (
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwire"
"golang.org/x/sync/errgroup"
)

// ErrChainArbExiting signals that the chain arbitrator is shutting down.
var ErrChainArbExiting = errors.New("ChainArbitrator exiting")

// DefaultChainArbTimeout is how long the chain arbitrator waits, by default,
// for the per-channel arbitrators to finish starting. The value is kept
// generous on purpose: starting an arbitrator may depend on external
// components (e.g. aux components, hooks), and those may in turn need other
// lnd subsystems to be fully started first.
const DefaultChainArbTimeout = 5 * time.Minute

// ResolutionMsg is a message sent by resolvers to outside sub-systems once an
// outgoing contract has been fully resolved. For multi-hop contracts, if we
// resolve the outgoing contract, we'll also need to ensure that the incoming
Expand Down Expand Up @@ -218,6 +229,10 @@ type ChainArbitratorConfig struct {
// lower package.
QueryIncomingCircuit func(circuit models.CircuitKey) *models.CircuitKey

// StartupTimeout is the maximum time the arbitrator will wait when
// starting the individual channel arbitrators.
StartupTimeout time.Duration

// AuxLeafStore is an optional store that can be used to store auxiliary
// leaves for certain custom channel types.
AuxLeafStore fn.Option[lnwallet.AuxLeafStore]
Expand Down Expand Up @@ -272,6 +287,12 @@ type ChainArbitrator struct {
func NewChainArbitrator(cfg ChainArbitratorConfig,
db *channeldb.DB) *ChainArbitrator {

// Let's use the default timeout for tests. For normal operation, the
// value is validated when parsing the configuration.
if cfg.StartupTimeout == 0 {
cfg.StartupTimeout = DefaultChainArbTimeout
}

return &ChainArbitrator{
cfg: cfg,
activeChannels: make(map[wire.OutPoint]*ChannelArbitrator),
Expand Down Expand Up @@ -767,20 +788,67 @@ func (c *ChainArbitrator) Start() error {

// Launch all the goroutines for each arbitrator so they can carry out
// their duties.
// Set a timeout for the group so we don't wait indefinitely if one
// startup function blocks.
ctx, cancel := context.WithTimeout(
context.Background(), c.cfg.StartupTimeout,
)

eg, egCtx := errgroup.WithContext(ctx)

for _, arbitrator := range c.activeChannels {
startState, ok := startStates[arbitrator.cfg.ChanPoint]
if !ok {
stopAndLog()

// In case we encounter an error we need to cancel the
// context to ensure all goroutines are cleaned up.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we don't actually clean up the goroutines yet (or I mean we don't interrupt the arb.Start() yet, we just don't wait for it anymore). So can perhaps add a TODO here that we'll want to eventually add a context to Start() so things will clean up properly on cancel?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we clean them up. I think adding a context is not really the way to go, because looking into the startup functionality, it does not look like a context would help here.

cancel()
return fmt.Errorf("arbitrator: %v has no start state",
arbitrator.cfg.ChanPoint)
}

if err := arbitrator.Start(startState); err != nil {
stopAndLog()
return err
}
arb := arbitrator // Create new variable for closure
eg.Go(func() error {
// Create a non-blocking goroutine for the actual Start
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might also check the context at the beginning, before starting the arbitrator.

  if ctx.Err() != nil {
        return ctx.Err()
    }

// call
startErrChan := make(chan error, 1)
go func() {
startErrChan <- arb.Start(startState)
}()

select {
case startErr := <-startErrChan:
if startErr != nil {
cancel()
}

return startErr

case <-egCtx.Done():
return egCtx.Err()

case <-c.quit:

return ErrChainArbExiting
}
})
}

// Wait for the error group to finish and if an error occurred we
// trigger a graceful shutdown of LND.
//
// NOTE: We do not add this collector to the waitGroup because we want
// to stop the chain arbitrator if an error occurs.
go func() {
defer cancel()

if err := eg.Wait(); err != nil {
log.Criticalf("ChainArbitrator failed to start all "+
"channel arbitrators: %v", err)
}
}()

// Subscribe to a single stream of block epoch notifications that we
// will dispatch to all active arbitrators.
blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
Expand Down
20 changes: 13 additions & 7 deletions contractcourt/channel_arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ type ChannelArbitrator struct {
// upon start up to decide which actions to take.
state ArbitratorState

wg sync.WaitGroup
wg *fn.GoroutineManager
quit chan struct{}
}

Expand Down Expand Up @@ -413,6 +413,7 @@ func NewChannelArbitrator(cfg ChannelArbitratorConfig,
activeHTLCs: htlcSets,
unmergedSet: unmerged,
cfg: cfg,
wg: fn.NewGoroutineManager(context.Background()),
quit: make(chan struct{}),
}
}
Expand Down Expand Up @@ -570,9 +571,9 @@ func (c *ChannelArbitrator) Start(state *chanArbStartState) error {
}
}

c.wg.Add(1)
go c.channelAttendant(bestHeight)
return nil
return c.wg.Go(func(ctx context.Context) {
c.channelAttendant(bestHeight)
})
}

// maybeAugmentTaprootResolvers will update the contract resolution information
Expand Down Expand Up @@ -844,7 +845,7 @@ func (c *ChannelArbitrator) Stop() error {
c.activeResolversLock.RUnlock()

close(c.quit)
c.wg.Wait()
c.wg.Stop()

return nil
}
Expand Down Expand Up @@ -1568,8 +1569,13 @@ func (c *ChannelArbitrator) launchResolvers(resolvers []ContractResolver,

c.activeResolvers = resolvers
for _, contract := range resolvers {
c.wg.Add(1)
go c.resolveContract(contract, immediate)
err := c.wg.Go(func(ctx context.Context) {
c.resolveContract(contract, immediate)
})
if err != nil {
log.Criticalf("ChannelArbitrator(%v): unable to "+
"launch resolver: %v", c.cfg.ChanPoint, err)
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions docs/release-notes/release-notes-0.19.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@

* Make sure the RPC clients used to access the chain backend are [properly
shutdown](https://github.com/lightningnetwork/lnd/pull/9261).

* [Start channel arbitrators concurrently](
https://github.com/lightningnetwork/lnd/pull/9262) to prevent potential
deadlocks when lnd depends on external components (e.g. aux components, hooks)
which might require other lnd subsystems to be fully started.

# New Features
## Functional Enhancements
Expand Down