Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Added `Cancel` and `CancelTx` to the `Client` to enable cancellation of jobs. [PR #141](https://github.com/riverqueue/river/pull/141).
- Added `ClientFromContext` and `ClientWithContextSafely` helpers to extract the `Client` from the worker's context, where it is now available to workers. This simplifies making the River client available within your workers, e.g. for enqueueing additional jobs. [PR #145](https://github.com/riverqueue/river/pull/145).

## [0.0.16] - 2024-01-06
Expand Down
105 changes: 105 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,11 @@ func (ts *clientTestSignals) Init() {
}

var (
// ErrNotFound is returned when a query by ID does not match any existing
// rows. For example, attempting to cancel a job that doesn't exist will
// return this error.
ErrNotFound = errors.New("not found")

errMissingConfig = errors.New("missing config")
errMissingDatabasePoolWithQueues = errors.New("must have a non-nil database pool to execute jobs (either use a driver with database pool or don't configure Queues)")
errMissingDriver = errors.New("missing database driver (try wrapping a Pgx pool with river/riverdriver/riverpgxv5.New)")
Expand Down Expand Up @@ -935,6 +940,106 @@ func (c *Client[TTx]) runProducers(fetchNewWorkCtx, workCtx context.Context) {
}
}

// Cancel cancels the job with the given ID. The provided context applies only
// to the underlying Postgres update and may be used to abort the operation or
// impose a timeout.
//
// How cancellation proceeds depends on the job's current state:
//
//   - Jobs still waiting in the queue (available, scheduled, or retryable) are
//     marked cancelled right away and will never be retried.
//   - Jobs already in a finalized state (cancelled, completed, or discarded)
//     are left untouched.
//   - Running jobs are flagged for cancellation, and the client executing the
//     job is notified (via LISTEN/NOTIFY) to cancel the running job's context.
//     Because Go offers no way to interrupt a running goroutine, the job keeps
//     running until it returns; as always, workers should honor context
//     cancellation and return promptly once the job's context is done.
//
// After the executing client observes the cancellation signal, any error
// returned by the job finalizes it as cancelled with no retry, while a nil
// error completes the job as usual.
//
// If the running job happens to finish executing _after_ this update was made
// but _before_ the cancellation signal is received, the outcome depends on the
// state the job is transitioning into (based on its return error):
//
//   - Successful completion, cancellation from within, or discard due to
//     exceeding max attempts all proceed as usual.
//   - A snooze or a retryable error instead finalizes the job as cancelled,
//     and it will not be attempted again.
//
// Returns the up-to-date JobRow for the specified jobID if it exists, or
// ErrNotFound if no such job exists.
func (c *Client[TTx]) Cancel(ctx context.Context, jobID int64) (*rivertype.JobRow, error) {
	job, err := c.adapter.JobCancel(ctx, jobID)
	switch {
	case errors.Is(err, riverdriver.ErrNoRows):
		// Translate the driver's sentinel into the client-level ErrNotFound.
		return nil, ErrNotFound
	case err != nil:
		return nil, err
	}

	return dbsqlc.JobRowFromInternal(job), nil
}

// CancelTx cancels the job with the given ID within the specified transaction.
// This variant lets a caller cancel a job atomically alongside other database
// changes. A cancelled job doesn't take effect until the transaction commits,
// and if the transaction rolls back, the cancellation is rolled back as well.
//
// If possible, the job is cancelled immediately and will not be retried. The
// provided context is used for the underlying Postgres update and can be used
// to cancel the operation or apply a timeout.
//
// If the job is still in the queue (available, scheduled, or retryable), it is
// immediately marked as cancelled and will not be retried.
//
// If the job is already finalized (cancelled, completed, or discarded), no
// changes are made.
//
// If the job is currently running, it is not immediately cancelled, but is
// instead marked for cancellation. The client running the job will also be
// notified (via LISTEN/NOTIFY) to cancel the running job's context. Although
// the job's context will be cancelled, since Go does not provide a mechanism to
// interrupt a running goroutine the job will continue running until it returns.
// As always, it is important for workers to respect context cancellation and
// return promptly when the job context is done.
//
// Once the cancellation signal is received by the client running the job, any
// error returned by that job will result in it being cancelled permanently and
// not retried. However if the job returns no error, it will be completed as
// usual.
//
// In the event the running job finishes executing _before_ the cancellation
// signal is received but _after_ this update was made, the behavior depends on
// which state the job is being transitioned into (based on its return error):
//
//   - If the job completed successfully, was cancelled from within, or was
//     discarded due to exceeding its max attempts, the job will be updated as
//     usual.
//   - If the job was snoozed to run again later or encountered a retryable
//     error, the job will be marked as cancelled and will not be attempted
//     again.
//
// Returns the up-to-date JobRow for the specified jobID if it exists. Returns
// ErrNotFound if the job doesn't exist.
func (c *Client[TTx]) CancelTx(ctx context.Context, tx TTx, jobID int64) (*rivertype.JobRow, error) {
	job, err := c.adapter.JobCancelTx(ctx, c.driver.UnwrapTx(tx), jobID)
	if err != nil {
		// Translate the driver's sentinel into the client-level ErrNotFound so
		// callers don't need to depend on riverdriver. Same structure as Cancel.
		if errors.Is(err, riverdriver.ErrNoRows) {
			return nil, ErrNotFound
		}
		return nil, err
	}

	return dbsqlc.JobRowFromInternal(job), nil
}

func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*dbadapter.JobInsertParams, error) {
encodedArgs, err := json.Marshal(args)
if err != nil {
Expand Down
128 changes: 126 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func Test_Client(t *testing.T) {
riverinternaltest.WaitOrTimeout(t, workedChan)
})

t.Run("JobCancel", func(t *testing.T) {
t.Run("JobCancelErrorReturned", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)
Expand Down Expand Up @@ -245,7 +245,7 @@ func Test_Client(t *testing.T) {
require.WithinDuration(t, time.Now(), *updatedJob.FinalizedAt, 2*time.Second)
})

t.Run("JobSnooze", func(t *testing.T) {
t.Run("JobSnoozeErrorReturned", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)
Expand Down Expand Up @@ -274,6 +274,130 @@ func Test_Client(t *testing.T) {
require.WithinDuration(t, time.Now().Add(15*time.Minute), updatedJob.ScheduledAt, 2*time.Second)
})

// This helper is used to test cancelling a job both _in_ a transaction and
// _outside of_ a transaction. The exact same test logic applies to each case,
// the only difference is a different cancelFunc provided by the specific
// subtest.
cancelRunningJobTestHelper := func(t *testing.T, cancelFunc func(ctx context.Context, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error)) { //nolint:thelper
	client, bundle := setup(t)

	// The worker signals its job ID on this channel as soon as it starts,
	// then blocks until its context is cancelled.
	jobStartedChan := make(chan int64)

	type JobArgs struct {
		JobArgsReflectKind[JobArgs]
	}

	AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
		jobStartedChan <- job.ID
		<-ctx.Done()
		return ctx.Err()
	}))

	// Wait for the client's internal components to report healthy before
	// proceeding: the notifier must be up for the remote cancellation signal
	// to be delivered, otherwise the test can flake.
	statusUpdateCh := client.monitor.RegisterUpdates()
	startClient(ctx, t, client)
	waitForClientHealthy(ctx, t, statusUpdateCh)
Copy link
Contributor Author

@bgentry bgentry Jan 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got a flaky failure on these tests in this run. There wasn't anything particularly helpful in the failure logs, other than that the job kept running and didn't get cancelled. My guess, however, is that the notifier wasn't up and running before the cancel signal was sent.

I tried reproducing it with thousands of runs locally but couldn't get any failures. Ultimately I added this logic to wait for internal components to come up and be healthy prior to proceeding with the test.

It makes me wonder if there's a more systemic issue here; should we be waiting for some components to come up before returning from Start()? Or should our startClient test helper always be waiting for the client to become healthy before returning?

I'm re-running CI several times just to get more confidence that this fixed the issue, no more failures yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes me wonder if there's a more systemic issue here; should we be waiting for some components to come up before returning from Start()? Or should our startClient test helper always be waiting for the client to become healthy before returning?

Yeah, good question. I kind of suspect that ... yes, a Start that returned only when things were really healthy and ready to go would overall be better. Shorter term, I like the idea of a test helper that waits for everything to be fully healthy, like you've done here, but we may want to make it even more widespread with an easy way to get at it from riverinternaltest.

Copy link
Contributor Author

@bgentry bgentry Jan 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could easily update startClient to wait for the client to become healthy. That should reduce flakiness and shouldn't impact test timing much as everything starts quickly. However since there is no client health exposed externally yet, it's purely an internal solution for testing.

There's the goal of making sure the client is healthy and fully booted before proceeding; this may be desirable sometimes, but other times users may wish to allow the client to finish booting asynchronously while their app does other initialization work. But then there's the separate goal of monitoring the client's health as it operates, and any of these internal components could in theory have issues or connectivity issues during operation.

I'm sure there are other libraries which try to tackle these problems, but one particular instance I have recent experience with is LaunchDarkly's SDK. It's a bit poorly documented and feels overly abstracted / Java-like, but the concepts there are reasonably well-solved and that's what I was intending to do when I initially built the client monitor stuff internally. I don't claim that the design is perfect or finished but at least the general problem is somewhat solved by it. And maybe some of it can/should be baked in as part of Start() so users don't need to think about it as much.

We should probably take a look at this both for our own tests but also for users' sake in follow up PRs.


	insertedJob, err := client.Insert(ctx, &JobArgs{}, nil)
	require.NoError(t, err)

	// Ensure the job is actually running before attempting to cancel it.
	startedJobID := riverinternaltest.WaitOrTimeout(t, jobStartedChan)
	require.Equal(t, insertedJob.ID, startedJobID)

	// Cancel the job:
	updatedJob, err := cancelFunc(ctx, client, insertedJob.ID)
	require.NoError(t, err)
	require.NotNil(t, updatedJob)
	// Job is still actively running at this point because the query wouldn't
	// modify that column for a running job:
	require.Equal(t, rivertype.JobStateRunning, updatedJob.State)

	// Once the worker's context is cancelled remotely, it returns ctx.Err(),
	// which finalizes the job as cancelled and emits this subscriber event.
	event := riverinternaltest.WaitOrTimeout(t, bundle.subscribeChan)
	require.Equal(t, EventKindJobCancelled, event.Kind)
	require.Equal(t, JobStateCancelled, event.Job.State)
	require.WithinDuration(t, time.Now(), *event.Job.FinalizedAt, 2*time.Second)

	// Double-check the final state directly in the database as well.
	jobAfterCancel, err := bundle.queries.JobGetByID(ctx, client.driver.GetDBPool(), insertedJob.ID)
	require.NoError(t, err)
	require.Equal(t, dbsqlc.JobStateCancelled, jobAfterCancel.State)
	require.WithinDuration(t, time.Now(), *jobAfterCancel.FinalizedAt, 2*time.Second)
}

