-
Notifications
You must be signed in to change notification settings - Fork 2.3k
sweeper: avoid deadlock on shutdown #4851
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -154,6 +154,7 @@ type inputCluster struct { | |
| // attempting to sweep. | ||
| type pendingSweepsReq struct { | ||
| respChan chan map[wire.OutPoint]*PendingInput | ||
| errChan chan error | ||
| } | ||
|
|
||
| // PendingInput contains information about an input that is currently being | ||
|
|
@@ -381,6 +382,33 @@ func (s *UtxoSweeper) Start() error { | |
| defer s.wg.Done() | ||
|
|
||
| s.collector(blockEpochs.Epochs) | ||
|
|
||
| // The collector exited and won't longer handle incoming | ||
| // requests. This can happen on shutdown, when the block | ||
| // notifier shuts down before the sweeper and its clients. In | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In which case does the block notifier shut down? Aren't sweeper clients listen to the same quit channel?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When lnd is signalled to shut down, the notifier will be stopped. This means the the This all happens before the sweeper's In the case I saw the |
||
| // order to not deadlock the clients waiting for their requests | ||
| // being handled, we handle them here and immediately return an | ||
| // error. When the sweeper finally is shut down we can exit as | ||
| // the clients will be notified. | ||
| for { | ||
| select { | ||
| case inp := <-s.newInputs: | ||
| inp.resultChan <- Result{ | ||
| Err: ErrSweeperShuttingDown, | ||
| } | ||
|
|
||
| case req := <-s.pendingSweepsReqs: | ||
|
halseth marked this conversation as resolved.
Outdated
|
||
| req.errChan <- ErrSweeperShuttingDown | ||
|
|
||
| case req := <-s.updateReqs: | ||
| req.responseChan <- &updateResp{ | ||
| err: ErrSweeperShuttingDown, | ||
| } | ||
|
|
||
| case <-s.quit: | ||
|
carlaKC marked this conversation as resolved.
Outdated
|
||
| return | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| return nil | ||
|
|
@@ -1290,9 +1318,11 @@ func (s *UtxoSweeper) waitForSpend(outpoint wire.OutPoint, | |
| // attempting to sweep. | ||
| func (s *UtxoSweeper) PendingInputs() (map[wire.OutPoint]*PendingInput, error) { | ||
| respChan := make(chan map[wire.OutPoint]*PendingInput, 1) | ||
| errChan := make(chan error, 1) | ||
| select { | ||
| case s.pendingSweepsReqs <- &pendingSweepsReq{ | ||
| respChan: respChan, | ||
| errChan: errChan, | ||
| }: | ||
| case <-s.quit: | ||
| return nil, ErrSweeperShuttingDown | ||
|
|
@@ -1301,6 +1331,8 @@ func (s *UtxoSweeper) PendingInputs() (map[wire.OutPoint]*PendingInput, error) { | |
| select { | ||
| case pendingSweeps := <-respChan: | ||
| return pendingSweeps, nil | ||
| case err := <-errChan: | ||
| return nil, err | ||
| case <-s.quit: | ||
| return nil, ErrSweeperShuttingDown | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2218,3 +2218,44 @@ func TestRequiredTxOuts(t *testing.T) { | |
| }) | ||
| } | ||
| } | ||
|
|
||
| // TestSweeperShutdownHandling tests that we notify callers when the sweeper | ||
| // cannot handle requests since it's in the process of shutting down. | ||
| func TestSweeperShutdownHandling(t *testing.T) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. very nice! |
||
| ctx := createSweeperTestContext(t) | ||
|
|
||
| // Make the backing notifier break down. This is what happens during | ||
| // lnd shut down, since the notifier is stopped before the sweeper. | ||
| require.Len(t, ctx.notifier.epochChan, 1) | ||
| for epochChan := range ctx.notifier.epochChan { | ||
| close(epochChan) | ||
| } | ||
|
|
||
| // Give the collector some time to exit. | ||
| time.Sleep(50 * time.Millisecond) | ||
|
|
||
| // Now trying to sweep inputs should return an error on the error | ||
| // channel. | ||
| resultChan, err := ctx.sweeper.SweepInput( | ||
| spendableInputs[0], defaultFeePref, | ||
| ) | ||
| require.NoError(t, err) | ||
|
|
||
| select { | ||
| case res := <-resultChan: | ||
| require.Equal(t, ErrSweeperShuttingDown, res.Err) | ||
|
|
||
| case <-time.After(defaultTestTimeout): | ||
| t.Fatalf("no result arrived") | ||
| } | ||
|
|
||
| // Stop the sweeper properly. | ||
| err = ctx.sweeper.Stop() | ||
| require.NoError(t, err) | ||
|
|
||
| // Now attempting to sweep an input should error out immediately. | ||
| _, err = ctx.sweeper.SweepInput( | ||
| spendableInputs[0], defaultFeePref, | ||
| ) | ||
| require.Error(t, err) | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.