From 4defbdc658756fc5dfce1df85fae911d4f8048a1 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Sat, 30 Dec 2023 20:14:19 -0600 Subject: [PATCH] pull most notification topic names from code Rather than hardcoding these into the query, pulling them from code reduces the number of distinct references to these values that must be kept in sync. --- internal/dbadapter/db_adapter.go | 6 ++++-- internal/dbsqlc/river_job.sql | 2 +- internal/dbsqlc/river_job.sql.go | 13 +++++++------ internal/dbsqlc/river_leader.sql | 2 +- internal/dbsqlc/river_leader.sql.go | 9 +++++---- internal/maintenance/scheduler.go | 6 ++++-- 6 files changed, 22 insertions(+), 16 deletions(-) diff --git a/internal/dbadapter/db_adapter.go b/internal/dbadapter/db_adapter.go index e132f561..dd299eab 100644 --- a/internal/dbadapter/db_adapter.go +++ b/internal/dbadapter/db_adapter.go @@ -13,6 +13,7 @@ import ( "github.com/riverqueue/river/internal/baseservice" "github.com/riverqueue/river/internal/dbsqlc" + "github.com/riverqueue/river/internal/notifier" "github.com/riverqueue/river/internal/util/dbutil" "github.com/riverqueue/river/internal/util/hashutil" "github.com/riverqueue/river/internal/util/ptrutil" @@ -446,7 +447,8 @@ func (a *StandardAdapter) LeadershipResign(ctx context.Context, name, leaderID s defer cancel() return a.queries.LeadershipResign(ctx, a.executor, dbsqlc.LeadershipResignParams{ - LeaderID: leaderID, - Name: name, + LeaderID: leaderID, + LeadershipTopic: string(notifier.NotificationTopicLeadership), + Name: name, }) } diff --git a/internal/dbsqlc/river_job.sql b/internal/dbsqlc/river_job.sql index 47f0cc6a..92f82fc7 100644 --- a/internal/dbsqlc/river_job.sql +++ b/internal/dbsqlc/river_job.sql @@ -225,7 +225,7 @@ river_job_scheduled AS ( ) SELECT count(*) FROM ( -SELECT pg_notify('river_insert', json_build_object('queue', queue)::text) +SELECT pg_notify(@insert_topic, json_build_object('queue', queue)::text) FROM river_job_scheduled) AS notifications_sent; -- name: JobSetState :one diff --git a/internal/dbsqlc/river_job.sql.go b/internal/dbsqlc/river_job.sql.go index 756551ba..9a8f12fd 100644 --- a/internal/dbsqlc/river_job.sql.go +++ b/internal/dbsqlc/river_job.sql.go @@ -573,12 +573,12 @@ WITH jobs_to_schedule AS ( state IN ('scheduled', 'retryable') AND queue IS NOT NULL AND priority >= 0 - AND scheduled_at <= $1::timestamptz + AND scheduled_at <= $2::timestamptz ORDER BY priority, scheduled_at, id - LIMIT $2::bigint + LIMIT $3::bigint FOR UPDATE ), river_job_scheduled AS ( @@ -590,17 +590,18 @@ river_job_scheduled AS ( ) SELECT count(*) FROM ( -SELECT pg_notify('river_insert', json_build_object('queue', queue)::text) +SELECT pg_notify($1, json_build_object('queue', queue)::text) FROM river_job_scheduled) AS notifications_sent ` type JobScheduleParams struct { - Now time.Time - Max int64 + InsertTopic string + Now time.Time + Max int64 } func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg JobScheduleParams) (int64, error) { - row := db.QueryRow(ctx, jobSchedule, arg.Now, arg.Max) + row := db.QueryRow(ctx, jobSchedule, arg.InsertTopic, arg.Now, arg.Max) var count int64 err := row.Scan(&count) return count, err diff --git a/internal/dbsqlc/river_leader.sql b/internal/dbsqlc/river_leader.sql index 25e0f0d0..251aab1e 100644 --- a/internal/dbsqlc/river_leader.sql +++ b/internal/dbsqlc/river_leader.sql @@ -40,7 +40,7 @@ WITH currently_held_leaders AS ( ), notified_resignations AS ( SELECT - pg_notify('river_leadership', json_build_object('name', name, 'leader_id', leader_id, 'action', 'resigned')::text), + pg_notify(@leadership_topic, json_build_object('name', name, 'leader_id', leader_id, 'action', 'resigned')::text), currently_held_leaders.name FROM currently_held_leaders) diff --git a/internal/dbsqlc/river_leader.sql.go b/internal/dbsqlc/river_leader.sql.go index fb82f0a7..ddbb9521 100644 --- a/internal/dbsqlc/river_leader.sql.go +++ b/internal/dbsqlc/river_leader.sql.go @@ -80,7 +80,7 @@ WITH currently_held_leaders AS ( ), notified_resignations AS ( SELECT - pg_notify('river_leadership', json_build_object('name', name, 'leader_id', leader_id, 'action', 'resigned')::text), + pg_notify($3, json_build_object('name', name, 'leader_id', leader_id, 'action', 'resigned')::text), currently_held_leaders.name FROM currently_held_leaders) @@ -89,11 +89,12 @@ WHERE river_leader.name = notified_resignations.name ` type LeadershipResignParams struct { - Name string - LeaderID string + Name string + LeaderID string + LeadershipTopic string } func (q *Queries) LeadershipResign(ctx context.Context, db DBTX, arg LeadershipResignParams) error { - _, err := db.Exec(ctx, leadershipResign, arg.Name, arg.LeaderID) + _, err := db.Exec(ctx, leadershipResign, arg.Name, arg.LeaderID, arg.LeadershipTopic) return err } diff --git a/internal/maintenance/scheduler.go b/internal/maintenance/scheduler.go index 7fc583d6..82e01892 100644 --- a/internal/maintenance/scheduler.go +++ b/internal/maintenance/scheduler.go @@ -10,6 +10,7 @@ import ( "github.com/riverqueue/river/internal/baseservice" "github.com/riverqueue/river/internal/dbsqlc" "github.com/riverqueue/river/internal/maintenance/startstop" + "github.com/riverqueue/river/internal/notifier" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/util/dbutil" "github.com/riverqueue/river/internal/util/timeutil" @@ -139,8 +140,9 @@ func (s *Scheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, error defer cancelFunc() numScheduled, err := s.queries.JobSchedule(ctx, s.dbExecutor, dbsqlc.JobScheduleParams{ - Max: int64(s.config.Limit), - Now: s.TimeNowUTC(), + InsertTopic: string(notifier.NotificationTopicInsert), + Max: int64(s.config.Limit), + Now: s.TimeNowUTC(), }) if err != nil { return 0, fmt.Errorf("error deleting completed jobs: %w", err)