Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 57 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,7 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*dbad
return insertParams, nil
}

var errInsertNoDriverDBPool = errors.New("driver must have non-nil database pool to use Insert and InsertMany (try InsertTx or InsertManyTx instead")
var errNoDriverDBPool = errors.New("driver must have non-nil database pool to use non-transactional methods like Insert and InsertMany (try InsertTx or InsertManyTx instead")

// Insert inserts a new job with the provided args. Job opts can be used to
// override any defaults that may have been provided by an implementation of
Expand All @@ -1122,7 +1122,7 @@ var errInsertNoDriverDBPool = errors.New("driver must have non-nil database pool
// }
func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts) (*rivertype.JobRow, error) {
if c.driver.GetDBPool() == nil {
return nil, errInsertNoDriverDBPool
return nil, errNoDriverDBPool
}

if err := c.validateJobArgs(args); err != nil {
Expand Down Expand Up @@ -1201,7 +1201,7 @@ type InsertManyParams struct {
// }
func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) (int64, error) {
if c.driver.GetDBPool() == nil {
return 0, errInsertNoDriverDBPool
return 0, errNoDriverDBPool
}

insertParams, err := c.insertManyParams(params)
Expand Down Expand Up @@ -1302,3 +1302,57 @@ func validateQueueName(queueName string) error {
}
return nil
}

// JobList returns a paginated list of jobs matching the provided filters. The
// provided context is used for the underlying Postgres query and can be used to
// cancel the operation or apply a timeout.
//
// params := river.NewJobListParams().WithLimit(10).State(river.JobStateCompleted)
// jobRows, err := client.JobList(ctx, params)
// if err != nil {
// // handle error
// }
func (c *Client[TTx]) JobList(ctx context.Context, params *JobListParams) ([]*rivertype.JobRow, error) {
if c.driver.GetDBPool() == nil {
return nil, errNoDriverDBPool
}

if params == nil {
params = NewJobListParams()
}
dbParams, err := params.toDBParams()
if err != nil {
return nil, err
}

internalJobs, err := c.adapter.JobList(ctx, *dbParams)
if err != nil {
return nil, err
}
return dbsqlc.JobRowsFromInternal(internalJobs), nil
}

// JobListTx returns a paginated list of jobs matching the provided filters. The
// provided context is used for the underlying Postgres query and can be used to
// cancel the operation or apply a timeout.
//
// params := river.NewJobListParams().WithLimit(10).State(river.JobStateCompleted)
// jobRows, err := client.JobListTx(ctx, tx, params)
// if err != nil {
// // handle error
// }
func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListParams) ([]*rivertype.JobRow, error) {
if params == nil {
params = NewJobListParams()
}
dbParams, err := params.toDBParams()
if err != nil {
return nil, err
}

internalJobs, err := c.adapter.JobListTx(ctx, c.driver.UnwrapTx(tx), *dbParams)
if err != nil {
return nil, err
}
return dbsqlc.JobRowsFromInternal(internalJobs), nil
}
218 changes: 216 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ func Test_Client_Insert(t *testing.T) {
require.NoError(t, err)

_, err = client.Insert(ctx, &noOpArgs{}, nil)
require.ErrorIs(t, err, errInsertNoDriverDBPool)
require.ErrorIs(t, err, errNoDriverDBPool)
})

t.Run("ErrorsOnUnknownJobKindWithWorkers", func(t *testing.T) {
Expand Down Expand Up @@ -1072,7 +1072,7 @@ func Test_Client_InsertMany(t *testing.T) {
count, err := client.InsertMany(ctx, []InsertManyParams{
{Args: noOpArgs{}},
})
require.ErrorIs(t, err, errInsertNoDriverDBPool)
require.ErrorIs(t, err, errNoDriverDBPool)
require.Equal(t, int64(0), count)
})

Expand Down Expand Up @@ -1262,6 +1262,220 @@ func Test_Client_InsertManyTx(t *testing.T) {
})
}

func Test_Client_JobList(t *testing.T) {
t.Parallel()

var (
ctx = context.Background()
queries = dbsqlc.New()
)

type insertJobParams struct {
AttemptedAt *time.Time
FinalizedAt *time.Time
Kind string
Metadata []byte
Queue string
ScheduledAt *time.Time
State dbsqlc.JobState
}

insertJob := func(ctx context.Context, dbtx dbsqlc.DBTX, params insertJobParams) *dbsqlc.RiverJob {
job, err := queries.JobInsert(ctx, dbtx, dbsqlc.JobInsertParams{
Attempt: 1,
AttemptedAt: params.AttemptedAt,
FinalizedAt: params.FinalizedAt,
Kind: valutil.FirstNonZero(params.Kind, "test_kind"),
MaxAttempts: rivercommon.MaxAttemptsDefault,
Metadata: params.Metadata,
Priority: rivercommon.PriorityDefault,
Queue: QueueDefault,
ScheduledAt: params.ScheduledAt,
State: valutil.FirstNonZero(params.State, dbsqlc.JobStateAvailable),
})
require.NoError(t, err)
return job
}

type testBundle struct{}

setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) {
t.Helper()

config := newTestConfig(t, nil)
client := newTestClient(ctx, t, config)

return client, &testBundle{}
}

t.Run("FiltersByState", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

job1 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbsqlc.JobStateAvailable})
job2 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbsqlc.JobStateAvailable})
job3 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbsqlc.JobStateRunning})

