From 6f18ea8f2c9d8c1ecfab3f3a73001ee7e3208fda Mon Sep 17 00:00:00 2001 From: Brandur Date: Fri, 12 Jul 2024 17:43:24 -0700 Subject: [PATCH] Substantially (~20-45x) faster unique insertion using unique index Here, rebuild the unique job insertion infrastructure so that insertions become substantially faster, in the range of 20 to 45x. $ go test -bench=. ./internal/dbunique goos: darwin goarch: arm64 pkg: github.com/riverqueue/river/internal/dbunique BenchmarkUniqueInserter/FastPathEmptyDatabase-8 9632 126446 ns/op BenchmarkUniqueInserter/FastPathManyExistingJobs-8 9718 127795 ns/op BenchmarkUniqueInserter/SlowPathEmptyDatabase-8 468 3008752 ns/op BenchmarkUniqueInserter/SlowPathManyExistingJobs-8 214 6197776 ns/op PASS ok github.com/riverqueue/river/internal/dbunique 13.558s The speed up is accomplished by mostly abandoning the old methodology that took an advisory lock, did a job look up, and then did an insertion if no equivalent unique job was found. Instead, we add a new `unique_key` field to the jobs table, put a partial index on it, and use it in conjunction with `kind` to do upserts for unique insertions. Its value is similar to what we used for advisory locks -- a hash of a string representing the unique opts in question. There is however, a downside. `unique_key` is easy when all we need to think about are uniqueness based on something immutable like arguments or queue, but more difficult when we have to factor in job state, which may change over the lifetime of a job. To compensate for this, we clear `unique_key` on a job when setting it to states not included in the default unique state list, like when it's being cancelled or discarded. This allows a new job with the same unique properties to be inserted again. But the corollary of this technique is that if a state like `cancelled` or `discarded` is included in the `ByState` property, the technique obviously doesn't work anymore. 
So instead, in these cases we _keep_ the old insertion technique involving advisory locks, and fall back to this slower insertion path when we have to. So while we get the benefits of substantial performance improvements, we have the downside of more complex code -- there's now two paths to think about and which have to be tested. Overall though, I think the benefit is worth it. The addition does require a new index. Luckily it's a partial so it only gets used on unique inserts, and I benchmarked before/after, and found no degradation in non-unique insert performance. I added instructions to the CHANGELOG for building the index with `CONCURRENTLY` for any users who may already have a large jobs table, giving them an operationally safer alternative to use. --- CHANGELOG.md | 14 + client.go | 4 + cmd/river/go.mod | 2 + driver_test.go | 59 +++ insert_opts.go | 5 + internal/dbunique/db_unique.go | 271 +++++++++----- internal/dbunique/db_unique_test.go | 354 +++++++++++++++--- .../riverdrivertest/riverdrivertest.go | 205 +++++++++- .../testfactory/test_factory.go | 2 + job_executor_test.go | 11 + riverdriver/river_driver_interface.go | 14 + .../internal/dbsqlc/models.go | 1 + .../internal/dbsqlc/river_job.sql.go | 204 ++++++++-- .../main/005_river_migration_add_line.up.sql | 18 - ...dd_line_river_job_add_unique_key.down.sql} | 13 +- ...n_add_line_river_job_add_unique_key.up.sql | 34 ++ .../riverdatabasesql/river_database_sql.go | 51 +++ .../riverpgxv5/internal/dbsqlc/models.go | 1 + .../riverpgxv5/internal/dbsqlc/river_job.sql | 52 ++- .../internal/dbsqlc/river_job.sql.go | 204 ++++++++-- .../main/005_river_migration_add_line.up.sql | 18 - ...dd_line_river_job_add_unique_key.down.sql} | 13 +- ...n_add_line_river_job_add_unique_key.up.sql | 34 ++ riverdriver/riverpgxv5/river_pgx_v5_driver.go | 51 +++ rivertype/river_type.go | 5 + 25 files changed, 1349 insertions(+), 291 deletions(-) delete mode 100644 
riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line.up.sql rename riverdriver/riverdatabasesql/migration/main/{005_river_migration_add_line.down.sql => 005_river_migration_add_line_river_job_add_unique_key.down.sql} (74%) create mode 100644 riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line_river_job_add_unique_key.up.sql delete mode 100644 riverdriver/riverpgxv5/migration/main/005_river_migration_add_line.up.sql rename riverdriver/riverpgxv5/migration/main/{005_river_migration_add_line.down.sql => 005_river_migration_add_line_river_job_add_unique_key.down.sql} (74%) create mode 100644 riverdriver/riverpgxv5/migration/main/005_river_migration_add_line_river_job_add_unique_key.up.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index 33d8e128..f11ddc04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,20 @@ go install github.com/riverqueue/river/cmd/river@latest river migrate-up --database-url "$DATABASE_URL" ``` +The migration **includes a new index**. Users with a very large job table may want to consider raising the index separately using `CONCURRENTLY` (which must be run outside of a transaction), then run `river migrate-up` to finalize the process (it will tolerate an index that already exists): + +```sql +ALTER TABLE river_job + ADD COLUMN unique_key bytea; + +CREATE UNIQUE INDEX CONCURRENTLY river_job_kind_unique_key_idx ON river_job (kind, unique_key) WHERE unique_key IS NOT NULL; +``` + +```shell +go install github.com/riverqueue/river/cmd/river@latest +river migrate-up --database-url "$DATABASE_URL" +``` + ### Added - Fully functional driver for `database/sql` for use with packages like Bun and GORM. [PR #351](https://github.com/riverqueue/river/pull/351). 
diff --git a/client.go b/client.go index c3d0cb5e..babc796d 100644 --- a/client.go +++ b/client.go @@ -65,6 +65,10 @@ type Config struct { // option then it's recommended to leave it unset because the prefix leaves // only 32 bits of number space for advisory lock hashes, so it makes // internally conflicting River-generated keys more likely. + // + // Advisory locks are currently only used for the fallback/slow path of + // unique job insertion where finalized states are included in a ByState + // configuration. AdvisoryLockPrefix int32 // CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs diff --git a/cmd/river/go.mod b/cmd/river/go.mod index d20620a6..bc6069d9 100644 --- a/cmd/river/go.mod +++ b/cmd/river/go.mod @@ -10,6 +10,8 @@ replace github.com/riverqueue/river/riverdriver/riverdatabasesql => ../../riverd replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ../../riverdriver/riverpgxv5 +replace github.com/riverqueue/river/rivertype => ../../rivertype + require ( github.com/jackc/pgx/v5 v5.6.0 github.com/lmittmann/tint v1.0.4 diff --git a/driver_test.go b/driver_test.go index 54abbfa6..7280fc81 100644 --- a/driver_test.go +++ b/driver_test.go @@ -192,3 +192,62 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) { }) }) } + +func BenchmarkDriverRiverPgxV5Insert(b *testing.B) { + ctx := context.Background() + + type testBundle struct { + exec riverdriver.Executor + tx pgx.Tx + } + + setup := func(b *testing.B) (*riverpgxv5.Driver, *testBundle) { + b.Helper() + + var ( + driver = riverpgxv5.New(nil) + tx = riverinternaltest.TestTx(ctx, b) + ) + + bundle := &testBundle{ + exec: driver.UnwrapExecutor(tx), + tx: tx, + } + + return driver, bundle + } + + b.Run("InsertFast", func(b *testing.B) { + _, bundle := setup(b) + + for n := 0; n < b.N; n++ { + _, err := bundle.exec.JobInsertFast(ctx, &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{"encoded": "args"}`), + Kind: "test_kind", + MaxAttempts: 
rivercommon.MaxAttemptsDefault, + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + State: rivertype.JobStateAvailable, + }) + require.NoError(b, err) + } + }) + + b.Run("InsertUnique", func(b *testing.B) { + _, bundle := setup(b) + + for n := 0; n < b.N; n++ { + _, err := bundle.exec.JobInsertUnique(ctx, &riverdriver.JobInsertUniqueParams{ + JobInsertFastParams: &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{"encoded": "args"}`), + Kind: "test_kind", + MaxAttempts: rivercommon.MaxAttemptsDefault, + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + State: rivertype.JobStateAvailable, + }, + }) + require.NoError(b, err) + } + }) +} diff --git a/insert_opts.go b/insert_opts.go index fcaa8f15..423c2cde 100644 --- a/insert_opts.go +++ b/insert_opts.go @@ -136,6 +136,11 @@ type UniqueOpts struct { // With this setting, any jobs of the same kind that have been completed or // discarded, but not yet cleaned out by the system, won't count towards the // uniqueness of a new insert. + // + // Warning: A non-default slice of states in ByState will force the unique + // inserter to fall back to a slower insertion path that takes an advisory + // lock and performs a look up before insertion. For best performance, it's + // recommended that the default set of states is used. ByState []rivertype.JobState } diff --git a/internal/dbunique/db_unique.go b/internal/dbunique/db_unique.go index 34bcb41e..ca6890b1 100644 --- a/internal/dbunique/db_unique.go +++ b/internal/dbunique/db_unique.go @@ -2,6 +2,7 @@ package dbunique import ( "context" + "crypto/sha256" "errors" "fmt" "slices" @@ -20,14 +21,16 @@ import ( // determine uniqueness. So for example, a new unique job may be inserted even // if another job already exists, as long as that other job is set `cancelled` // or `discarded`. 
-var defaultUniqueStates = []string{ //nolint:gochecknoglobals - string(rivertype.JobStateAvailable), - string(rivertype.JobStateCompleted), - string(rivertype.JobStateRunning), - string(rivertype.JobStateRetryable), - string(rivertype.JobStateScheduled), +var defaultUniqueStates = []rivertype.JobState{ //nolint:gochecknoglobals + rivertype.JobStateAvailable, + rivertype.JobStateCompleted, + rivertype.JobStateRetryable, + rivertype.JobStateRunning, + rivertype.JobStateScheduled, } +var defaultUniqueStatesStrings = sliceutil.Map(defaultUniqueStates, func(s rivertype.JobState) string { return string(s) }) //nolint:gochecknoglobals + type UniqueOpts struct { ByArgs bool ByPeriod time.Duration @@ -48,115 +51,175 @@ type UniqueInserter struct { } func (i *UniqueInserter) JobInsert(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobInsertFastParams, uniqueOpts *UniqueOpts) (*rivertype.JobInsertResult, error) { - var tx riverdriver.ExecutorTx - - if uniqueOpts != nil && !uniqueOpts.IsEmpty() { - // For uniqueness checks returns an advisory lock hash to use for lock, - // parameters to check for an existing unique job with the same - // properties, and a boolean indicating whether a uniqueness check - // should be performed at all (in some cases the check can be skipped if - // we can determine ahead of time that this insert will not violate - // uniqueness conditions). 
- buildUniqueParams := func() (*hashutil.AdvisoryLockHash, *riverdriver.JobGetByKindAndUniquePropertiesParams, bool) { - advisoryLockHash := hashutil.NewAdvisoryLockHash(i.AdvisoryLockPrefix) - advisoryLockHash.Write([]byte("unique_key")) - advisoryLockHash.Write([]byte("kind=" + params.Kind)) - - getParams := riverdriver.JobGetByKindAndUniquePropertiesParams{ - Kind: params.Kind, - } - - if uniqueOpts.ByArgs { - advisoryLockHash.Write([]byte("&args=")) - advisoryLockHash.Write(params.EncodedArgs) - - getParams.Args = params.EncodedArgs - getParams.ByArgs = true - } - - if uniqueOpts.ByPeriod != time.Duration(0) { - lowerPeriodBound := i.Time.NowUTC().Truncate(uniqueOpts.ByPeriod) - - advisoryLockHash.Write([]byte("&period=" + lowerPeriodBound.Format(time.RFC3339))) - - getParams.ByCreatedAt = true - getParams.CreatedAtBegin = lowerPeriodBound - getParams.CreatedAtEnd = lowerPeriodBound.Add(uniqueOpts.ByPeriod) - } - - if uniqueOpts.ByQueue { - advisoryLockHash.Write([]byte("&queue=" + params.Queue)) - - getParams.ByQueue = true - getParams.Queue = params.Queue - } - - { - stateSet := defaultUniqueStates - if len(uniqueOpts.ByState) > 0 { - stateSet = sliceutil.Map(uniqueOpts.ByState, func(s rivertype.JobState) string { return string(s) }) - } - - advisoryLockHash.Write([]byte("&state=" + strings.Join(stateSet, ","))) - - if !slices.Contains(stateSet, string(params.State)) { - return nil, nil, false - } - - getParams.ByState = true - getParams.State = stateSet - } - - return advisoryLockHash, &getParams, true + // With no unique options set, do a normal non-unique insert. + if uniqueOpts == nil || uniqueOpts.IsEmpty() { + return insertNonUnique(ctx, exec, params) + } + + // Build a unique key for use in either the `(kind, unique_key)` index or in + // an advisory lock prefix if we end up taking the slow path. 
+ uniqueKey, doUniqueInsert := i.buildUniqueKey(params, uniqueOpts) + if !doUniqueInsert { + return insertNonUnique(ctx, exec, params) + } + + // Sort so we can more easily compare against default state list. + if uniqueOpts.ByState != nil { + slices.Sort(uniqueOpts.ByState) + } + + // Fast path: as long as uniqueness uses the default set of lifecycle states + // we can take the fast path wherein uniqueness is determined based on an + // upsert to a unique index on `(kind, unique_key)`. This works because when + // cancelling or discarding jobs the executor/completer will zero the job's + // `unique_key` field, taking it out of consideration for future inserts + // given the same unique opts. + if uniqueOpts.ByState == nil || slices.Compare(defaultUniqueStates, uniqueOpts.ByState) == 0 { + uniqueKeyHash := sha256.Sum256([]byte(uniqueKey)) + + insertRes, err := exec.JobInsertUnique(ctx, &riverdriver.JobInsertUniqueParams{ + JobInsertFastParams: params, + UniqueKey: uniqueKeyHash[:], + }) + if err != nil { + return nil, err } - if advisoryLockHash, getParams, doUniquenessCheck := buildUniqueParams(); doUniquenessCheck { - // Begin a subtransaction - exec, err := exec.Begin(ctx) - if err != nil { - return nil, err - } - defer exec.Rollback(ctx) - - // Make the subtransaction available at function scope so it can be - // committed in cases where we insert a job. - tx = exec - - // The wrapping transaction should maintain snapshot consistency - // even if we were to only have a SELECT + INSERT, but given that a - // conflict is possible, obtain an advisory lock based on the - // parameters of the unique job first, and have contending inserts - // wait for it. This is a synchronous lock so we rely on context - // timeout in case something goes wrong and it's blocking for too - // long. 
- if _, err := exec.PGAdvisoryXactLock(ctx, advisoryLockHash.Key()); err != nil { - return nil, fmt.Errorf("error acquiring unique lock: %w", err) - } - - existing, err := exec.JobGetByKindAndUniqueProperties(ctx, getParams) - if err != nil { - if !errors.Is(err, rivertype.ErrNotFound) { - return nil, fmt.Errorf("error getting unique job: %w", err) - } - } - - if existing != nil { - // Insert skipped; returns an existing row. - return &rivertype.JobInsertResult{Job: existing, UniqueSkippedAsDuplicate: true}, nil - } + return (*rivertype.JobInsertResult)(insertRes), nil + } + + // Slow path: open a subtransaction, take an advisory lock, check to see if + // a job with the given criteria exists, then either return an existing row + // or insert a new one. + + advisoryLockHash := hashutil.NewAdvisoryLockHash(i.AdvisoryLockPrefix) + advisoryLockHash.Write([]byte("unique_key")) + advisoryLockHash.Write([]byte("kind=" + params.Kind)) + advisoryLockHash.Write([]byte(uniqueKey)) + + getParams := i.buildGetParams(params, uniqueOpts) + + // Begin a subtransaction + subExec, err := exec.Begin(ctx) + if err != nil { + return nil, err + } + defer subExec.Rollback(ctx) + + // The wrapping transaction should maintain snapshot consistency even if we + // were to only have a SELECT + INSERT, but given that a conflict is + // possible, obtain an advisory lock based on the parameters of the unique + // job first, and have contending inserts wait for it. This is a synchronous + // lock so we rely on context timeout in case something goes wrong and it's + // blocking for too long. 
+ if _, err := subExec.PGAdvisoryXactLock(ctx, advisoryLockHash.Key()); err != nil { + return nil, fmt.Errorf("error acquiring unique lock: %w", err) + } + + existing, err := subExec.JobGetByKindAndUniqueProperties(ctx, getParams) + if err != nil { + if !errors.Is(err, rivertype.ErrNotFound) { + return nil, fmt.Errorf("error getting unique job: %w", err) } } - jobRow, err := exec.JobInsertFast(ctx, params) + if existing != nil { + // Insert skipped; returns an existing row. + return &rivertype.JobInsertResult{Job: existing, UniqueSkippedAsDuplicate: true}, nil + } + + jobRow, err := subExec.JobInsertFast(ctx, params) if err != nil { return nil, err } - if tx != nil { - if err := tx.Commit(ctx); err != nil { - return nil, err + if err := subExec.Commit(ctx); err != nil { + return nil, err + } + + return &rivertype.JobInsertResult{Job: jobRow}, nil +} + +// Builds a unique key made up of the unique options in place. The key is hashed +// to become a value for `unique_key` in the fast insertion path, or hashed and +// used for an advisory lock on the slow insertion path. 
+func (i *UniqueInserter) buildUniqueKey(params *riverdriver.JobInsertFastParams, uniqueOpts *UniqueOpts) (string, bool) { + var sb strings.Builder + + if uniqueOpts.ByArgs { + sb.WriteString("&args=") + sb.Write(params.EncodedArgs) + } + + if uniqueOpts.ByPeriod != time.Duration(0) { + lowerPeriodBound := i.Time.NowUTC().Truncate(uniqueOpts.ByPeriod) + sb.WriteString("&period=" + lowerPeriodBound.Format(time.RFC3339)) + } + + if uniqueOpts.ByQueue { + sb.WriteString("&queue=" + params.Queue) + } + + { + stateSet := defaultUniqueStatesStrings + if len(uniqueOpts.ByState) > 0 { + stateSet = sliceutil.Map(uniqueOpts.ByState, func(s rivertype.JobState) string { return string(s) }) + slices.Sort(stateSet) + } + + sb.WriteString("&state=" + strings.Join(stateSet, ",")) + + if !slices.Contains(stateSet, string(params.State)) { + return "", false + } + } + + return sb.String(), true +} + +// Builds get parameters suitable for looking up a unique job on the slow unique +// insertion path. +func (i *UniqueInserter) buildGetParams(params *riverdriver.JobInsertFastParams, uniqueOpts *UniqueOpts) *riverdriver.JobGetByKindAndUniquePropertiesParams { + getParams := riverdriver.JobGetByKindAndUniquePropertiesParams{ + Kind: params.Kind, + } + + if uniqueOpts.ByArgs { + getParams.Args = params.EncodedArgs + getParams.ByArgs = true + } + + if uniqueOpts.ByPeriod != time.Duration(0) { + lowerPeriodBound := i.Time.NowUTC().Truncate(uniqueOpts.ByPeriod) + + getParams.ByCreatedAt = true + getParams.CreatedAtBegin = lowerPeriodBound + getParams.CreatedAtEnd = lowerPeriodBound.Add(uniqueOpts.ByPeriod) + } + + if uniqueOpts.ByQueue { + getParams.ByQueue = true + getParams.Queue = params.Queue + } + + { + stateSet := defaultUniqueStatesStrings + if len(uniqueOpts.ByState) > 0 { + stateSet = sliceutil.Map(uniqueOpts.ByState, func(s rivertype.JobState) string { return string(s) }) } + + getParams.ByState = true + getParams.State = stateSet } + return &getParams +} + +// Shared shortcut for 
inserting a row without uniqueness. +func insertNonUnique(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobInsertFastParams) (*rivertype.JobInsertResult, error) { + jobRow, err := exec.JobInsertFast(ctx, params) + if err != nil { + return nil, err + } return &rivertype.JobInsertResult{Job: jobRow}, nil } diff --git a/internal/dbunique/db_unique_test.go b/internal/dbunique/db_unique_test.go index 065669ad..61b417cf 100644 --- a/internal/dbunique/db_unique_test.go +++ b/internal/dbunique/db_unique_test.go @@ -2,7 +2,9 @@ package dbunique import ( "context" + "fmt" "runtime" + "slices" "sync" "testing" "time" @@ -21,6 +23,30 @@ import ( "github.com/riverqueue/river/rivertype" ) +const queueAlternate = "alternate_queue" + +func makeInsertParams(createdAt *time.Time) *riverdriver.JobInsertFastParams { + return &riverdriver.JobInsertFastParams{ + CreatedAt: createdAt, + EncodedArgs: []byte(`{}`), + Kind: "fake_job", + MaxAttempts: rivercommon.MaxAttemptsDefault, + Metadata: []byte(`{}`), + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + ScheduledAt: nil, + State: rivertype.JobStateAvailable, + } +} + +func TestDefaultUniqueStatesSorted(t *testing.T) { + t.Parallel() + + states := slices.Clone(defaultUniqueStates) + slices.Sort(states) + require.Equal(t, states, defaultUniqueStates, "Default unique states should be sorted") +} + func TestUniqueInserter_JobInsert(t *testing.T) { t.Parallel() @@ -60,26 +86,12 @@ func TestUniqueInserter_JobInsert(t *testing.T) { return inserter, bundle } - makeInsertParams := func(bundle *testBundle) *riverdriver.JobInsertFastParams { - return &riverdriver.JobInsertFastParams{ - CreatedAt: &bundle.baselineTime, - EncodedArgs: []byte(`{}`), - Kind: "fake_job", - MaxAttempts: rivercommon.MaxAttemptsDefault, - Metadata: []byte(`{}`), - Priority: rivercommon.PriorityDefault, - Queue: rivercommon.QueueDefault, - ScheduledAt: nil, - State: rivertype.JobStateAvailable, - } - } - t.Run("Success", 
func(t *testing.T) { t.Parallel() inserter, bundle := setup(t) - insertParams := makeInsertParams(bundle) + insertParams := makeInsertParams(&bundle.baselineTime) res, err := inserter.JobInsert(ctx, bundle.exec, insertParams, nil) require.NoError(t, err) @@ -111,7 +123,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { const maxJobsToFetch = 8 - res, err := inserter.JobInsert(ctx, bundle.exec, makeInsertParams(bundle), nil) + res, err := inserter.JobInsert(ctx, bundle.exec, makeInsertParams(&bundle.baselineTime), nil) require.NoError(t, err) require.NotEqual(t, 0, res.Job.ID, "expected job ID to be set, got %d", res.Job.ID) require.WithinDuration(t, time.Now(), res.Job.ScheduledAt, 1*time.Second) @@ -128,7 +140,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { "expected selected job to be in running state, got %q", jobs[0].State) for i := 1; i < 10; i++ { - _, err := inserter.JobInsert(ctx, bundle.exec, makeInsertParams(bundle), nil) + _, err := inserter.JobInsert(ctx, bundle.exec, makeInsertParams(&bundle.baselineTime), nil) require.NoError(t, err) } @@ -155,18 +167,19 @@ func TestUniqueInserter_JobInsert(t *testing.T) { "expected to fetch 1 remaining job but fetched %d jobs:\n%+v", len(jobs), jobs) }) - t.Run("UniqueJobByArgs", func(t *testing.T) { + t.Run("UniqueJobByArgsFastPath", func(t *testing.T) { t.Parallel() inserter, bundle := setup(t) - insertParams := makeInsertParams(bundle) + insertParams := makeInsertParams(&bundle.baselineTime) uniqueOpts := &UniqueOpts{ ByArgs: true, } res0, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) require.NoError(t, err) + require.NotNil(t, res0.Job.UniqueKey) require.False(t, res0.UniqueSkippedAsDuplicate) // Insert a second job with the same args, but expect that the same job @@ -187,18 +200,53 @@ func TestUniqueInserter_JobInsert(t *testing.T) { require.False(t, res2.UniqueSkippedAsDuplicate) }) - t.Run("UniqueJobByPeriod", func(t *testing.T) { + t.Run("UniqueJobByArgsSlowPath", func(t 
*testing.T) { t.Parallel() inserter, bundle := setup(t) - insertParams := makeInsertParams(bundle) + insertParams := makeInsertParams(&bundle.baselineTime) + uniqueOpts := &UniqueOpts{ + ByArgs: true, + ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCancelled}, // use of non-standard states triggers slow path + } + + res0, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.Nil(t, res0.Job.UniqueKey) + require.False(t, res0.UniqueSkippedAsDuplicate) + + // Insert a second job with the same args, but expect that the same job + // ID to come back because we're still within its unique parameters. + res1, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.Equal(t, res0.Job.ID, res1.Job.ID) + require.True(t, res1.UniqueSkippedAsDuplicate) + + insertParams.EncodedArgs = []byte(`{"key":"different"}`) + + // Same operation again, except that because we've modified the unique + // dimension, another job is allowed to be queued, so the new ID is + // not the same. 
+ res2, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.NotEqual(t, res0.Job.ID, res2.Job.ID) + require.False(t, res2.UniqueSkippedAsDuplicate) + }) + + t.Run("UniqueJobByPeriodFastPath", func(t *testing.T) { + t.Parallel() + + inserter, bundle := setup(t) + + insertParams := makeInsertParams(&bundle.baselineTime) uniqueOpts := &UniqueOpts{ ByPeriod: 15 * time.Minute, } res0, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) require.NoError(t, err) + require.NotNil(t, res0.Job.UniqueKey) require.False(t, res0.UniqueSkippedAsDuplicate) // Insert a second job with the same args, but expect that the same job @@ -219,18 +267,53 @@ func TestUniqueInserter_JobInsert(t *testing.T) { require.False(t, res2.UniqueSkippedAsDuplicate) }) - t.Run("UniqueJobByQueue", func(t *testing.T) { + t.Run("UniqueJobByPeriodSlowPath", func(t *testing.T) { t.Parallel() inserter, bundle := setup(t) - insertParams := makeInsertParams(bundle) + insertParams := makeInsertParams(&bundle.baselineTime) + uniqueOpts := &UniqueOpts{ + ByPeriod: 15 * time.Minute, + ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCancelled}, // use of non-standard states triggers slow path + } + + res0, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.Nil(t, res0.Job.UniqueKey) + require.NoError(t, err) + require.False(t, res0.UniqueSkippedAsDuplicate) + + // Insert a second job with the same args, but expect that the same job + // ID to come back because we're still within its unique parameters. 
+ res1, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.Equal(t, res0.Job.ID, res1.Job.ID) + require.True(t, res1.UniqueSkippedAsDuplicate) + + inserter.Time.StubNowUTC(bundle.baselineTime.Add(uniqueOpts.ByPeriod).Add(1 * time.Second)) + + // Same operation again, except that because we've advanced time passed + // the period within unique bounds, another job is allowed to be queued, + // so the new ID is not the same. + res2, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.NotEqual(t, res0.Job.ID, res2.Job.ID) + require.False(t, res2.UniqueSkippedAsDuplicate) + }) + + t.Run("UniqueJobByQueueFastPath", func(t *testing.T) { + t.Parallel() + + inserter, bundle := setup(t) + + insertParams := makeInsertParams(&bundle.baselineTime) uniqueOpts := &UniqueOpts{ ByQueue: true, } res0, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) require.NoError(t, err) + require.NotNil(t, res0.Job.UniqueKey) require.False(t, res0.UniqueSkippedAsDuplicate) // Insert a second job with the same args, but expect that the same job @@ -240,7 +323,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { require.Equal(t, res0.Job.ID, res1.Job.ID) require.True(t, res1.UniqueSkippedAsDuplicate) - insertParams.Queue = "alternate_queue" + insertParams.Queue = queueAlternate // Same operation again, except that because we've modified the unique // dimension, another job is allowed to be queued, so the new ID is @@ -251,18 +334,20 @@ func TestUniqueInserter_JobInsert(t *testing.T) { require.False(t, res2.UniqueSkippedAsDuplicate) }) - t.Run("UniqueJobByState", func(t *testing.T) { + t.Run("UniqueJobByQueueSlowPath", func(t *testing.T) { t.Parallel() inserter, bundle := setup(t) - insertParams := makeInsertParams(bundle) + insertParams := makeInsertParams(&bundle.baselineTime) uniqueOpts := &UniqueOpts{ - ByState: []rivertype.JobState{rivertype.JobStateAvailable, 
rivertype.JobStateRunning}, + ByQueue: true, + ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCancelled}, // use of non-standard states triggers slow path } res0, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) require.NoError(t, err) + require.Nil(t, res0.Job.UniqueKey) require.False(t, res0.UniqueSkippedAsDuplicate) // Insert a second job with the same args, but expect that the same job @@ -272,51 +357,32 @@ func TestUniqueInserter_JobInsert(t *testing.T) { require.Equal(t, res0.Job.ID, res1.Job.ID) require.True(t, res1.UniqueSkippedAsDuplicate) - // A new job is allowed if we're inserting the job with a state that's - // not included in the unique state set. - { - insertParams := *insertParams // dup - insertParams.State = rivertype.JobStatePending - - res2, err := inserter.JobInsert(ctx, bundle.exec, &insertParams, uniqueOpts) - require.NoError(t, err) - require.NotEqual(t, res0.Job.ID, res2.Job.ID) - require.False(t, res2.UniqueSkippedAsDuplicate) - } - - // A new job is also allowed if the state of the originally inserted job - // changes to one that's not included in the unique state set. - { - _, err := bundle.exec.JobUpdate(ctx, &riverdriver.JobUpdateParams{ - ID: res0.Job.ID, - FinalizedAtDoUpdate: true, - FinalizedAt: ptrutil.Ptr(bundle.baselineTime), - StateDoUpdate: true, - State: rivertype.JobStateCompleted, - }) - require.NoError(t, err) + insertParams.Queue = queueAlternate - res2, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) - require.NoError(t, err) - require.NotEqual(t, res0.Job.ID, res2.Job.ID) - require.False(t, res2.UniqueSkippedAsDuplicate) - } + // Same operation again, except that because we've modified the unique + // dimension, another job is allowed to be queued, so the new ID is + // not the same. 
+ res2, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.NotEqual(t, res0.Job.ID, res2.Job.ID) + require.False(t, res2.UniqueSkippedAsDuplicate) }) // Unlike other unique options, state gets a default set when it's not // supplied. This test case checks that the default is working as expected. - t.Run("UniqueJobByDefaultState", func(t *testing.T) { + t.Run("UniqueJobByDefaultStateFastPath", func(t *testing.T) { t.Parallel() inserter, bundle := setup(t) - insertParams := makeInsertParams(bundle) + insertParams := makeInsertParams(&bundle.baselineTime) uniqueOpts := &UniqueOpts{ ByQueue: true, } res0, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) require.NoError(t, err) + require.NotNil(t, res0.Job.UniqueKey) // fast path because states are a subset of defaults require.False(t, res0.UniqueSkippedAsDuplicate) // Insert a second job with the same args, but expect that the same job @@ -363,6 +429,8 @@ func TestUniqueInserter_JobInsert(t *testing.T) { FinalizedAt: ptrutil.Ptr(bundle.baselineTime), StateDoUpdate: true, State: rivertype.JobStateDiscarded, + UniqueKeyDoUpdate: true, // `unique_key` is normally NULLed by the client or completer + UniqueKey: nil, }) require.NoError(t, err) @@ -375,12 +443,65 @@ func TestUniqueInserter_JobInsert(t *testing.T) { require.False(t, res2.UniqueSkippedAsDuplicate) }) + t.Run("UniqueJobByStateSlowPath", func(t *testing.T) { + t.Parallel() + + inserter, bundle := setup(t) + + insertParams := makeInsertParams(&bundle.baselineTime) + uniqueOpts := &UniqueOpts{ + ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCancelled}, + } + + res0, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.Nil(t, res0.Job.UniqueKey) // slow path because states are *not* a subset of defaults + require.False(t, res0.UniqueSkippedAsDuplicate) + + // Insert a second job with the same args, but expect 
that the same job + // ID to come back because we're still within its unique parameters. + res1, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.Equal(t, res0.Job.ID, res1.Job.ID) + require.True(t, res1.UniqueSkippedAsDuplicate) + + // A new job is allowed if we're inserting the job with a state that's + // not included in the unique state set. + { + insertParams := *insertParams // dup + insertParams.State = rivertype.JobStateRunning + + res2, err := inserter.JobInsert(ctx, bundle.exec, &insertParams, uniqueOpts) + require.NoError(t, err) + require.NotEqual(t, res0.Job.ID, res2.Job.ID) + require.False(t, res2.UniqueSkippedAsDuplicate) + } + + // A new job is also allowed if the state of the originally inserted job + // changes to one that's not included in the unique state set. + { + _, err := bundle.exec.JobUpdate(ctx, &riverdriver.JobUpdateParams{ + ID: res0.Job.ID, + FinalizedAtDoUpdate: true, + FinalizedAt: ptrutil.Ptr(bundle.baselineTime), + StateDoUpdate: true, + State: rivertype.JobStateCompleted, + }) + require.NoError(t, err) + + res2, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.NotEqual(t, res0.Job.ID, res2.Job.ID) + require.False(t, res2.UniqueSkippedAsDuplicate) + } + }) + t.Run("UniqueJobAllOptions", func(t *testing.T) { t.Parallel() inserter, bundle := setup(t) - insertParams := makeInsertParams(bundle) + insertParams := makeInsertParams(&bundle.baselineTime) uniqueOpts := &UniqueOpts{ ByArgs: true, ByPeriod: 15 * time.Minute, @@ -429,7 +550,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { // With queue modified { insertParams := *insertParams // dup - insertParams.Queue = "alternate_queue" + insertParams.Queue = queueAlternate // New job because a unique dimension has changed. 
res2, err := inserter.JobInsert(ctx, bundle.exec, &insertParams, uniqueOpts) @@ -459,7 +580,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { bundle.driver = riverpgxv5.New(riverinternaltest.TestDB(ctx, t)) bundle.exec = bundle.driver.GetExecutor() - insertParams := makeInsertParams(bundle) + insertParams := makeInsertParams(&bundle.baselineTime) uniqueOpts := &UniqueOpts{ ByPeriod: 15 * time.Minute, } @@ -500,3 +621,116 @@ func TestUniqueInserter_JobInsert(t *testing.T) { } }) } + +func BenchmarkUniqueInserter(b *testing.B) { + ctx := context.Background() + + type testBundle struct { + driver riverdriver.Driver[pgx.Tx] + exec riverdriver.Executor + tx pgx.Tx + } + + setup := func(b *testing.B) (*UniqueInserter, *testBundle) { + b.Helper() + + var ( + driver = riverpgxv5.New(nil) + tx = riverinternaltest.TestTx(ctx, b) + ) + + bundle := &testBundle{ + driver: driver, + exec: driver.UnwrapExecutor(tx), + tx: tx, + } + + inserter := baseservice.Init(riversharedtest.BaseServiceArchetype(b), &UniqueInserter{}) + + return inserter, bundle + } + + // Simulates the case where many existing jobs are in the database already. + // Useful as a benchmark because the advisory lock strategy's look up get + // slow with many existing jobs. 
+ generateManyExistingJobs := func(b *testing.B, inserter *UniqueInserter, bundle *testBundle) { + b.Helper() + + insertParams := makeInsertParams(nil) + + for i := 0; i < 10_000; i++ { + _, err := inserter.JobInsert(ctx, bundle.exec, insertParams, nil) + require.NoError(b, err) + } + } + + b.Run("FastPathEmptyDatabase", func(b *testing.B) { + inserter, bundle := setup(b) + + insertParams := makeInsertParams(nil) + uniqueOpts := &UniqueOpts{ByArgs: true} + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + insertParams.EncodedArgs = []byte(fmt.Sprintf(`{"job_num":%d}`, n%1000)) + _, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(b, err) + } + }) + + b.Run("FastPathManyExistingJobs", func(b *testing.B) { + inserter, bundle := setup(b) + + generateManyExistingJobs(b, inserter, bundle) + + insertParams := makeInsertParams(nil) + uniqueOpts := &UniqueOpts{ByArgs: true} + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + insertParams.EncodedArgs = []byte(fmt.Sprintf(`{"job_num":%d}`, n%1000)) + _, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(b, err) + } + }) + + b.Run("SlowPathEmptyDatabase", func(b *testing.B) { + inserter, bundle := setup(b) + + insertParams := makeInsertParams(nil) + uniqueOpts := &UniqueOpts{ + ByArgs: true, + ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCancelled}, // use of non-standard states triggers slow path + } + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + insertParams.EncodedArgs = []byte(fmt.Sprintf(`{"job_num":%d}`, n%1000)) + _, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(b, err) + } + }) + + b.Run("SlowPathManyExistingJobs", func(b *testing.B) { + inserter, bundle := setup(b) + + generateManyExistingJobs(b, inserter, bundle) + + insertParams := makeInsertParams(nil) + uniqueOpts := &UniqueOpts{ + ByArgs: true, + ByState: []rivertype.JobState{rivertype.JobStateAvailable, 
rivertype.JobStateCancelled}, // use of non-standard states triggers slow path + } + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + insertParams.EncodedArgs = []byte(fmt.Sprintf(`{"job_num":%d}`, n%1000)) + _, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(b, err) + } + }) +} diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index e7885192..0e6408b1 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -222,7 +222,8 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, nowStr := now.Format(time.RFC3339Nano) job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ - State: &startingState, + State: &startingState, + UniqueKey: []byte("unique-key"), }) require.Equal(t, startingState, job.State) @@ -237,6 +238,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, require.Equal(t, rivertype.JobStateCancelled, jobAfter.State) require.WithinDuration(t, time.Now(), *jobAfter.FinalizedAt, 2*time.Second) require.JSONEq(t, fmt.Sprintf(`{"cancel_attempted_at":%q}`, nowStr), string(jobAfter.Metadata)) + require.Nil(t, jobAfter.UniqueKey) }) } @@ -249,7 +251,8 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, nowStr := now.Format(time.RFC3339Nano) job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ - State: ptrutil.Ptr(rivertype.JobStateRunning), + State: ptrutil.Ptr(rivertype.JobStateRunning), + UniqueKey: []byte("unique-key"), }) require.Equal(t, rivertype.JobStateRunning, job.State) @@ -263,6 +266,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, require.Equal(t, rivertype.JobStateRunning, jobAfter.State) require.Nil(t, jobAfter.FinalizedAt) require.JSONEq(t, fmt.Sprintf(`{"cancel_attempted_at":%q}`, nowStr), string(jobAfter.Metadata)) + require.Equal(t, "unique-key", string(jobAfter.UniqueKey)) }) for _, 
startingState := range []rivertype.JobState{ @@ -995,6 +999,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, ScheduledAt: &now, State: rivertype.JobStateCompleted, Tags: []string{"tag"}, + UniqueKey: []byte("unique-key"), }) require.NoError(t, err) require.Equal(t, 3, job.Attempt) @@ -1011,6 +1016,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, requireEqualTime(t, now, job.ScheduledAt) require.Equal(t, rivertype.JobStateCompleted, job.State) require.Equal(t, []string{"tag"}, job.Tags) + require.Equal(t, []byte("unique-key"), job.UniqueKey) }) t.Run("JobFinalizedAtConstraint", func(t *testing.T) { @@ -1094,6 +1100,111 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, }) }) + t.Run("JobInsertUnique", func(t *testing.T) { + t.Parallel() + + t.Run("MinimalArgsWithDefaults", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + + insertRes, err := exec.JobInsertUnique(ctx, &riverdriver.JobInsertUniqueParams{ + JobInsertFastParams: &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{"encoded": "args"}`), + Kind: "test_kind", + MaxAttempts: rivercommon.MaxAttemptsDefault, + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + State: rivertype.JobStateAvailable, + }, + UniqueKey: []byte("unique-key"), + }) + require.NoError(t, err) + require.Equal(t, 0, insertRes.Job.Attempt) + require.Nil(t, insertRes.Job.AttemptedAt) + require.WithinDuration(t, now, insertRes.Job.CreatedAt, 2*time.Second) + require.Equal(t, []byte(`{"encoded": "args"}`), insertRes.Job.EncodedArgs) + require.Empty(t, insertRes.Job.Errors) + require.Nil(t, insertRes.Job.FinalizedAt) + require.Equal(t, "test_kind", insertRes.Job.Kind) + require.Equal(t, rivercommon.MaxAttemptsDefault, insertRes.Job.MaxAttempts) + require.Equal(t, []byte(`{}`), insertRes.Job.Metadata) + require.Equal(t, rivercommon.PriorityDefault, insertRes.Job.Priority) + require.Equal(t, rivercommon.QueueDefault, 
insertRes.Job.Queue) + require.WithinDuration(t, now, insertRes.Job.ScheduledAt, 2*time.Second) + require.Equal(t, rivertype.JobStateAvailable, insertRes.Job.State) + require.Equal(t, []string{}, insertRes.Job.Tags) + require.Equal(t, []byte("unique-key"), insertRes.Job.UniqueKey) + }) + + t.Run("AllArgs", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + targetTime := time.Now().UTC().Add(-15 * time.Minute) + + insertRes, err := exec.JobInsertUnique(ctx, &riverdriver.JobInsertUniqueParams{ + JobInsertFastParams: &riverdriver.JobInsertFastParams{ + CreatedAt: &targetTime, + EncodedArgs: []byte(`{"encoded": "args"}`), + Kind: "test_kind", + MaxAttempts: 6, + Metadata: []byte(`{"meta": "data"}`), + Priority: 2, + Queue: "queue_name", + ScheduledAt: &targetTime, + State: rivertype.JobStateRunning, + Tags: []string{"tag"}, + }, + UniqueKey: []byte("unique-key"), + }) + require.NoError(t, err) + require.Equal(t, 0, insertRes.Job.Attempt) + require.Nil(t, insertRes.Job.AttemptedAt) + requireEqualTime(t, targetTime, insertRes.Job.CreatedAt) + require.Equal(t, []byte(`{"encoded": "args"}`), insertRes.Job.EncodedArgs) + require.Empty(t, insertRes.Job.Errors) + require.Nil(t, insertRes.Job.FinalizedAt) + require.Equal(t, "test_kind", insertRes.Job.Kind) + require.Equal(t, 6, insertRes.Job.MaxAttempts) + require.Equal(t, []byte(`{"meta": "data"}`), insertRes.Job.Metadata) + require.Equal(t, 2, insertRes.Job.Priority) + require.Equal(t, "queue_name", insertRes.Job.Queue) + requireEqualTime(t, targetTime, insertRes.Job.ScheduledAt) + require.Equal(t, rivertype.JobStateRunning, insertRes.Job.State) + require.Equal(t, []string{"tag"}, insertRes.Job.Tags) + require.Equal(t, []byte("unique-key"), insertRes.Job.UniqueKey) + }) + + t.Run("ReturnsExistingOnConflict", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + params := &riverdriver.JobInsertUniqueParams{ + JobInsertFastParams: &riverdriver.JobInsertFastParams{ + EncodedArgs: 
[]byte(`{"encoded": "args"}`), + Kind: "test_kind", + MaxAttempts: rivercommon.MaxAttemptsDefault, + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + State: rivertype.JobStateAvailable, + }, + UniqueKey: []byte("unique-key"), + } + + insertRes1, err := exec.JobInsertUnique(ctx, params) + require.NoError(t, err) + + insertRes2, err := exec.JobInsertUnique(ctx, params) + require.NoError(t, err) + require.Equal(t, insertRes1.Job.ID, insertRes2.Job.ID) + }) + }) + t.Run("JobList", func(t *testing.T) { t.Parallel() @@ -1392,7 +1503,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) - job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning), UniqueKey: []byte("unique-key")}) // Running, but won't be completed. 
otherJob := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) @@ -1420,6 +1531,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, require.NoError(t, err) require.Equal(t, rivertype.JobStateCompleted, job3Updated.State) require.WithinDuration(t, finalizedAt3, *job3Updated.FinalizedAt, time.Microsecond) + require.Equal(t, "unique-key", string(job3Updated.UniqueKey)) otherJobUpdated, err := exec.JobGetByID(ctx, otherJob.ID) require.NoError(t, err) @@ -1512,7 +1624,8 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, now := time.Now().UTC() job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ - State: ptrutil.Ptr(rivertype.JobStateRunning), + State: ptrutil.Ptr(rivertype.JobStateRunning), + UniqueKey: []byte("unique-key"), }) jobAfter, err := exec.JobSetStateIfRunning(ctx, riverdriver.JobSetStateCompleted(job.ID, now)) @@ -1523,6 +1636,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, jobUpdated, err := exec.JobGetByID(ctx, job.ID) require.NoError(t, err) require.Equal(t, rivertype.JobStateCompleted, jobUpdated.State) + require.Equal(t, "unique-key", string(jobUpdated.UniqueKey)) }) t.Run("DoesNotCompleteARetryableJob", func(t *testing.T) { @@ -1533,7 +1647,8 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, now := time.Now().UTC() job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ - State: ptrutil.Ptr(rivertype.JobStateRetryable), + State: ptrutil.Ptr(rivertype.JobStateRetryable), + UniqueKey: []byte("unique-key"), }) jobAfter, err := exec.JobSetStateIfRunning(ctx, riverdriver.JobSetStateCompleted(job.ID, now)) @@ -1544,21 +1659,22 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, jobUpdated, err := exec.JobGetByID(ctx, job.ID) require.NoError(t, err) require.Equal(t, rivertype.JobStateRetryable, jobUpdated.State) + require.Equal(t, "unique-key", string(jobUpdated.UniqueKey)) }) }) - t.Run("JobSetStateIfRunning_JobSetStateErrored", func(t *testing.T) 
{ - t.Parallel() + makeErrPayload := func(t *testing.T, now time.Time) []byte { + t.Helper() - makeErrPayload := func(t *testing.T, now time.Time) []byte { - t.Helper() + errPayload, err := json.Marshal(rivertype.AttemptError{ + Attempt: 1, At: now, Error: "fake error", Trace: "foo.go:123\nbar.go:456", + }) + require.NoError(t, err) + return errPayload + } - errPayload, err := json.Marshal(rivertype.AttemptError{ - Attempt: 1, At: now, Error: "fake error", Trace: "foo.go:123\nbar.go:456", - }) - require.NoError(t, err) - return errPayload - } + t.Run("JobSetStateIfRunning_JobSetStateErrored", func(t *testing.T) { + t.Parallel() t.Run("SetsARunningJobToRetryable", func(t *testing.T) { t.Parallel() @@ -1568,7 +1684,8 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, now := time.Now().UTC() job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ - State: ptrutil.Ptr(rivertype.JobStateRunning), + State: ptrutil.Ptr(rivertype.JobStateRunning), + UniqueKey: []byte("unique-key"), }) jobAfter, err := exec.JobSetStateIfRunning(ctx, riverdriver.JobSetStateErrorRetryable(job.ID, now, makeErrPayload(t, now))) @@ -1579,6 +1696,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, jobUpdated, err := exec.JobGetByID(ctx, job.ID) require.NoError(t, err) require.Equal(t, rivertype.JobStateRetryable, jobUpdated.State) + require.Equal(t, "unique-key", string(jobUpdated.UniqueKey)) // validate error payload: require.Len(t, jobAfter.Errors, 1) @@ -1639,6 +1757,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, require.WithinDuration(t, time.Now().UTC(), *jobAfter.FinalizedAt, 2*time.Second) // ScheduledAt should not be touched: require.WithinDuration(t, job.ScheduledAt, jobAfter.ScheduledAt, time.Microsecond) + // Errors should still be appended to: require.Len(t, jobAfter.Errors, 1) require.Contains(t, jobAfter.Errors[0].Error, "fake error") @@ -1650,6 +1769,60 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, }) }) + 
t.Run("JobSetStateIfRunning_JobSetStateCancelled", func(t *testing.T) { //nolint:dupl + t.Parallel() + + t.Run("DiscardsARunningJob", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + State: ptrutil.Ptr(rivertype.JobStateRunning), + UniqueKey: []byte("unique-key"), + }) + + jobAfter, err := exec.JobSetStateIfRunning(ctx, riverdriver.JobSetStateCancelled(job.ID, now, makeErrPayload(t, now))) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateCancelled, jobAfter.State) + require.WithinDuration(t, now, *jobAfter.FinalizedAt, time.Microsecond) + + jobUpdated, err := exec.JobGetByID(ctx, job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateCancelled, jobUpdated.State) + require.Nil(t, jobUpdated.UniqueKey) + }) + }) + + t.Run("JobSetStateIfRunning_JobSetStateDiscarded", func(t *testing.T) { //nolint:dupl + t.Parallel() + + t.Run("DiscardsARunningJob", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + State: ptrutil.Ptr(rivertype.JobStateRunning), + UniqueKey: []byte("unique-key"), + }) + + jobAfter, err := exec.JobSetStateIfRunning(ctx, riverdriver.JobSetStateDiscarded(job.ID, now, makeErrPayload(t, now))) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateDiscarded, jobAfter.State) + require.WithinDuration(t, now, *jobAfter.FinalizedAt, time.Microsecond) + + jobUpdated, err := exec.JobGetByID(ctx, job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateDiscarded, jobUpdated.State) + require.Nil(t, jobUpdated.UniqueKey) + }) + }) + t.Run("JobUpdate", func(t *testing.T) { t.Parallel() diff --git a/internal/riverinternaltest/testfactory/test_factory.go b/internal/riverinternaltest/testfactory/test_factory.go index faba3a5c..be73ece1 100644 --- a/internal/riverinternaltest/testfactory/test_factory.go +++ 
b/internal/riverinternaltest/testfactory/test_factory.go @@ -33,6 +33,7 @@ type JobOpts struct { ScheduledAt *time.Time State *rivertype.JobState Tags []string + UniqueKey []byte } func Job(ctx context.Context, tb testing.TB, exec riverdriver.Executor, opts *JobOpts) *rivertype.JobRow { @@ -76,6 +77,7 @@ func Job_Build(tb testing.TB, opts *JobOpts) *riverdriver.JobInsertFullParams { ScheduledAt: opts.ScheduledAt, State: ptrutil.ValOrDefault(opts.State, rivertype.JobStateAvailable), Tags: tags, + UniqueKey: opts.UniqueKey, } } diff --git a/job_executor_test.go b/job_executor_test.go index 8e2e4d18..e99de1a1 100644 --- a/job_executor_test.go +++ b/job_executor_test.go @@ -331,6 +331,16 @@ func TestJobExecutor_Execute(t *testing.T) { // ensure we still have remaining attempts: require.Greater(t, bundle.jobRow.MaxAttempts, bundle.jobRow.Attempt) + // add a unique key so we can verify it's cleared + var err error + bundle.jobRow, err = bundle.exec.JobUpdate(ctx, &riverdriver.JobUpdateParams{ + ID: bundle.jobRow.ID, + State: rivertype.JobStateAvailable, // required for encoding but ignored + UniqueKeyDoUpdate: true, + UniqueKey: []byte("unique-key"), + }) + require.NoError(t, err) + cancelErr := JobCancel(errors.New("throw away this job")) executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return cancelErr }, nil).MakeUnit(bundle.jobRow) @@ -341,6 +351,7 @@ func TestJobExecutor_Execute(t *testing.T) { require.NoError(t, err) require.WithinDuration(t, time.Now(), *job.FinalizedAt, 2*time.Second) require.Equal(t, rivertype.JobStateCancelled, job.State) + require.Nil(t, job.UniqueKey) require.Len(t, job.Errors, 1) require.WithinDuration(t, time.Now(), job.Errors[0].At, 2*time.Second) require.Equal(t, 1, job.Errors[0].Attempt) diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 925b48d9..51e00efd 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -118,6 +118,7 @@ 
type Executor interface { JobInsertFast(ctx context.Context, params *JobInsertFastParams) (*rivertype.JobRow, error) JobInsertFastMany(ctx context.Context, params []*JobInsertFastParams) (int, error) JobInsertFull(ctx context.Context, params *JobInsertFullParams) (*rivertype.JobRow, error) + JobInsertUnique(ctx context.Context, params *JobInsertUniqueParams) (*JobInsertUniqueResult, error) JobList(ctx context.Context, query string, namedArgs map[string]any) ([]*rivertype.JobRow, error) JobListFields() string JobRescueMany(ctx context.Context, params *JobRescueManyParams) (*struct{}, error) @@ -259,6 +260,16 @@ type JobInsertFastParams struct { Tags []string } +type JobInsertUniqueParams struct { + *JobInsertFastParams + UniqueKey []byte +} + +type JobInsertUniqueResult struct { + Job *rivertype.JobRow + UniqueSkippedAsDuplicate bool +} + type JobInsertFullParams struct { Attempt int AttemptedAt *time.Time @@ -274,6 +285,7 @@ type JobInsertFullParams struct { ScheduledAt *time.Time State rivertype.JobState Tags []string + UniqueKey []byte } type JobRescueManyParams struct { @@ -353,6 +365,8 @@ type JobUpdateParams struct { FinalizedAt *time.Time StateDoUpdate bool State rivertype.JobState + UniqueKeyDoUpdate bool + UniqueKey []byte } // Leader represents a River leader. 
diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/models.go b/riverdriver/riverdatabasesql/internal/dbsqlc/models.go index 7e7b0976..792115b4 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/models.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/models.go @@ -75,6 +75,7 @@ type RiverJob struct { State RiverJobState ScheduledAt time.Time Tags []string + UniqueKey []byte } type RiverLeader struct { diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index a492c81f..56bf800a 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -42,17 +42,20 @@ updated_job AS ( finalized_at = CASE WHEN state = 'running' THEN finalized_at ELSE now() END, -- Mark the job as cancelled by query so that the rescuer knows not to -- rescue it, even if it gets stuck in the running state: - metadata = jsonb_set(metadata, '{cancel_attempted_at}'::text[], $3::jsonb, true) + metadata = jsonb_set(metadata, '{cancel_attempted_at}'::text[], $3::jsonb, true), + -- Similarly, zero a ` + "`" + `unique_key` + "`" + ` if the job is transitioning directly + -- to cancelled. Otherwise, it'll be clear the job executor. 
+ unique_key = CASE WHEN state = 'running' THEN unique_key ELSE NULL END FROM notification WHERE river_job.id = notification.id - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = $1::bigint AND id NOT IN (SELECT id FROM updated_job) UNION -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM updated_job ` @@ -82,6 +85,7 @@ func (q *Queries) JobCancel(ctx context.Context, db DBTX, arg *JobCancelParams) &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ) return &i, err } @@ -113,14 +117,14 @@ deleted_job AS ( WHERE river_job.id = job_to_delete.id -- Do not touch running jobs: AND river_job.state != 'running' - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, 
river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = $1::bigint AND id NOT IN (SELECT id FROM deleted_job) UNION -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM deleted_job ` @@ -144,6 +148,7 @@ func (q *Queries) JobDelete(ctx context.Context, db DBTX, id int64) (*RiverJob, &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ) return &i, err } @@ -161,7 +166,7 @@ WITH deleted_jobs AS ( ORDER BY id LIMIT $4::bigint ) - RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags + RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key ) SELECT count(*) FROM 
deleted_jobs @@ -189,7 +194,7 @@ func (q *Queries) JobDeleteBefore(ctx context.Context, db DBTX, arg *JobDeleteBe const jobGetAvailable = `-- name: JobGetAvailable :many WITH locked_jobs AS ( SELECT - id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags + id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE @@ -216,7 +221,7 @@ FROM WHERE river_job.id = locked_jobs.id RETURNING - river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ` type JobGetAvailableParams struct { @@ -251,6 +256,7 @@ func (q *Queries) JobGetAvailable(ctx context.Context, db DBTX, arg *JobGetAvail &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ); err != nil { return nil, err } @@ -266,7 +272,7 @@ func (q *Queries) JobGetAvailable(ctx context.Context, db DBTX, arg *JobGetAvail } const jobGetByID = `-- name: JobGetByID :one -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM 
river_job WHERE id = $1 LIMIT 1 @@ -292,12 +298,13 @@ func (q *Queries) JobGetByID(ctx context.Context, db DBTX, id int64) (*RiverJob, &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ) return &i, err } const jobGetByIDMany = `-- name: JobGetByIDMany :many -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = any($1::bigint[]) ORDER BY id @@ -329,6 +336,7 @@ func (q *Queries) JobGetByIDMany(ctx context.Context, db DBTX, id []int64) ([]*R &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ); err != nil { return nil, err } @@ -344,7 +352,7 @@ func (q *Queries) JobGetByIDMany(ctx context.Context, db DBTX, id []int64) ([]*R } const jobGetByKindAndUniqueProperties = `-- name: JobGetByKindAndUniqueProperties :one -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE kind = $1 AND CASE WHEN $2::boolean THEN args = $3 ELSE true END @@ -397,12 +405,13 @@ func (q *Queries) JobGetByKindAndUniqueProperties(ctx context.Context, db DBTX, &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ) return &i, err } const jobGetByKindMany = `-- name: JobGetByKindMany :many -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, 
queue, state, scheduled_at, tags, unique_key FROM river_job WHERE kind = any($1::text[]) ORDER BY id @@ -434,6 +443,7 @@ func (q *Queries) JobGetByKindMany(ctx context.Context, db DBTX, kind []string) &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ); err != nil { return nil, err } @@ -449,7 +459,7 @@ func (q *Queries) JobGetByKindMany(ctx context.Context, db DBTX, kind []string) } const jobGetStuck = `-- name: JobGetStuck :many -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE state = 'running' AND attempted_at < $1::timestamptz @@ -488,6 +498,7 @@ func (q *Queries) JobGetStuck(ctx context.Context, db DBTX, arg *JobGetStuckPara &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ); err != nil { return nil, err } @@ -527,7 +538,7 @@ INSERT INTO river_job( coalesce($9::timestamptz, now()), $10, coalesce($11::varchar(255)[], '{}') -) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key ` type JobInsertFastParams struct { @@ -576,6 +587,7 @@ func (q *Queries) JobInsertFast(ctx context.Context, db DBTX, arg *JobInsertFast &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ) return &i, err } @@ -653,7 +665,8 @@ INSERT INTO river_job( queue, scheduled_at, state, - tags + tags, + unique_key ) VALUES ( $1::jsonb, coalesce($2::smallint, 0), @@ -668,8 +681,9 @@ INSERT INTO river_job( $11, coalesce($12::timestamptz, now()), $13, - 
coalesce($14::varchar(255)[], '{}') -) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags + coalesce($14::varchar(255)[], '{}'), + $15 +) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key ` type JobInsertFullParams struct { @@ -687,6 +701,7 @@ type JobInsertFullParams struct { ScheduledAt *time.Time State RiverJobState Tags []string + UniqueKey []byte } func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFullParams) (*RiverJob, error) { @@ -705,6 +720,7 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFull arg.ScheduledAt, arg.State, pq.Array(arg.Tags), + arg.UniqueKey, ) var i RiverJob err := row.Scan( @@ -724,6 +740,116 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFull &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, + ) + return &i, err +} + +const jobInsertUnique = `-- name: JobInsertUnique :one +INSERT INTO river_job( + args, + created_at, + finalized_at, + kind, + max_attempts, + metadata, + priority, + queue, + scheduled_at, + state, + tags, + unique_key +) VALUES ( + $1, + coalesce($2::timestamptz, now()), + $3, + $4, + $5, + coalesce($6::jsonb, '{}'), + $7, + $8, + coalesce($9::timestamptz, now()), + $10, + coalesce($11::varchar(255)[], '{}'), + $12 +) +ON CONFLICT (kind, unique_key) WHERE unique_key IS NOT NULL + -- Something needs to be updated for a row to be returned on a conflict. 
+ DO UPDATE SET kind = EXCLUDED.kind +RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, (xmax != 0) AS unique_skipped_as_duplicate +` + +type JobInsertUniqueParams struct { + Args string + CreatedAt *time.Time + FinalizedAt *time.Time + Kind string + MaxAttempts int16 + Metadata string + Priority int16 + Queue string + ScheduledAt *time.Time + State RiverJobState + Tags []string + UniqueKey []byte +} + +type JobInsertUniqueRow struct { + ID int64 + Args string + Attempt int16 + AttemptedAt *time.Time + AttemptedBy []string + CreatedAt time.Time + Errors []string + FinalizedAt *time.Time + Kind string + MaxAttempts int16 + Metadata string + Priority int16 + Queue string + State RiverJobState + ScheduledAt time.Time + Tags []string + UniqueKey []byte + UniqueSkippedAsDuplicate bool +} + +func (q *Queries) JobInsertUnique(ctx context.Context, db DBTX, arg *JobInsertUniqueParams) (*JobInsertUniqueRow, error) { + row := db.QueryRowContext(ctx, jobInsertUnique, + arg.Args, + arg.CreatedAt, + arg.FinalizedAt, + arg.Kind, + arg.MaxAttempts, + arg.Metadata, + arg.Priority, + arg.Queue, + arg.ScheduledAt, + arg.State, + pq.Array(arg.Tags), + arg.UniqueKey, + ) + var i JobInsertUniqueRow + err := row.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + pq.Array(&i.AttemptedBy), + &i.CreatedAt, + pq.Array(&i.Errors), + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + pq.Array(&i.Tags), + &i.UniqueKey, + &i.UniqueSkippedAsDuplicate, ) return &i, err } @@ -786,14 +912,14 @@ updated_job AS ( AND river_job.state != 'running' -- If the job is already available with a prior scheduled_at, leave it alone. 
AND NOT (river_job.state = 'available' AND river_job.scheduled_at < now()) - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = $1::bigint AND id NOT IN (SELECT id FROM updated_job) UNION -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM updated_job ` @@ -817,6 +943,7 @@ func (q *Queries) JobRetry(ctx context.Context, db DBTX, id int64) (*RiverJob, e &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ) return &i, err } @@ -844,7 +971,7 @@ river_job_scheduled AS ( WHERE river_job.id = jobs_to_schedule.id RETURNING river_job.id ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, 
attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id IN (SELECT id FROM river_job_scheduled) ` @@ -880,6 +1007,7 @@ func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg *JobSchedulePara &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ); err != nil { return nil, err } @@ -914,13 +1042,13 @@ updated_job AS ( state = 'completed' FROM job_to_update WHERE river_job.id = job_to_update.id - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id IN (SELECT id FROM job_to_finalized_at EXCEPT SELECT id FROM updated_job) UNION -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM 
updated_job ` @@ -955,6 +1083,7 @@ func (q *Queries) JobSetCompleteIfRunningMany(ctx context.Context, db DBTX, arg &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ); err != nil { return nil, err } @@ -991,18 +1120,20 @@ updated_job AS ( max_attempts = CASE WHEN NOT should_cancel AND $7::boolean THEN $8 ELSE max_attempts END, scheduled_at = CASE WHEN NOT should_cancel AND $9::boolean THEN $10::timestamptz - ELSE scheduled_at END + ELSE scheduled_at END, + unique_key = CASE WHEN $1 IN ('cancelled', 'discarded') THEN NULL + ELSE unique_key END FROM job_to_update WHERE river_job.id = job_to_update.id AND river_job.state = 'running' - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = $2::bigint AND id NOT IN (SELECT id FROM updated_job) UNION -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, 
kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM updated_job ` @@ -1050,6 +1181,7 @@ func (q *Queries) JobSetStateIfRunning(ctx context.Context, db DBTX, arg *JobSet &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ) return &i, err } @@ -1061,9 +1193,10 @@ SET attempted_at = CASE WHEN $3::boolean THEN $4 ELSE attempted_at END, errors = CASE WHEN $5::boolean THEN $6::jsonb[] ELSE errors END, finalized_at = CASE WHEN $7::boolean THEN $8 ELSE finalized_at END, - state = CASE WHEN $9::boolean THEN $10 ELSE state END -WHERE id = $11 -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags + state = CASE WHEN $9::boolean THEN $10 ELSE state END, + unique_key = CASE WHEN $11::boolean THEN $12 ELSE unique_key END +WHERE id = $13 +RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key ` type JobUpdateParams struct { @@ -1077,6 +1210,8 @@ type JobUpdateParams struct { FinalizedAt *time.Time StateDoUpdate bool State RiverJobState + UniqueKeyDoUpdate bool + UniqueKey []byte ID int64 } @@ -1094,6 +1229,8 @@ func (q *Queries) JobUpdate(ctx context.Context, db DBTX, arg *JobUpdateParams) arg.FinalizedAt, arg.StateDoUpdate, arg.State, + arg.UniqueKeyDoUpdate, + arg.UniqueKey, arg.ID, ) var i RiverJob @@ -1114,6 +1251,7 @@ func (q *Queries) JobUpdate(ctx context.Context, db DBTX, arg *JobUpdateParams) &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ) return &i, err } diff --git a/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line.up.sql b/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line.up.sql deleted file mode 100644 index f01e5f92..00000000 --- a/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line.up.sql +++ /dev/null @@ 
-1,18 +0,0 @@ -ALTER TABLE river_migration - RENAME TO river_migration_old; - -CREATE TABLE river_migration( - line TEXT NOT NULL, - version bigint NOT NULL, - created_at timestamptz NOT NULL DEFAULT NOW(), - CONSTRAINT line_length CHECK (char_length(line) > 0 AND char_length(line) < 128), - CONSTRAINT version_gte_1 CHECK (version >= 1), - PRIMARY KEY (line, version) -); - -INSERT INTO river_migration - (created_at, line, version) -SELECT created_at, 'main', version -FROM river_migration_old; - -DROP TABLE river_migration_old; \ No newline at end of file diff --git a/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line.down.sql b/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line_river_job_add_unique_key.down.sql similarity index 74% rename from riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line.down.sql rename to riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line_river_job_add_unique_key.down.sql index ffdf1240..f2136ede 100644 --- a/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line.down.sql +++ b/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line_river_job_add_unique_key.down.sql @@ -1,5 +1,7 @@ -- --- If any non-main migration are present, 005 is considered irreversible. +-- Revert to migration table based only on `(version)`. +-- +-- If any non-main migrations are present, 005 is considered irreversible. -- DO @@ -33,4 +35,11 @@ INSERT INTO river_migration SELECT created_at, version FROM river_migration_old; -DROP TABLE river_migration_old; \ No newline at end of file +DROP TABLE river_migration_old; + +-- +-- Drop `river_job.unique_key`. 
+-- + +ALTER TABLE river_job + DROP COLUMN unique_key; diff --git a/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line_river_job_add_unique_key.up.sql b/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line_river_job_add_unique_key.up.sql new file mode 100644 index 00000000..155ce30d --- /dev/null +++ b/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line_river_job_add_unique_key.up.sql @@ -0,0 +1,34 @@ +-- +-- Rebuild the migration table so it's based on `(line, version)`. +-- + +ALTER TABLE river_migration + RENAME TO river_migration_old; + +CREATE TABLE river_migration( + line TEXT NOT NULL, + version bigint NOT NULL, + created_at timestamptz NOT NULL DEFAULT NOW(), + CONSTRAINT line_length CHECK (char_length(line) > 0 AND char_length(line) < 128), + CONSTRAINT version_gte_1 CHECK (version >= 1), + PRIMARY KEY (line, version) +); + +INSERT INTO river_migration + (created_at, line, version) +SELECT created_at, 'main', version +FROM river_migration_old; + +DROP TABLE river_migration_old; + +-- +-- Add `river_job.unique_key` and bring up an index on it. +-- + +-- These statements use `IF NOT EXISTS` to allow users with a `river_job` table +-- of non-trivial size to build the index `CONCURRENTLY` out of band of this +-- migration, then follow by completing the migration. 
+ALTER TABLE river_job + ADD COLUMN IF NOT EXISTS unique_key bytea; + +CREATE UNIQUE INDEX IF NOT EXISTS river_job_kind_unique_key_idx ON river_job (kind, unique_key) WHERE unique_key IS NOT NULL; diff --git a/riverdriver/riverdatabasesql/river_database_sql.go b/riverdriver/riverdatabasesql/river_database_sql.go index b4bb9abe..91e09e55 100644 --- a/riverdriver/riverdatabasesql/river_database_sql.go +++ b/riverdriver/riverdatabasesql/river_database_sql.go @@ -276,6 +276,7 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns ScheduledAt: params.ScheduledAt, State: dbsqlc.RiverJobState(params.State), Tags: params.Tags, + UniqueKey: params.UniqueKey, }) if err != nil { return nil, interpretError(err) @@ -283,6 +284,53 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns return jobRowFromInternal(job) } +func (e *Executor) JobInsertUnique(ctx context.Context, params *riverdriver.JobInsertUniqueParams) (*riverdriver.JobInsertUniqueResult, error) { + insertRes, err := dbsqlc.New().JobInsertUnique(ctx, e.dbtx, &dbsqlc.JobInsertUniqueParams{ + Args: string(params.EncodedArgs), + CreatedAt: params.CreatedAt, + Kind: params.Kind, + MaxAttempts: int16(min(params.MaxAttempts, math.MaxInt16)), + Metadata: valutil.ValOrDefault(string(params.Metadata), "{}"), + Priority: int16(min(params.Priority, math.MaxInt16)), + Queue: params.Queue, + ScheduledAt: params.ScheduledAt, + State: dbsqlc.RiverJobState(params.State), + Tags: params.Tags, + UniqueKey: params.UniqueKey, + }) + if err != nil { + return nil, interpretError(err) + } + + jobRow, err := jobRowFromInternal(&dbsqlc.RiverJob{ + ID: insertRes.ID, + Args: insertRes.Args, + Attempt: insertRes.Attempt, + AttemptedAt: insertRes.AttemptedAt, + AttemptedBy: insertRes.AttemptedBy, + CreatedAt: insertRes.CreatedAt, + Errors: insertRes.Errors, + FinalizedAt: insertRes.FinalizedAt, + Kind: insertRes.Kind, + MaxAttempts: insertRes.MaxAttempts, + Metadata: insertRes.Metadata, 
+ Priority: insertRes.Priority, + Queue: insertRes.Queue, + ScheduledAt: insertRes.ScheduledAt, + State: insertRes.State, + Tags: insertRes.Tags, + UniqueKey: insertRes.UniqueKey, + }) + if err != nil { + return nil, err + } + + return &riverdriver.JobInsertUniqueResult{ + Job: jobRow, + UniqueSkippedAsDuplicate: insertRes.UniqueSkippedAsDuplicate, + }, nil +} + func (e *Executor) JobList(ctx context.Context, query string, namedArgs map[string]any) ([]*rivertype.JobRow, error) { // `database/sql` has an `sql.Named` system that should theoretically work // for named parameters, but neither Pgx or lib/pq implement it, so just use @@ -419,6 +467,8 @@ func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateP FinalizedAt: params.FinalizedAt, StateDoUpdate: params.StateDoUpdate, State: dbsqlc.RiverJobState(params.State), + UniqueKeyDoUpdate: params.UniqueKeyDoUpdate, + UniqueKey: params.UniqueKey, }) if err != nil { return nil, interpretError(err) @@ -800,6 +850,7 @@ func jobRowFromInternal(internal *dbsqlc.RiverJob) (*rivertype.JobRow, error) { ScheduledAt: internal.ScheduledAt.UTC(), State: rivertype.JobState(internal.State), Tags: internal.Tags, + UniqueKey: internal.UniqueKey, }, nil } diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/models.go b/riverdriver/riverpgxv5/internal/dbsqlc/models.go index 0e852e34..5cfbe281 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/models.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/models.go @@ -75,6 +75,7 @@ type RiverJob struct { State RiverJobState ScheduledAt time.Time Tags []string + UniqueKey []byte } type RiverLeader struct { diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index ba4d8012..2fb28757 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -26,6 +26,7 @@ CREATE TABLE river_job( state river_job_state NOT NULL DEFAULT 'available', 
 scheduled_at timestamptz NOT NULL DEFAULT NOW(), tags varchar(255)[] NOT NULL DEFAULT '{}', + unique_key bytea, CONSTRAINT finalized_or_finalized_at_null CHECK ( (finalized_at IS NULL AND state NOT IN ('cancelled', 'completed', 'discarded')) OR (finalized_at IS NOT NULL AND state IN ('cancelled', 'completed', 'discarded')) @@ -65,7 +66,10 @@ updated_job AS ( finalized_at = CASE WHEN state = 'running' THEN finalized_at ELSE now() END, -- Mark the job as cancelled by query so that the rescuer knows not to -- rescue it, even if it gets stuck in the running state: - metadata = jsonb_set(metadata, '{cancel_attempted_at}'::text[], @cancel_attempted_at::jsonb, true) + metadata = jsonb_set(metadata, '{cancel_attempted_at}'::text[], @cancel_attempted_at::jsonb, true), + -- Similarly, zero a `unique_key` if the job is transitioning directly + -- to cancelled. Otherwise, it'll be cleared by the job executor. + unique_key = CASE WHEN state = 'running' THEN unique_key ELSE NULL END FROM notification WHERE river_job.id = notification.id RETURNING river_job.* @@ -261,7 +265,8 @@ INSERT INTO river_job( queue, scheduled_at, state, - tags + tags, + unique_key ) VALUES ( @args::jsonb, coalesce(@attempt::smallint, 0), @@ -276,9 +281,43 @@ INSERT INTO river_job( @queue, coalesce(sqlc.narg('scheduled_at')::timestamptz, now()), @state, - coalesce(@tags::varchar(255)[], '{}') + coalesce(@tags::varchar(255)[], '{}'), + @unique_key ) RETURNING *; +-- name: JobInsertUnique :one +INSERT INTO river_job( + args, + created_at, + finalized_at, + kind, + max_attempts, + metadata, + priority, + queue, + scheduled_at, + state, + tags, + unique_key +) VALUES ( + @args, + coalesce(sqlc.narg('created_at')::timestamptz, now()), + @finalized_at, + @kind, + @max_attempts, + coalesce(@metadata::jsonb, '{}'), + @priority, + @queue, + coalesce(sqlc.narg('scheduled_at')::timestamptz, now()), + @state, + coalesce(@tags::varchar(255)[], '{}'), + @unique_key +) +ON CONFLICT (kind, unique_key) WHERE unique_key IS NOT 
NULL + -- Something needs to be updated for a row to be returned on a conflict. + DO UPDATE SET kind = EXCLUDED.kind +RETURNING *, (xmax != 0) AS unique_skipped_as_duplicate; + -- Run by the rescuer to queue for retry or discard depending on job state. -- name: JobRescueMany :exec UPDATE river_job @@ -405,7 +444,9 @@ updated_job AS ( max_attempts = CASE WHEN NOT should_cancel AND @max_attempts_update::boolean THEN @max_attempts ELSE max_attempts END, scheduled_at = CASE WHEN NOT should_cancel AND @scheduled_at_do_update::boolean THEN sqlc.narg('scheduled_at')::timestamptz - ELSE scheduled_at END + ELSE scheduled_at END, + unique_key = CASE WHEN @state IN ('cancelled', 'discarded') THEN NULL + ELSE unique_key END FROM job_to_update WHERE river_job.id = job_to_update.id AND river_job.state = 'running' @@ -428,6 +469,7 @@ SET attempted_at = CASE WHEN @attempted_at_do_update::boolean THEN @attempted_at ELSE attempted_at END, errors = CASE WHEN @errors_do_update::boolean THEN @errors::jsonb[] ELSE errors END, finalized_at = CASE WHEN @finalized_at_do_update::boolean THEN @finalized_at ELSE finalized_at END, - state = CASE WHEN @state_do_update::boolean THEN @state ELSE state END + state = CASE WHEN @state_do_update::boolean THEN @state ELSE state END, + unique_key = CASE WHEN @unique_key_do_update::boolean THEN @unique_key ELSE unique_key END WHERE id = @id RETURNING *; diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 3e2b14ad..0e957f14 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -40,17 +40,20 @@ updated_job AS ( finalized_at = CASE WHEN state = 'running' THEN finalized_at ELSE now() END, -- Mark the job as cancelled by query so that the rescuer knows not to -- rescue it, even if it gets stuck in the running state: - metadata = jsonb_set(metadata, '{cancel_attempted_at}'::text[], $3::jsonb, true) + 
 metadata = jsonb_set(metadata, '{cancel_attempted_at}'::text[], $3::jsonb, true), + -- Similarly, zero a ` + "`" + `unique_key` + "`" + ` if the job is transitioning directly + -- to cancelled. Otherwise, it'll be cleared by the job executor. + unique_key = CASE WHEN state = 'running' THEN unique_key ELSE NULL END FROM notification WHERE river_job.id = notification.id - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = $1::bigint AND id NOT IN (SELECT id FROM updated_job) UNION -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM updated_job ` @@ -80,6 +83,7 @@ func (q *Queries) JobCancel(ctx context.Context, db DBTX, arg *JobCancelParams) &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ) return &i, err } @@ -111,14 
+115,14 @@ deleted_job AS ( WHERE river_job.id = job_to_delete.id -- Do not touch running jobs: AND river_job.state != 'running' - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = $1::bigint AND id NOT IN (SELECT id FROM deleted_job) UNION -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM deleted_job ` @@ -142,6 +146,7 @@ func (q *Queries) JobDelete(ctx context.Context, db DBTX, id int64) (*RiverJob, &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ) return &i, err } @@ -159,7 +164,7 @@ WITH deleted_jobs AS ( ORDER BY id LIMIT $4::bigint ) - RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags + 
RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key ) SELECT count(*) FROM deleted_jobs @@ -187,7 +192,7 @@ func (q *Queries) JobDeleteBefore(ctx context.Context, db DBTX, arg *JobDeleteBe const jobGetAvailable = `-- name: JobGetAvailable :many WITH locked_jobs AS ( SELECT - id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags + id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE @@ -214,7 +219,7 @@ FROM WHERE river_job.id = locked_jobs.id RETURNING - river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ` type JobGetAvailableParams struct { @@ -249,6 +254,7 @@ func (q *Queries) JobGetAvailable(ctx context.Context, db DBTX, arg *JobGetAvail &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ); err != nil { return nil, err } @@ -261,7 +267,7 @@ func (q *Queries) JobGetAvailable(ctx context.Context, db DBTX, arg *JobGetAvail } const jobGetByID = `-- name: JobGetByID :one -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, 
tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = $1 LIMIT 1 @@ -287,12 +293,13 @@ func (q *Queries) JobGetByID(ctx context.Context, db DBTX, id int64) (*RiverJob, &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ) return &i, err } const jobGetByIDMany = `-- name: JobGetByIDMany :many -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = any($1::bigint[]) ORDER BY id @@ -324,6 +331,7 @@ func (q *Queries) JobGetByIDMany(ctx context.Context, db DBTX, id []int64) ([]*R &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ); err != nil { return nil, err } @@ -336,7 +344,7 @@ func (q *Queries) JobGetByIDMany(ctx context.Context, db DBTX, id []int64) ([]*R } const jobGetByKindAndUniqueProperties = `-- name: JobGetByKindAndUniqueProperties :one -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE kind = $1 AND CASE WHEN $2::boolean THEN args = $3 ELSE true END @@ -389,12 +397,13 @@ func (q *Queries) JobGetByKindAndUniqueProperties(ctx context.Context, db DBTX, &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ) return &i, err } const jobGetByKindMany = `-- name: JobGetByKindMany :many -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, 
state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE kind = any($1::text[]) ORDER BY id @@ -426,6 +435,7 @@ func (q *Queries) JobGetByKindMany(ctx context.Context, db DBTX, kind []string) &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ); err != nil { return nil, err } @@ -438,7 +448,7 @@ func (q *Queries) JobGetByKindMany(ctx context.Context, db DBTX, kind []string) } const jobGetStuck = `-- name: JobGetStuck :many -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE state = 'running' AND attempted_at < $1::timestamptz @@ -477,6 +487,7 @@ func (q *Queries) JobGetStuck(ctx context.Context, db DBTX, arg *JobGetStuckPara &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ); err != nil { return nil, err } @@ -513,7 +524,7 @@ INSERT INTO river_job( coalesce($9::timestamptz, now()), $10, coalesce($11::varchar(255)[], '{}') -) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key ` type JobInsertFastParams struct { @@ -562,6 +573,7 @@ func (q *Queries) JobInsertFast(ctx context.Context, db DBTX, arg *JobInsertFast &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ) return &i, err } @@ -639,7 +651,8 @@ INSERT INTO river_job( queue, scheduled_at, state, - tags + tags, + unique_key ) VALUES ( $1::jsonb, 
coalesce($2::smallint, 0), @@ -654,8 +667,9 @@ INSERT INTO river_job( $11, coalesce($12::timestamptz, now()), $13, - coalesce($14::varchar(255)[], '{}') -) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags + coalesce($14::varchar(255)[], '{}'), + $15 +) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key ` type JobInsertFullParams struct { @@ -673,6 +687,7 @@ type JobInsertFullParams struct { ScheduledAt *time.Time State RiverJobState Tags []string + UniqueKey []byte } func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFullParams) (*RiverJob, error) { @@ -691,6 +706,7 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFull arg.ScheduledAt, arg.State, arg.Tags, + arg.UniqueKey, ) var i RiverJob err := row.Scan( @@ -710,6 +726,116 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFull &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, + ) + return &i, err +} + +const jobInsertUnique = `-- name: JobInsertUnique :one +INSERT INTO river_job( + args, + created_at, + finalized_at, + kind, + max_attempts, + metadata, + priority, + queue, + scheduled_at, + state, + tags, + unique_key +) VALUES ( + $1, + coalesce($2::timestamptz, now()), + $3, + $4, + $5, + coalesce($6::jsonb, '{}'), + $7, + $8, + coalesce($9::timestamptz, now()), + $10, + coalesce($11::varchar(255)[], '{}'), + $12 +) +ON CONFLICT (kind, unique_key) WHERE unique_key IS NOT NULL + -- Something needs to be updated for a row to be returned on a conflict. 
+ DO UPDATE SET kind = EXCLUDED.kind +RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, (xmax != 0) AS unique_skipped_as_duplicate +` + +type JobInsertUniqueParams struct { + Args []byte + CreatedAt *time.Time + FinalizedAt *time.Time + Kind string + MaxAttempts int16 + Metadata []byte + Priority int16 + Queue string + ScheduledAt *time.Time + State RiverJobState + Tags []string + UniqueKey []byte +} + +type JobInsertUniqueRow struct { + ID int64 + Args []byte + Attempt int16 + AttemptedAt *time.Time + AttemptedBy []string + CreatedAt time.Time + Errors [][]byte + FinalizedAt *time.Time + Kind string + MaxAttempts int16 + Metadata []byte + Priority int16 + Queue string + State RiverJobState + ScheduledAt time.Time + Tags []string + UniqueKey []byte + UniqueSkippedAsDuplicate bool +} + +func (q *Queries) JobInsertUnique(ctx context.Context, db DBTX, arg *JobInsertUniqueParams) (*JobInsertUniqueRow, error) { + row := db.QueryRow(ctx, jobInsertUnique, + arg.Args, + arg.CreatedAt, + arg.FinalizedAt, + arg.Kind, + arg.MaxAttempts, + arg.Metadata, + arg.Priority, + arg.Queue, + arg.ScheduledAt, + arg.State, + arg.Tags, + arg.UniqueKey, + ) + var i JobInsertUniqueRow + err := row.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + &i.AttemptedBy, + &i.CreatedAt, + &i.Errors, + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + &i.Tags, + &i.UniqueKey, + &i.UniqueSkippedAsDuplicate, ) return &i, err } @@ -772,14 +898,14 @@ updated_job AS ( AND river_job.state != 'running' -- If the job is already available with a prior scheduled_at, leave it alone. 
AND NOT (river_job.state = 'available' AND river_job.scheduled_at < now()) - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = $1::bigint AND id NOT IN (SELECT id FROM updated_job) UNION -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM updated_job ` @@ -803,6 +929,7 @@ func (q *Queries) JobRetry(ctx context.Context, db DBTX, id int64) (*RiverJob, e &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ) return &i, err } @@ -830,7 +957,7 @@ river_job_scheduled AS ( WHERE river_job.id = jobs_to_schedule.id RETURNING river_job.id ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, 
attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id IN (SELECT id FROM river_job_scheduled) ` @@ -866,6 +993,7 @@ func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg *JobSchedulePara &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ); err != nil { return nil, err } @@ -897,13 +1025,13 @@ updated_job AS ( state = 'completed' FROM job_to_update WHERE river_job.id = job_to_update.id - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id IN (SELECT id FROM job_to_finalized_at EXCEPT SELECT id FROM updated_job) UNION -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM updated_job ` @@ -938,6 
+1066,7 @@ func (q *Queries) JobSetCompleteIfRunningMany(ctx context.Context, db DBTX, arg &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ); err != nil { return nil, err } @@ -971,18 +1100,20 @@ updated_job AS ( max_attempts = CASE WHEN NOT should_cancel AND $7::boolean THEN $8 ELSE max_attempts END, scheduled_at = CASE WHEN NOT should_cancel AND $9::boolean THEN $10::timestamptz - ELSE scheduled_at END + ELSE scheduled_at END, + unique_key = CASE WHEN $1 IN ('cancelled', 'discarded') THEN NULL + ELSE unique_key END FROM job_to_update WHERE river_job.id = job_to_update.id AND river_job.state = 'running' - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = $2::bigint AND id NOT IN (SELECT id FROM updated_job) UNION -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, 
priority, queue, state, scheduled_at, tags, unique_key FROM updated_job ` @@ -1030,6 +1161,7 @@ func (q *Queries) JobSetStateIfRunning(ctx context.Context, db DBTX, arg *JobSet &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ) return &i, err } @@ -1041,9 +1173,10 @@ SET attempted_at = CASE WHEN $3::boolean THEN $4 ELSE attempted_at END, errors = CASE WHEN $5::boolean THEN $6::jsonb[] ELSE errors END, finalized_at = CASE WHEN $7::boolean THEN $8 ELSE finalized_at END, - state = CASE WHEN $9::boolean THEN $10 ELSE state END -WHERE id = $11 -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags + state = CASE WHEN $9::boolean THEN $10 ELSE state END, + unique_key = CASE WHEN $11::boolean THEN $12 ELSE unique_key END +WHERE id = $13 +RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key ` type JobUpdateParams struct { @@ -1057,6 +1190,8 @@ type JobUpdateParams struct { FinalizedAt *time.Time StateDoUpdate bool State RiverJobState + UniqueKeyDoUpdate bool + UniqueKey []byte ID int64 } @@ -1074,6 +1209,8 @@ func (q *Queries) JobUpdate(ctx context.Context, db DBTX, arg *JobUpdateParams) arg.FinalizedAt, arg.StateDoUpdate, arg.State, + arg.UniqueKeyDoUpdate, + arg.UniqueKey, arg.ID, ) var i RiverJob @@ -1094,6 +1231,7 @@ func (q *Queries) JobUpdate(ctx context.Context, db DBTX, arg *JobUpdateParams) &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ) return &i, err } diff --git a/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line.up.sql b/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line.up.sql deleted file mode 100644 index f01e5f92..00000000 --- a/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line.up.sql +++ /dev/null @@ -1,18 +0,0 @@ -ALTER TABLE river_migration - RENAME TO 
river_migration_old; - -CREATE TABLE river_migration( - line TEXT NOT NULL, - version bigint NOT NULL, - created_at timestamptz NOT NULL DEFAULT NOW(), - CONSTRAINT line_length CHECK (char_length(line) > 0 AND char_length(line) < 128), - CONSTRAINT version_gte_1 CHECK (version >= 1), - PRIMARY KEY (line, version) -); - -INSERT INTO river_migration - (created_at, line, version) -SELECT created_at, 'main', version -FROM river_migration_old; - -DROP TABLE river_migration_old; \ No newline at end of file diff --git a/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line.down.sql b/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line_river_job_add_unique_key.down.sql similarity index 74% rename from riverdriver/riverpgxv5/migration/main/005_river_migration_add_line.down.sql rename to riverdriver/riverpgxv5/migration/main/005_river_migration_add_line_river_job_add_unique_key.down.sql index ffdf1240..f2136ede 100644 --- a/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line.down.sql +++ b/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line_river_job_add_unique_key.down.sql @@ -1,5 +1,7 @@ -- --- If any non-main migration are present, 005 is considered irreversible. +-- Revert to migration table based only on `(version)`. +-- +-- If any non-main migrations are present, 005 is considered irreversible. -- DO @@ -33,4 +35,11 @@ INSERT INTO river_migration SELECT created_at, version FROM river_migration_old; -DROP TABLE river_migration_old; \ No newline at end of file +DROP TABLE river_migration_old; + +-- +-- Drop `river_job.unique_key`. 
+-- + +ALTER TABLE river_job + DROP COLUMN unique_key; diff --git a/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line_river_job_add_unique_key.up.sql b/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line_river_job_add_unique_key.up.sql new file mode 100644 index 00000000..155ce30d --- /dev/null +++ b/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line_river_job_add_unique_key.up.sql @@ -0,0 +1,34 @@ +-- +-- Rebuild the migration table so it's based on `(line, version)`. +-- + +ALTER TABLE river_migration + RENAME TO river_migration_old; + +CREATE TABLE river_migration( + line TEXT NOT NULL, + version bigint NOT NULL, + created_at timestamptz NOT NULL DEFAULT NOW(), + CONSTRAINT line_length CHECK (char_length(line) > 0 AND char_length(line) < 128), + CONSTRAINT version_gte_1 CHECK (version >= 1), + PRIMARY KEY (line, version) +); + +INSERT INTO river_migration + (created_at, line, version) +SELECT created_at, 'main', version +FROM river_migration_old; + +DROP TABLE river_migration_old; + +-- +-- Add `river_job.unique_key` and bring up an index on it. +-- + +-- These statements use `IF NOT EXISTS` to allow users with a `river_job` table +-- of non-trivial size to build the index `CONCURRENTLY` out of band of this +-- migration, then follow by completing the migration. 
+ALTER TABLE river_job + ADD COLUMN IF NOT EXISTS unique_key bytea; + +CREATE UNIQUE INDEX IF NOT EXISTS river_job_kind_unique_key_idx ON river_job (kind, unique_key) WHERE unique_key IS NOT NULL; diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index e07ee997..8e44e85a 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -270,6 +270,7 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns ScheduledAt: params.ScheduledAt, State: dbsqlc.RiverJobState(params.State), Tags: params.Tags, + UniqueKey: params.UniqueKey, }) if err != nil { return nil, interpretError(err) @@ -277,6 +278,53 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns return jobRowFromInternal(job) } +func (e *Executor) JobInsertUnique(ctx context.Context, params *riverdriver.JobInsertUniqueParams) (*riverdriver.JobInsertUniqueResult, error) { + insertRes, err := dbsqlc.New().JobInsertUnique(ctx, e.dbtx, &dbsqlc.JobInsertUniqueParams{ + Args: params.EncodedArgs, + CreatedAt: params.CreatedAt, + Kind: params.Kind, + MaxAttempts: int16(min(params.MaxAttempts, math.MaxInt16)), + Metadata: params.Metadata, + Priority: int16(min(params.Priority, math.MaxInt16)), + Queue: params.Queue, + ScheduledAt: params.ScheduledAt, + State: dbsqlc.RiverJobState(params.State), + Tags: params.Tags, + UniqueKey: params.UniqueKey, + }) + if err != nil { + return nil, interpretError(err) + } + + jobRow, err := jobRowFromInternal(&dbsqlc.RiverJob{ + ID: insertRes.ID, + Args: insertRes.Args, + Attempt: insertRes.Attempt, + AttemptedAt: insertRes.AttemptedAt, + AttemptedBy: insertRes.AttemptedBy, + CreatedAt: insertRes.CreatedAt, + Errors: insertRes.Errors, + FinalizedAt: insertRes.FinalizedAt, + Kind: insertRes.Kind, + MaxAttempts: insertRes.MaxAttempts, + Metadata: insertRes.Metadata, + Priority: insertRes.Priority, + Queue: insertRes.Queue, + 
ScheduledAt: insertRes.ScheduledAt, + State: insertRes.State, + Tags: insertRes.Tags, + UniqueKey: insertRes.UniqueKey, + }) + if err != nil { + return nil, err + } + + return &riverdriver.JobInsertUniqueResult{ + Job: jobRow, + UniqueSkippedAsDuplicate: insertRes.UniqueSkippedAsDuplicate, + }, nil +} + func (e *Executor) JobList(ctx context.Context, query string, namedArgs map[string]any) ([]*rivertype.JobRow, error) { rows, err := e.dbtx.Query(ctx, query, pgx.NamedArgs(namedArgs)) if err != nil { @@ -395,6 +443,8 @@ func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateP FinalizedAt: params.FinalizedAt, StateDoUpdate: params.StateDoUpdate, State: dbsqlc.RiverJobState(params.State), + UniqueKeyDoUpdate: params.UniqueKeyDoUpdate, + UniqueKey: params.UniqueKey, }) if err != nil { return nil, interpretError(err) @@ -778,6 +828,7 @@ func jobRowFromInternal(internal *dbsqlc.RiverJob) (*rivertype.JobRow, error) { ScheduledAt: internal.ScheduledAt.UTC(), State: rivertype.JobState(internal.State), Tags: internal.Tags, + UniqueKey: internal.UniqueKey, }, nil } diff --git a/rivertype/river_type.go b/rivertype/river_type.go index df4f6038..0f8f309c 100644 --- a/rivertype/river_type.go +++ b/rivertype/river_type.go @@ -113,6 +113,11 @@ type JobRow struct { // functional behavior and are meant entirely as a user-specified construct // to help group and categorize jobs. Tags []string + + // UniqueKey is a unique key for the job within its kind that's used for + // unique job insertions. It's generated by hashing an inserted job's unique + // opts configuration. + UniqueKey []byte } // JobState is the state of a job. Jobs start their lifecycle as either