From 1f236a69e68e8fab631a82dffba62fa9156cfa4a Mon Sep 17 00:00:00 2001 From: Brandur Date: Sat, 27 Jan 2024 12:11:09 -0800 Subject: [PATCH] Specify specific cancel cause on shutdown and use it to determine error logging Follow up to a discussion [1] in which an additional error log was causing example tests to fail after uses of `log` were replaced with `slog` and no longer suppressed. Here, use `WithCancelCause` to send a specific cancellation error on shutdown, which can be handled specially for instances where we'd only want to log an error under unusual circumstances. [1] https://github.com/riverqueue/river/pull/140#discussion_r1439099623 --- client.go | 14 +++++++------- client_test.go | 10 +++------- internal/notifier/notifier.go | 14 ++------------ internal/rivercommon/constants.go | 10 ---------- internal/rivercommon/river_common.go | 18 ++++++++++++++++++ 5 files changed, 30 insertions(+), 36 deletions(-) delete mode 100644 internal/rivercommon/constants.go create mode 100644 internal/rivercommon/river_common.go diff --git a/client.go b/client.go index 7996016f..90e6a82b 100644 --- a/client.go +++ b/client.go @@ -257,7 +257,7 @@ type Client[TTx any] struct { // fetchNewWorkCancel cancels the context used for fetching new work. This // will be used to stop fetching new work whenever stop is initiated, or // when the context provided to Run is itself cancelled. - fetchNewWorkCancel context.CancelFunc + fetchNewWorkCancel context.CancelCauseFunc id string monitor *clientMonitor @@ -276,7 +276,7 @@ type Client[TTx any] struct { // workCancel cancels the context used for all work goroutines. Normal Stop // does not cancel that context. - workCancel context.CancelFunc + workCancel context.CancelCauseFunc } // Test-only signals. @@ -569,9 +569,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error { // We use separate contexts for fetching and working to allow for a graceful // stop. However, both inherit from the provided context so if it is // cancelled a more aggressive stop will be initiated. - fetchNewWorkCtx, fetchNewWorkCancel := context.WithCancel(ctx) + fetchNewWorkCtx, fetchNewWorkCancel := context.WithCancelCause(ctx) c.fetchNewWorkCancel = fetchNewWorkCancel - workCtx, workCancel := context.WithCancel(withClient[TTx](ctx, c)) + workCtx, workCancel := context.WithCancelCause(withClient[TTx](ctx, c)) c.workCancel = workCancel // Before doing anything else, make an initial connection to the database to @@ -690,7 +690,7 @@ func (c *Client[TTx]) Stop(ctx context.Context) error { } c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Stop started") - c.fetchNewWorkCancel() + c.fetchNewWorkCancel(rivercommon.ErrShutdown) return c.awaitStop(ctx) } @@ -715,8 +715,8 @@ func (c *Client[TTx]) awaitStop(ctx context.Context) error { // instead. func (c *Client[TTx]) StopAndCancel(ctx context.Context) error { c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work") - c.fetchNewWorkCancel() - c.workCancel() + c.fetchNewWorkCancel(rivercommon.ErrShutdown) + c.workCancel(rivercommon.ErrShutdown) return c.awaitStop(ctx) } diff --git a/client_test.go b/client_test.go index 59c23a19..451e8850 100644 --- a/client_test.go +++ b/client_test.go @@ -427,7 +427,7 @@ func Test_Client(t *testing.T) { require.Equal(t, `relation "river_job" does not exist`, pgErr.Message) }) - t.Run("Stopped", func(t *testing.T) { + t.Run("StopAndCancel", func(t *testing.T) { t.Parallel() client, _ := setup(t) @@ -441,6 +441,7 @@ func Test_Client(t *testing.T) { AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { jobStartedChan <- job.ID <-ctx.Done() + require.ErrorIs(t, context.Cause(ctx), rivercommon.ErrShutdown) close(jobDoneChan) return nil })) @@ -459,12 +460,7 @@ func Test_Client(t *testing.T) { default: } - stopCtx, stopCancel := context.WithTimeout(ctx, 5*time.Second) - t.Cleanup(stopCancel) - - if err := client.StopAndCancel(stopCtx); err != nil { - t.Fatal(err) - } + require.NoError(t, client.StopAndCancel(ctx)) riverinternaltest.WaitOrTimeout(t, client.Stopped()) }) diff --git a/internal/notifier/notifier.go b/internal/notifier/notifier.go index 8b842737..a0b9802e 100644 --- a/internal/notifier/notifier.go +++ b/internal/notifier/notifier.go @@ -14,6 +14,7 @@ import ( "github.com/riverqueue/river/internal/baseservice" "github.com/riverqueue/river/internal/componentstatus" + "github.com/riverqueue/river/internal/rivercommon" ) const statementTimeout = 5 * time.Second @@ -135,18 +136,7 @@ func (n *Notifier) getConnAndRun(ctx context.Context) { conn, err := n.establishConn(ctx) if err != nil { - if errors.Is(err, context.Canceled) { - return - } - // Log at a lower verbosity level in case an error is received when the - // context is already done (probably because the client is stopping). - // Example tests can finish before the notifier connects and starts - // listening, and on client stop may produce a connection error that - // would otherwise pollute output and fail the test. - select { - case <-ctx.Done(): - n.logger.Info("error establishing connection from pool", "err", err) - default: + if !errors.Is(context.Cause(ctx), rivercommon.ErrShutdown) { n.logger.Error("error establishing connection from pool", "err", err) } return diff --git a/internal/rivercommon/constants.go b/internal/rivercommon/constants.go deleted file mode 100644 index 659fc0b4..00000000 --- a/internal/rivercommon/constants.go +++ /dev/null @@ -1,10 +0,0 @@ -package rivercommon - -// These constants are made available in rivercommon so that they're accessible -// by internal packages, but the top-level river package re-exports them, and -// all user code must use that set instead. -const ( - MaxAttemptsDefault = 25 - PriorityDefault = 1 - QueueDefault = "default" -) diff --git a/internal/rivercommon/river_common.go b/internal/rivercommon/river_common.go new file mode 100644 index 00000000..60914999 --- /dev/null +++ b/internal/rivercommon/river_common.go @@ -0,0 +1,18 @@ +package rivercommon + +import "errors" + +// These constants are made available in rivercommon so that they're accessible +// by internal packages, but the top-level river package re-exports them, and +// all user code must use that set instead. +const ( + MaxAttemptsDefault = 25 + PriorityDefault = 1 + QueueDefault = "default" +) + +// ErrShutdown is a special error injected by the client into its fetch and work +// CancelCauseFuncs when it's stopping. It may be used by components for such +// cases like avoiding logging an error during a normal shutdown procedure. This +// is internal for the time being, but we could also consider exposing it. +var ErrShutdown = errors.New("shutdown initiated")