jobs, err := client.JobList(ctx, NewJobListParams().State(JobStateAvailable))
require.NoError(t, err)
// jobs ordered by ScheduledAt ASC by default
require.Equal(t, []int64{job1.ID, job2.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

jobs, err = client.JobList(ctx, NewJobListParams().State(JobStateRunning))
require.NoError(t, err)
require.Equal(t, []int64{job3.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
})

t.Run("SortsAvailableRetryableAndScheduledJobsByScheduledAt", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

now := time.Now().UTC()

states := map[rivertype.JobState]dbsqlc.JobState{
JobStateAvailable: dbsqlc.JobStateAvailable,
JobStateRetryable: dbsqlc.JobStateRetryable,
JobStateScheduled: dbsqlc.JobStateScheduled,
}
for state, dbState := range states {
job1 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbState, ScheduledAt: ptrutil.Ptr(now)})
job2 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbState, ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))})

jobs, err := client.JobList(ctx, NewJobListParams().State(state))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

jobs, err = client.JobList(ctx, NewJobListParams().State(state).OrderBy(JobListOrderByTime, SortOrderDesc))
require.NoError(t, err)
require.Equal(t, []int64{job1.ID, job2.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
}
})

t.Run("SortsCancelledCompletedAndDiscardedJobsByFinalizedAt", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

now := time.Now().UTC()

states := map[rivertype.JobState]dbsqlc.JobState{
JobStateCancelled: dbsqlc.JobStateCancelled,
JobStateCompleted: dbsqlc.JobStateCompleted,
JobStateDiscarded: dbsqlc.JobStateDiscarded,
}
for state, dbState := range states {
job1 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbState, FinalizedAt: ptrutil.Ptr(now.Add(-10 * time.Second))})
job2 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbState, FinalizedAt: ptrutil.Ptr(now.Add(-15 * time.Second))})

jobs, err := client.JobList(ctx, NewJobListParams().State(state))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

jobs, err = client.JobList(ctx, NewJobListParams().State(state).OrderBy(JobListOrderByTime, SortOrderDesc))
require.NoError(t, err)
require.Equal(t, []int64{job1.ID, job2.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
}
})

t.Run("SortsRunningJobsByAttemptedAt", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

now := time.Now().UTC()
job1 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbsqlc.JobStateRunning, AttemptedAt: ptrutil.Ptr(now)})
job2 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbsqlc.JobStateRunning, AttemptedAt: ptrutil.Ptr(now.Add(-5 * time.Second))})

jobs, err := client.JobList(ctx, NewJobListParams().State(JobStateRunning))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

jobs, err = client.JobList(ctx, NewJobListParams().State(JobStateRunning).OrderBy(JobListOrderByTime, SortOrderDesc))
require.NoError(t, err)
// Sort order was explicitly reversed:
require.Equal(t, []int64{job1.ID, job2.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
})

t.Run("WithNilParamsFiltersToAvailableByDefault", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

now := time.Now().UTC()
job1 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbsqlc.JobStateAvailable, ScheduledAt: ptrutil.Ptr(now)})
job2 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbsqlc.JobStateAvailable, ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))})
_ = insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbsqlc.JobStateRunning})

jobs, err := client.JobList(ctx, nil)
require.NoError(t, err)
// sort order is switched by ScheduledAt values:
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
})

t.Run("PaginatesWithAfter", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

now := time.Now().UTC()
job1 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbsqlc.JobStateAvailable, ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))})
job2 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbsqlc.JobStateAvailable, ScheduledAt: ptrutil.Ptr(now)})
job3 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbsqlc.JobStateRunning, AttemptedAt: ptrutil.Ptr(now.Add(-5 * time.Second))})
job4 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbsqlc.JobStateRunning, AttemptedAt: ptrutil.Ptr(now)})
job5 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbsqlc.JobStateCompleted, FinalizedAt: ptrutil.Ptr(now.Add(-5 * time.Second))})
job6 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbsqlc.JobStateCompleted, FinalizedAt: ptrutil.Ptr(now)})
jobRow1 := dbsqlc.JobRowFromInternal(job1)
jobRow3 := dbsqlc.JobRowFromInternal(job3)
jobRow5 := dbsqlc.JobRowFromInternal(job5)

jobs, err := client.JobList(ctx, NewJobListParams().After(JobListCursorFromJob(jobRow1)))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

jobs, err = client.JobList(ctx, NewJobListParams().State(rivertype.JobStateRunning).After(JobListCursorFromJob(jobRow3)))
require.NoError(t, err)
require.Equal(t, []int64{job4.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

jobs, err = client.JobList(ctx, NewJobListParams().State(rivertype.JobStateCompleted).After(JobListCursorFromJob(jobRow5)))
require.NoError(t, err)
require.Equal(t, []int64{job6.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
})

t.Run("MetadataOnly", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

job1 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{Metadata: []byte(`{"foo": "bar"}`)})
job2 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{Metadata: []byte(`{"baz": "value"}`)})
job3 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{Metadata: []byte(`{"baz": "value"}`)})

jobs, err := client.JobList(ctx, NewJobListParams().State("").Metadata(`{"foo": "bar"}`))
require.NoError(t, err)
require.Equal(t, []int64{job1.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

jobs, err = client.JobList(ctx, NewJobListParams().State("").Metadata(`{"baz": "value"}`).OrderBy(JobListOrderByTime, SortOrderDesc))
require.NoError(t, err)
// Sort order was explicitly reversed:
require.Equal(t, []int64{job3.ID, job2.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
})

t.Run("WithCancelledContext", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

ctx, cancel := context.WithCancel(ctx)
cancel() // cancel immediately

jobs, err := client.JobList(ctx, NewJobListParams().State(JobStateRunning))
require.ErrorIs(t, context.Canceled, err)
require.Empty(t, jobs)
})
}

func Test_Client_ErrorHandler(t *testing.T) {
t.Parallel()

Expand Down
Loading