From 56b54876805bc01c5f10856433bb6acf85638c5a Mon Sep 17 00:00:00 2001 From: Brandur Date: Fri, 19 Jul 2024 17:14:37 -0700 Subject: [PATCH] Use `sqlc.embed` to embed a `RiverJob` row during unique insertion Follows up #451 with a significant code quality improvement in which we can use `sqlc.embed` (which TIL about) to embed a `RiverJob` row directly on the returned struct, which means we can use our normal `jobRowFromInternal` to map it to a driver result instead of having to manually construct `RiverJob` property by property. Tipped off to the existence of `sqlc.embed` by @tadejsv [1]. Thank you! [1] https://github.com/riverqueue/river/pull/451#discussion_r1684925329 --- .../internal/dbsqlc/river_job.sql.go | 54 +++++++------------ .../riverdatabasesql/river_database_sql.go | 20 +------ .../riverpgxv5/internal/dbsqlc/river_job.sql | 2 +- .../internal/dbsqlc/river_job.sql.go | 54 +++++++------------ riverdriver/riverpgxv5/river_pgx_v5_driver.go | 20 +------ 5 files changed, 41 insertions(+), 109 deletions(-) diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index 56bf800a..093d6a9d 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -776,7 +776,7 @@ INSERT INTO river_job( ON CONFLICT (kind, unique_key) WHERE unique_key IS NOT NULL -- Something needs to be updated for a row to be returned on a conflict. DO UPDATE SET kind = EXCLUDED.kind -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, (xmax != 0) AS unique_skipped_as_duplicate +RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, (xmax != 0) AS unique_skipped_as_duplicate ` type JobInsertUniqueParams struct { @@ -795,23 +795,7 @@ type JobInsertUniqueParams struct { } type JobInsertUniqueRow struct { - ID int64 - Args string - Attempt int16 - AttemptedAt *time.Time - AttemptedBy []string - CreatedAt time.Time - Errors []string - FinalizedAt *time.Time - Kind string - MaxAttempts int16 - Metadata string - Priority int16 - Queue string - State RiverJobState - ScheduledAt time.Time - Tags []string - UniqueKey []byte + RiverJob RiverJob UniqueSkippedAsDuplicate bool } @@ -832,23 +816,23 @@ func (q *Queries) JobInsertUnique(ctx context.Context, db DBTX, arg *JobInsertUn ) var i JobInsertUniqueRow err := row.Scan( - &i.ID, - &i.Args, - &i.Attempt, - &i.AttemptedAt, - pq.Array(&i.AttemptedBy), - &i.CreatedAt, - pq.Array(&i.Errors), - &i.FinalizedAt, - &i.Kind, - &i.MaxAttempts, - &i.Metadata, - &i.Priority, - &i.Queue, - &i.State, - &i.ScheduledAt, - pq.Array(&i.Tags), - &i.UniqueKey, + &i.RiverJob.ID, + &i.RiverJob.Args, + &i.RiverJob.Attempt, + &i.RiverJob.AttemptedAt, + pq.Array(&i.RiverJob.AttemptedBy), + &i.RiverJob.CreatedAt, + pq.Array(&i.RiverJob.Errors), + &i.RiverJob.FinalizedAt, + &i.RiverJob.Kind, + &i.RiverJob.MaxAttempts, + &i.RiverJob.Metadata, + &i.RiverJob.Priority, + &i.RiverJob.Queue, + &i.RiverJob.State, + &i.RiverJob.ScheduledAt, + pq.Array(&i.RiverJob.Tags), + &i.RiverJob.UniqueKey, &i.UniqueSkippedAsDuplicate, ) return &i, err diff --git a/riverdriver/riverdatabasesql/river_database_sql.go b/riverdriver/riverdatabasesql/river_database_sql.go index 91e09e55..4d3e1a8b 100644 --- a/riverdriver/riverdatabasesql/river_database_sql.go +++ b/riverdriver/riverdatabasesql/river_database_sql.go @@ -302,25 +302,7 @@ func (e *Executor) JobInsertUnique(ctx context.Context, params *riverdriver.JobI return nil, interpretError(err) } - jobRow, err := jobRowFromInternal(&dbsqlc.RiverJob{ - ID: insertRes.ID, - Args: insertRes.Args, - Attempt: insertRes.Attempt, - AttemptedAt: insertRes.AttemptedAt, - AttemptedBy: insertRes.AttemptedBy, - CreatedAt: insertRes.CreatedAt, - Errors: insertRes.Errors, - FinalizedAt: insertRes.FinalizedAt, - Kind: insertRes.Kind, - MaxAttempts: insertRes.MaxAttempts, - Metadata: insertRes.Metadata, - Priority: insertRes.Priority, - Queue: insertRes.Queue, - ScheduledAt: insertRes.ScheduledAt, - State: insertRes.State, - Tags: insertRes.Tags, - UniqueKey: insertRes.UniqueKey, - }) + jobRow, err := jobRowFromInternal(&insertRes.RiverJob) if err != nil { return nil, err } diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index 2fb28757..373bede8 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -316,7 +316,7 @@ INSERT INTO river_job( ON CONFLICT (kind, unique_key) WHERE unique_key IS NOT NULL -- Something needs to be updated for a row to be returned on a conflict. DO UPDATE SET kind = EXCLUDED.kind -RETURNING *, (xmax != 0) AS unique_skipped_as_duplicate; +RETURNING sqlc.embed(river_job), (xmax != 0) AS unique_skipped_as_duplicate; -- Run by the rescuer to queue for retry or discard depending on job state. -- name: JobRescueMany :exec diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 0e957f14..c3cd2ea2 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -762,7 +762,7 @@ INSERT INTO river_job( ON CONFLICT (kind, unique_key) WHERE unique_key IS NOT NULL -- Something needs to be updated for a row to be returned on a conflict. DO UPDATE SET kind = EXCLUDED.kind -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, (xmax != 0) AS unique_skipped_as_duplicate +RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, (xmax != 0) AS unique_skipped_as_duplicate ` type JobInsertUniqueParams struct { @@ -781,23 +781,7 @@ type JobInsertUniqueParams struct { } type JobInsertUniqueRow struct { - ID int64 - Args []byte - Attempt int16 - AttemptedAt *time.Time - AttemptedBy []string - CreatedAt time.Time - Errors [][]byte - FinalizedAt *time.Time - Kind string - MaxAttempts int16 - Metadata []byte - Priority int16 - Queue string - State RiverJobState - ScheduledAt time.Time - Tags []string - UniqueKey []byte + RiverJob RiverJob UniqueSkippedAsDuplicate bool } @@ -818,23 +802,23 @@ func (q *Queries) JobInsertUnique(ctx context.Context, db DBTX, arg *JobInsertUn ) var i JobInsertUniqueRow err := row.Scan( - &i.ID, - &i.Args, - &i.Attempt, - &i.AttemptedAt, - &i.AttemptedBy, - &i.CreatedAt, - &i.Errors, - &i.FinalizedAt, - &i.Kind, - &i.MaxAttempts, - &i.Metadata, - &i.Priority, - &i.Queue, - &i.State, - &i.ScheduledAt, - &i.Tags, - &i.UniqueKey, + &i.RiverJob.ID, + &i.RiverJob.Args, + &i.RiverJob.Attempt, + &i.RiverJob.AttemptedAt, + &i.RiverJob.AttemptedBy, + &i.RiverJob.CreatedAt, + &i.RiverJob.Errors, + &i.RiverJob.FinalizedAt, + &i.RiverJob.Kind, + &i.RiverJob.MaxAttempts, + &i.RiverJob.Metadata, + &i.RiverJob.Priority, + &i.RiverJob.Queue, + &i.RiverJob.State, + &i.RiverJob.ScheduledAt, + &i.RiverJob.Tags, + &i.RiverJob.UniqueKey, &i.UniqueSkippedAsDuplicate, ) return &i, err diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 8e44e85a..508f8c79 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -296,25 +296,7 @@ func (e *Executor) JobInsertUnique(ctx context.Context, params *riverdriver.JobI return nil, interpretError(err) } - jobRow, err := jobRowFromInternal(&dbsqlc.RiverJob{ - ID: insertRes.ID, - Args: insertRes.Args, - Attempt: insertRes.Attempt, - AttemptedAt: insertRes.AttemptedAt, - AttemptedBy: insertRes.AttemptedBy, - CreatedAt: insertRes.CreatedAt, - Errors: insertRes.Errors, - FinalizedAt: insertRes.FinalizedAt, - Kind: insertRes.Kind, - MaxAttempts: insertRes.MaxAttempts, - Metadata: insertRes.Metadata, - Priority: insertRes.Priority, - Queue: insertRes.Queue, - ScheduledAt: insertRes.ScheduledAt, - State: insertRes.State, - Tags: insertRes.Tags, - UniqueKey: insertRes.UniqueKey, - }) + jobRow, err := jobRowFromInternal(&insertRes.RiverJob) if err != nil { return nil, err }