Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Added `Cancel` and `CancelTx` to the `Client` to enable cancellation of jobs. [PR #141](https://github.com/riverqueue/river/pull/141).
- Added `JobCancel` and `JobCancelTx` to the `Client` to enable cancellation of jobs. [PR #141](https://github.com/riverqueue/river/pull/141) and [PR #152](https://github.com/riverqueue/river/pull/152).
- Added `ClientFromContext` and `ClientWithContextSafely` helpers to extract the `Client` from the worker's context where it is now available to workers. This simplifies making the River client available within your workers for i.e. enqueueing additional jobs. [PR #145](https://github.com/riverqueue/river/pull/145).
- Add `JobList` API for listing jobs. [PR #117](https://github.com/riverqueue/river/pull/117).

Expand Down
21 changes: 11 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,10 +940,10 @@ func (c *Client[TTx]) runProducers(fetchNewWorkCtx, workCtx context.Context) {
}
}

// Cancel cancels the job with the given ID. If possible, the job is cancelled
// immediately and will not be retried. The provided context is used for the
// underlying Postgres update and can be used to cancel the operation or apply a
// timeout.
// JobCancel cancels the job with the given ID. If possible, the job is
// cancelled immediately and will not be retried. The provided context is used
// for the underlying Postgres update and can be used to cancel the operation or
// apply a timeout.
//
// If the job is still in the queue (available, scheduled, or retryable), it is
// immediately marked as cancelled and will not be retried.
Expand Down Expand Up @@ -976,7 +976,7 @@ func (c *Client[TTx]) runProducers(fetchNewWorkCtx, workCtx context.Context) {
//
// Returns the up-to-date JobRow for the specified jobID if it exists. Returns
// ErrNotFound if the job doesn't exist.
func (c *Client[TTx]) Cancel(ctx context.Context, jobID int64) (*rivertype.JobRow, error) {
func (c *Client[TTx]) JobCancel(ctx context.Context, jobID int64) (*rivertype.JobRow, error) {
job, err := c.adapter.JobCancel(ctx, jobID)
if err != nil {
if errors.Is(err, riverdriver.ErrNoRows) {
Expand All @@ -988,10 +988,11 @@ func (c *Client[TTx]) Cancel(ctx context.Context, jobID int64) (*rivertype.JobRo
return dbsqlc.JobRowFromInternal(job), nil
}

// CancelTx cancels the job with the given ID within the specified transaction.
// This variant lets a caller cancel a job atomically alongside other database
// changes. An cancelled job doesn't take effect until the transaction commits,
// and if the transaction rolls back, so too is the cancelled job.
// JobCancelTx cancels the job with the given ID within the specified
// transaction. This variant lets a caller cancel a job atomically alongside
// other database changes. An cancelled job doesn't take effect until the
// transaction commits, and if the transaction rolls back, so too is the
// cancelled job.
//
// If possible, the job is cancelled immediately and will not be retried. The
// provided context is used for the underlying Postgres update and can be used
Expand Down Expand Up @@ -1028,7 +1029,7 @@ func (c *Client[TTx]) Cancel(ctx context.Context, jobID int64) (*rivertype.JobRo
//
// Returns the up-to-date JobRow for the specified jobID if it exists. Returns
// ErrNotFound if the job doesn't exist.
func (c *Client[TTx]) CancelTx(ctx context.Context, tx TTx, jobID int64) (*rivertype.JobRow, error) {
func (c *Client[TTx]) JobCancelTx(ctx context.Context, tx TTx, jobID int64) (*rivertype.JobRow, error) {
job, err := c.adapter.JobCancelTx(ctx, c.driver.UnwrapTx(tx), jobID)
if errors.Is(err, riverdriver.ErrNoRows) {
return nil, ErrNotFound
Expand Down
10 changes: 5 additions & 5 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func Test_Client(t *testing.T) {
t.Parallel()

cancelRunningJobTestHelper(t, func(ctx context.Context, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
return client.Cancel(ctx, jobID)
return client.JobCancel(ctx, jobID)
})
})

Expand All @@ -339,7 +339,7 @@ func Test_Client(t *testing.T) {
err error
)
txErr := pgx.BeginFunc(ctx, client.driver.GetDBPool(), func(tx pgx.Tx) error {
job, err = client.CancelTx(ctx, tx, jobID)
job, err = client.JobCancelTx(ctx, tx, jobID)
return err
})
require.NoError(t, txErr)
Expand Down Expand Up @@ -370,7 +370,7 @@ func Test_Client(t *testing.T) {
require.NoError(t, err)

// Cancel the job:
updatedJob, err := client.Cancel(ctx, insertedJob.ID)
updatedJob, err := client.JobCancel(ctx, insertedJob.ID)
require.NoError(t, err)
require.NotNil(t, updatedJob)
require.Equal(t, rivertype.JobStateCancelled, updatedJob.State)
Expand All @@ -384,13 +384,13 @@ func Test_Client(t *testing.T) {
startClient(ctx, t, client)

// Cancel an unknown job ID:
jobAfter, err := client.Cancel(ctx, 0)
jobAfter, err := client.JobCancel(ctx, 0)
require.ErrorIs(t, err, ErrNotFound)
require.Nil(t, jobAfter)

// Cancel an unknown job ID, within a transaction:
err = pgx.BeginFunc(ctx, client.driver.GetDBPool(), func(tx pgx.Tx) error {
jobAfter, err := client.CancelTx(ctx, tx, 0)
jobAfter, err := client.JobCancelTx(ctx, tx, 0)
require.ErrorIs(t, err, ErrNotFound)
require.Nil(t, jobAfter)
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ func (w *SleepingWorker) Work(ctx context.Context, job *river.Job[CancellingArgs
return ctx.Err()
}

// Example_cancelJobFromClient demonstrates how to permanently cancel a job from
// any Client using Cancel.
func Example_cancelJobFromClient() {
// Example_jobCancelFromClient demonstrates how to permanently cancel a job from
// any Client using JobCancel.
func Example_jobCancelFromClient() {
ctx := context.Background()

dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
Expand Down Expand Up @@ -88,7 +88,7 @@ func Example_cancelJobFromClient() {
// cancellation signal.
time.Sleep(500 * time.Millisecond)

if _, err = riverClient.Cancel(ctx, job.ID); err != nil {
if _, err = riverClient.JobCancel(ctx, job.ID); err != nil {
panic(err)
}
waitForNJobs(subscribeChan, 1)
Expand Down