Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ type Client[TTx any] struct {
// fetchNewWorkCancel cancels the context used for fetching new work. This
// will be used to stop fetching new work whenever stop is initiated, or
// when the context provided to Run is itself cancelled.
fetchNewWorkCancel context.CancelFunc
fetchNewWorkCancel context.CancelCauseFunc

id string
monitor *clientMonitor
Expand All @@ -276,7 +276,7 @@ type Client[TTx any] struct {

// workCancel cancels the context used for all work goroutines. Normal Stop
// does not cancel that context.
workCancel context.CancelFunc
workCancel context.CancelCauseFunc
}

// Test-only signals.
Expand Down Expand Up @@ -569,9 +569,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// We use separate contexts for fetching and working to allow for a graceful
// stop. However, both inherit from the provided context so if it is
// cancelled a more aggressive stop will be initiated.
fetchNewWorkCtx, fetchNewWorkCancel := context.WithCancel(ctx)
fetchNewWorkCtx, fetchNewWorkCancel := context.WithCancelCause(ctx)
c.fetchNewWorkCancel = fetchNewWorkCancel
workCtx, workCancel := context.WithCancel(withClient[TTx](ctx, c))
workCtx, workCancel := context.WithCancelCause(withClient[TTx](ctx, c))
c.workCancel = workCancel

// Before doing anything else, make an initial connection to the database to
Expand Down Expand Up @@ -690,7 +690,7 @@ func (c *Client[TTx]) Stop(ctx context.Context) error {
}

c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Stop started")
c.fetchNewWorkCancel()
c.fetchNewWorkCancel(rivercommon.ErrShutdown)
return c.awaitStop(ctx)
}

Expand All @@ -715,8 +715,8 @@ func (c *Client[TTx]) awaitStop(ctx context.Context) error {
// instead.
func (c *Client[TTx]) StopAndCancel(ctx context.Context) error {
c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work")
c.fetchNewWorkCancel()
c.workCancel()
c.fetchNewWorkCancel(rivercommon.ErrShutdown)
c.workCancel(rivercommon.ErrShutdown)
return c.awaitStop(ctx)
}

Expand Down
10 changes: 3 additions & 7 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func Test_Client(t *testing.T) {
require.Equal(t, `relation "river_job" does not exist`, pgErr.Message)
})

t.Run("Stopped", func(t *testing.T) {
t.Run("StopAndCancel", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)
Expand All @@ -441,6 +441,7 @@ func Test_Client(t *testing.T) {
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
jobStartedChan <- job.ID
<-ctx.Done()
require.ErrorIs(t, context.Cause(ctx), rivercommon.ErrShutdown)
close(jobDoneChan)
return nil
}))
Expand All @@ -459,12 +460,7 @@ func Test_Client(t *testing.T) {
default:
}

stopCtx, stopCancel := context.WithTimeout(ctx, 5*time.Second)
t.Cleanup(stopCancel)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took this out because, with the addition of require.ErrorIs above, it turned out to cause an occasional intermittent failure: the timeout kicks in and supersedes the shutdown's cancellation. I don't think this is needed anyway because if shutdown's context cancellation doesn't succeed, I don't think we could expect this one to succeed either. Also, not having it keeps the test more succinct.


if err := client.StopAndCancel(stopCtx); err != nil {
t.Fatal(err)
}
require.NoError(t, client.StopAndCancel(ctx))

riverinternaltest.WaitOrTimeout(t, client.Stopped())
})
Expand Down
14 changes: 2 additions & 12 deletions internal/notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/riverqueue/river/internal/baseservice"
"github.com/riverqueue/river/internal/componentstatus"
"github.com/riverqueue/river/internal/rivercommon"
)

const statementTimeout = 5 * time.Second
Expand Down Expand Up @@ -135,18 +136,7 @@ func (n *Notifier) getConnAndRun(ctx context.Context) {

conn, err := n.establishConn(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I wasn't quite able to figure out is why this condition wasn't detecting the cancellation previously. I would've thought that even without a special cancellation error, this still would've been enough to intercept a problem before falling through to logging.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, hard to tell without digging into the code, but maybe it's possible pgx doesn't surface this source error correctly? establishConn is a simple method that just returns the result of pgx.ConnectConfig

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah, that's probably it actually. The error being checked is now coming from a different place (context), which would explain why this version works.

return
}
// Log at a lower verbosity level in case an error is received when the
// context is already done (probably because the client is stopping).
// Example tests can finish before the notifier connects and starts
// listening, and on client stop may produce a connection error that
// would otherwise pollute output and fail the test.
select {
case <-ctx.Done():
n.logger.Info("error establishing connection from pool", "err", err)
default:
if !errors.Is(context.Cause(ctx), rivercommon.ErrShutdown) {
n.logger.Error("error establishing connection from pool", "err", err)
}
return
Expand Down
10 changes: 0 additions & 10 deletions internal/rivercommon/constants.go

This file was deleted.

18 changes: 18 additions & 0 deletions internal/rivercommon/river_common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package rivercommon

import "errors"

// The defaults below live in rivercommon so that internal packages can reach
// them. The top-level river package re-exports each one, and user code must
// use those re-exports rather than referencing this package.

// MaxAttemptsDefault is the shared default for max attempts.
const MaxAttemptsDefault = 25

// PriorityDefault is the shared default priority.
const PriorityDefault = 1

// QueueDefault is the shared default queue name.
const QueueDefault = "default"

var (
	// ErrShutdown is the cause the client passes to its fetch and work
	// CancelCauseFuncs when it begins stopping. Components can compare
	// context.Cause against it, for example to avoid logging an error during
	// a normal shutdown. It's internal for now, though exposing it publicly
	// remains an option.
	ErrShutdown = errors.New("shutdown initiated")
)