t.Run("CancelRunningJob", func(t *testing.T) {
	t.Parallel()

	// Non-transactional variant: cancel directly through the client.
	cancelViaClient := func(ctx context.Context, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
		return client.Cancel(ctx, jobID)
	}
	cancelRunningJobTestHelper(t, cancelViaClient)
})

t.Run("CancelRunningJobInTx", func(t *testing.T) {
	t.Parallel()

	// Transactional variant: wrap CancelTx in a pgx transaction so the
	// cancellation is committed atomically before the helper inspects it.
	cancelRunningJobTestHelper(t, func(ctx context.Context, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
		var (
			cancelledJob *rivertype.JobRow
			cancelErr    error
		)
		txErr := pgx.BeginFunc(ctx, client.driver.GetDBPool(), func(tx pgx.Tx) error {
			cancelledJob, cancelErr = client.CancelTx(ctx, tx, jobID)
			return cancelErr
		})
		require.NoError(t, txErr)
		return cancelledJob, cancelErr
	})
})

t.Run("CancelScheduledJob", func(t *testing.T) {
	t.Parallel()

	client, _ := setup(t)

	jobStartedChan := make(chan int64)

	type JobArgs struct {
		JobArgsReflectKind[JobArgs]
	}

	// This worker should never actually run: the job is scheduled well into
	// the future and gets cancelled first.
	AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
		jobStartedChan <- job.ID
		<-ctx.Done()
		return ctx.Err()
	}))

	startClient(ctx, t, client)

	scheduledJob, err := client.Insert(ctx, &JobArgs{}, &InsertOpts{ScheduledAt: time.Now().Add(5 * time.Minute)})
	require.NoError(t, err)

	// Cancelling a job that hasn't started yet finalizes it immediately:
	cancelledJob, err := client.Cancel(ctx, scheduledJob.ID)
	require.NoError(t, err)
	require.NotNil(t, cancelledJob)
	require.Equal(t, rivertype.JobStateCancelled, cancelledJob.State)
	require.WithinDuration(t, time.Now(), *cancelledJob.FinalizedAt, 2*time.Second)
})

