diff --git a/internal/dbadapter/db_adapter.go b/internal/dbadapter/db_adapter.go index e132f561..dd299eab 100644 --- a/internal/dbadapter/db_adapter.go +++ b/internal/dbadapter/db_adapter.go @@ -13,6 +13,7 @@ import ( "github.com/riverqueue/river/internal/baseservice" "github.com/riverqueue/river/internal/dbsqlc" + "github.com/riverqueue/river/internal/notifier" "github.com/riverqueue/river/internal/util/dbutil" "github.com/riverqueue/river/internal/util/hashutil" "github.com/riverqueue/river/internal/util/ptrutil" @@ -446,7 +447,8 @@ func (a *StandardAdapter) LeadershipResign(ctx context.Context, name, leaderID s defer cancel() return a.queries.LeadershipResign(ctx, a.executor, dbsqlc.LeadershipResignParams{ - LeaderID: leaderID, - Name: name, + LeaderID: leaderID, + LeadershipTopic: string(notifier.NotificationTopicLeadership), + Name: name, }) } diff --git a/internal/dbsqlc/river_job.sql b/internal/dbsqlc/river_job.sql index 47f0cc6a..92f82fc7 100644 --- a/internal/dbsqlc/river_job.sql +++ b/internal/dbsqlc/river_job.sql @@ -225,7 +225,7 @@ river_job_scheduled AS ( ) SELECT count(*) FROM ( -SELECT pg_notify('river_insert', json_build_object('queue', queue)::text) +SELECT pg_notify(@insert_topic, json_build_object('queue', queue)::text) FROM river_job_scheduled) AS notifications_sent; -- name: JobSetState :one diff --git a/internal/dbsqlc/river_job.sql.go b/internal/dbsqlc/river_job.sql.go index 756551ba..9a8f12fd 100644 --- a/internal/dbsqlc/river_job.sql.go +++ b/internal/dbsqlc/river_job.sql.go @@ -573,12 +573,12 @@ WITH jobs_to_schedule AS ( state IN ('scheduled', 'retryable') AND queue IS NOT NULL AND priority >= 0 - AND scheduled_at <= $1::timestamptz + AND scheduled_at <= $2::timestamptz ORDER BY priority, scheduled_at, id - LIMIT $2::bigint + LIMIT $3::bigint FOR UPDATE ), river_job_scheduled AS ( @@ -590,17 +590,18 @@ river_job_scheduled AS ( ) SELECT count(*) FROM ( -SELECT pg_notify('river_insert', json_build_object('queue', queue)::text) +SELECT pg_notify($1, json_build_object('queue', queue)::text) FROM river_job_scheduled) AS notifications_sent ` type JobScheduleParams struct { - Now time.Time - Max int64 + InsertTopic string + Now time.Time + Max int64 } func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg JobScheduleParams) (int64, error) { - row := db.QueryRow(ctx, jobSchedule, arg.Now, arg.Max) + row := db.QueryRow(ctx, jobSchedule, arg.InsertTopic, arg.Now, arg.Max) var count int64 err := row.Scan(&count) return count, err diff --git a/internal/dbsqlc/river_leader.sql b/internal/dbsqlc/river_leader.sql index 25e0f0d0..251aab1e 100644 --- a/internal/dbsqlc/river_leader.sql +++ b/internal/dbsqlc/river_leader.sql @@ -40,7 +40,7 @@ WITH currently_held_leaders AS ( ), notified_resignations AS ( SELECT - pg_notify('river_leadership', json_build_object('name', name, 'leader_id', leader_id, 'action', 'resigned')::text), + pg_notify(@leadership_topic, json_build_object('name', name, 'leader_id', leader_id, 'action', 'resigned')::text), currently_held_leaders.name FROM currently_held_leaders) diff --git a/internal/dbsqlc/river_leader.sql.go b/internal/dbsqlc/river_leader.sql.go index fb82f0a7..ddbb9521 100644 --- a/internal/dbsqlc/river_leader.sql.go +++ b/internal/dbsqlc/river_leader.sql.go @@ -80,7 +80,7 @@ WITH currently_held_leaders AS ( ), notified_resignations AS ( SELECT - pg_notify('river_leadership', json_build_object('name', name, 'leader_id', leader_id, 'action', 'resigned')::text), + pg_notify($3, json_build_object('name', name, 'leader_id', leader_id, 'action', 'resigned')::text), currently_held_leaders.name FROM currently_held_leaders) @@ -89,11 +89,12 @@ WHERE river_leader.name = notified_resignations.name ` type LeadershipResignParams struct { - Name string - LeaderID string + Name string + LeaderID string + LeadershipTopic string } func (q *Queries) LeadershipResign(ctx context.Context, db DBTX, arg LeadershipResignParams) error { - _, err := db.Exec(ctx, leadershipResign, arg.Name, arg.LeaderID) + _, err := db.Exec(ctx, leadershipResign, arg.Name, arg.LeaderID, arg.LeadershipTopic) return err } diff --git a/internal/maintenance/scheduler.go b/internal/maintenance/scheduler.go index 7fc583d6..82e01892 100644 --- a/internal/maintenance/scheduler.go +++ b/internal/maintenance/scheduler.go @@ -10,6 +10,7 @@ import ( "github.com/riverqueue/river/internal/baseservice" "github.com/riverqueue/river/internal/dbsqlc" "github.com/riverqueue/river/internal/maintenance/startstop" + "github.com/riverqueue/river/internal/notifier" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/util/dbutil" "github.com/riverqueue/river/internal/util/timeutil" @@ -139,8 +140,9 @@ func (s *Scheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, error defer cancelFunc() numScheduled, err := s.queries.JobSchedule(ctx, s.dbExecutor, dbsqlc.JobScheduleParams{ - Max: int64(s.config.Limit), - Now: s.TimeNowUTC(), + InsertTopic: string(notifier.NotificationTopicInsert), + Max: int64(s.config.Limit), + Now: s.TimeNowUTC(), }) if err != nil { return 0, fmt.Errorf("error deleting completed jobs: %w", err)