From 543f3694d385336b6a07fe2d1c672c4efdecf94a Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Tue, 25 Jun 2024 17:51:54 -0500 Subject: [PATCH 1/2] add failing test case for completer shutdown panic This failing test case exposes the issue in #400 100% of the time, which is caused by the `stopProducers()` call not actually waiting until the producers are fully shut down before proceeding with the remaining shutdown. --- client_test.go | 25 +++++++++++++++++++++++++ internal/jobcompleter/job_completer.go | 1 + 2 files changed, 26 insertions(+) diff --git a/client_test.go b/client_test.go index 392d3c00..b7815f26 100644 --- a/client_test.go +++ b/client_test.go @@ -836,6 +836,31 @@ func Test_Client_Stop(t *testing.T) { }) } +func Test_Client_Stop_AfterContextCancelled(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // doneCh will never close, job will exit due to context cancellation: + doneCh := make(chan struct{}) + startedCh := make(chan int64) + + dbPool := riverinternaltest.TestDB(ctx, t) + client := newTestClient(t, dbPool, newTestConfig(t, makeAwaitCallback(startedCh, doneCh))) + require.NoError(t, client.Start(ctx)) + t.Cleanup(func() { require.NoError(t, client.Stop(context.Background())) }) + + insertRes, err := client.Insert(ctx, callbackArgs{}, nil) + require.NoError(t, err) + startedJobID := riverinternaltest.WaitOrTimeout(t, startedCh) + require.Equal(t, insertRes.Job.ID, startedJobID) + + cancel() + + require.ErrorIs(t, client.Stop(ctx), context.Canceled) +} + func Test_Client_StopAndCancel(t *testing.T) { t.Parallel() diff --git a/internal/jobcompleter/job_completer.go b/internal/jobcompleter/job_completer.go index 5ad7a5a0..44f98967 100644 --- a/internal/jobcompleter/job_completer.go +++ b/internal/jobcompleter/job_completer.go @@ -393,6 +393,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { batchID, batchFinalizedAt = mapIDsAndFinalizedAt(setStateBatch) jobRows []*rivertype.JobRow ) + c.Logger.DebugContext(ctx, c.Name+": Completing batch of job(s)", "num_jobs", len(setStateBatch)) if len(setStateBatch) > c.completionMaxSize { jobRows = make([]*rivertype.JobRow, 0, len(setStateBatch)) for i := 0; i < len(setStateBatch); i += c.completionMaxSize { From 9f4d1f1c3b8d5f4c147998b6f6d499a5272c38a0 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Tue, 25 Jun 2024 20:54:58 -0500 Subject: [PATCH 2/2] fix shutdown panics by separating completer context Back in #258 / 702d5b2, the batch completer was added to improve throughput. As part of that refactor, it was turned into a startstop service that took a context on start. We took the care to ensure that the context provided to the completer was _not_ the `fetchCtx` (cancelled on `Stop()`) but instead was the raw user-provided `ctx`, specifically to make sure the completer could finish its work even after fetches were stopped. This worked well if the whole shutdown process was done with `Stop` / `StopAndCancel`, but it did not work if the user-provided context was itself cancelled outside of River. In that scenario, the completer would immediately begin shutting down upon cancellation, even without waiting for producers to finish sending it any final jobs that needed to be recorded. This went unnoticed until #379 / 0e57338 turned this scenario into a panic instead of a silent misbehavior, which is what was encountered in #400. To fix this situation, we need to use Go 1.21's new `context.WithoutCancel` API to fork the user-provided context so that we maintain whatever else is stored in there (i.e. so anything used by slog is still available) but we do not cancel this completer's context _ever_. The completer will manage its own shutdown when its `Stop()` is called as part of all of the other client services being stopped in parallel. --- CHANGELOG.md | 4 ++++ client.go | 19 ++++++++++--------- producer.go | 6 ++++++ 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b860912c..dfb4e414 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fix possible Client shutdown panics if the user-provided context is cancelled while jobs are still running. [PR #401](https://github.com/riverqueue/river/pull/401). + ## [0.7.0] - 2024-06-13 ### Added diff --git a/client.go b/client.go index d1d897ee..b9cdfd8d 100644 --- a/client.go +++ b/client.go @@ -305,7 +305,6 @@ type Client[TTx any] struct { baseStartStop startstop.BaseStartStop completer jobcompleter.JobCompleter - completerSubscribeCh chan []jobcompleter.CompleterJobUpdated config *Config driver riverdriver.Driver[TTx] elector *leadership.Elector @@ -670,9 +669,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error { // Each time we start, we need a fresh completer subscribe channel to // send job completion events on, because the completer will close it // each time it shuts down. - c.completerSubscribeCh = make(chan []jobcompleter.CompleterJobUpdated, 10) - c.completer.ResetSubscribeChan(c.completerSubscribeCh) - c.subscriptionManager.ResetSubscribeChan(c.completerSubscribeCh) + completerSubscribeCh := make(chan []jobcompleter.CompleterJobUpdated, 10) + c.completer.ResetSubscribeChan(completerSubscribeCh) + c.subscriptionManager.ResetSubscribeChan(completerSubscribeCh) // In case of error, stop any services that might have started. This // is safe because even services that were never started will still @@ -694,11 +693,11 @@ func (c *Client[TTx]) Start(ctx context.Context) error { // The completer is part of the services list below, but although it can // stop gracefully along with all the other services, it needs to be - // started with a context that's _not_ fetchCtx. This ensures that even - // when fetch is cancelled on shutdown, the completer is still given a - // separate opportunity to start stopping only after the producers have - // finished up and returned. - if err := c.completer.Start(ctx); err != nil { + // started with a context that's _not_ cancelled if the user-provided + // context is cancelled. This ensures that even when fetch is cancelled on + // shutdown, the completer is still given a separate opportunity to start + // stopping only after the producers have finished up and returned. + if err := c.completer.Start(context.WithoutCancel(ctx)); err != nil { stopServicesOnError() return err } @@ -744,7 +743,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error { <-fetchCtx.Done() // On stop, have the producers stop fetching first of all. + c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Stopping producers") stopProducers() + c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": All producers stopped") // Stop all mainline services where stop order isn't important. startstop.StopAllParallel(append( diff --git a/producer.go b/producer.go index 27928aef..0ac6cd7d 100644 --- a/producer.go +++ b/producer.go @@ -215,6 +215,12 @@ func (p *producer) Start(ctx context.Context) error { return p.StartWorkContext(ctx, ctx) } +func (p *producer) Stop() { + p.Logger.Debug(p.Name + ": Stopping") + p.BaseStartStop.Stop() + p.Logger.Debug(p.Name + ": Stop returned") +} + // Start starts the producer. It backgrounds a goroutine which is stopped when // context is cancelled or Stop is invoked. //