t.Run("CancelNonExistentJob", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)
startClient(ctx, t, client)

// Cancel an unknown job ID:
jobAfter, err := client.Cancel(ctx, 0)
require.ErrorIs(t, err, ErrNotFound)
require.Nil(t, jobAfter)

// Cancel an unknown job ID, within a transaction:
err = pgx.BeginFunc(ctx, client.driver.GetDBPool(), func(tx pgx.Tx) error {
jobAfter, err := client.CancelTx(ctx, tx, 0)
require.ErrorIs(t, err, ErrNotFound)
require.Nil(t, jobAfter)
return nil
})
require.NoError(t, err)
})

t.Run("AlternateSchema", func(t *testing.T) {
t.Parallel()

Expand Down
102 changes: 102 additions & 0 deletions example_cancel_from_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package river_test

import (
"context"
"errors"
"log/slog"
"time"

"github.com/jackc/pgx/v5/pgxpool"

"github.com/riverqueue/river"
"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/internal/util/slogutil"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

// SleepingArgs is a job args type for this example file.
//
// NOTE(review): SleepingArgs appears unused within this example — the worker
// below and the Insert call both use CancellingArgs instead. Confirm whether
// this type is needed or left over.
type SleepingArgs struct{}

// Kind satisfies river.JobArgs. Note the kind string is "SleepingWorker"
// (the worker's name) rather than one derived from the args type name.
func (args SleepingArgs) Kind() string { return "SleepingWorker" }

// SleepingWorker works CancellingArgs jobs by blocking until its context is
// cancelled (or an internal timeout elapses), reporting each started job's ID
// on jobChan so the example can synchronize with it.
type SleepingWorker struct {
	river.WorkerDefaults[CancellingArgs]
	// jobChan receives the ID of each job as its Work method begins.
	jobChan chan int64
}

// Work reports the job's ID on jobChan, then blocks until either the job's
// context is cancelled (the expected path, triggered remotely via Cancel) or
// a five-second safety timeout elapses.
func (w *SleepingWorker) Work(ctx context.Context, job *river.Job[CancellingArgs]) error {
	w.jobChan <- job.ID

	select {
	case <-time.After(5 * time.Second):
		// Guard against hanging forever if the cancellation never arrives.
		return errors.New("sleeping worker timed out")
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Example_cancelJobFromClient demonstrates how to permanently cancel a job from
// any Client using Cancel.
func Example_cancelJobFromClient() {
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	// The worker reports each started job's ID here so the example can wait
	// until the job is actually running before cancelling it.
	jobChan := make(chan int64)

	workers := river.NewWorkers()
	river.AddWorker(workers, &SleepingWorker{jobChan: jobChan})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 10},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	// Not strictly needed, but used to help this test wait until job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCancelled)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}
	job, err := riverClient.Insert(ctx, CancellingArgs{ShouldCancel: true}, nil)
	if err != nil {
		panic(err)
	}
	// Block until the worker has picked up the job and begun running it.
	select {
	case <-jobChan:
	case <-time.After(2 * time.Second):
		panic("no jobChan signal received")
	}

	// There is presently no way to wait for the client to be 100% ready, so we
	// sleep for a bit to give it time to start up. This is only needed in this
	// example because we need the notifier to be ready for it to receive the
	// cancellation signal.
	time.Sleep(500 * time.Millisecond)

	// Cancel the running job; its context will be cancelled remotely.
	if _, err = riverClient.Cancel(ctx, job.ID); err != nil {
		panic(err)
	}
	// Wait for the EventKindJobCancelled event confirming finalization.
	waitForNJobs(subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

	// Output:
	// jobExecutor: job cancelled remotely
}
Loading