From 90b81af02e9f97331c747ccc0ef3a3d06ad3b436 Mon Sep 17 00:00:00 2001 From: Brandur Date: Fri, 14 Nov 2025 00:09:59 -0800 Subject: [PATCH] Add hook for when the periodic job enqueuer starts up This one's an alternative (and hopefully a better one) to #1078. #1078 works for removing a periodic job from a leader, but doesn't cover the case where a new periodic job might need to be added dynamically from any possible client. Here, we add the new hook `HookPeriodicJobsStart` which runs when the periodic job enqueuer starts up on a newly elected leader. This gives the hook quite a bit of power to dynamically adjust all currently configured periodic jobs on the enqueuer. For example, it may clear all of them and then add them back from scratch using a canonical set of periodic job records from elsewhere which it knows should exist. This will need to be paired with another feature that would allow users to force a leader reelection (which I haven't written yet). So when they determine that a periodic job overhaul is necessary, they'll knock the current leader offline and then use `HookPeriodicJobsStart` to configure the right set of periodic jobs. A modification we make here is to make it possible to get a client from context in the hook similar to how it's possible in a worker right now. This is because the hook implementation will often want access to a client instance, but it's very awkward getting access to a client otherwise because you define your hooks before a client is available, thereby creating a chicken and egg problem. --- CHANGELOG.md | 8 +- client.go | 7 +- client_test.go | 48 +++++ hook_defaults_funcs.go | 10 ++ hook_defaults_funcs_test.go | 3 + internal/hooklookup/hook_lookup.go | 13 +- internal/maintenance/periodic_job_enqueuer.go | 40 +++-- .../maintenance/periodic_job_enqueuer_test.go | 170 ++++++++++++++++++ periodic_job.go | 22 ++- periodic_job_test.go | 4 +- rivershared/riverpilot/pilot.go | 2 + rivertype/river_type.go | 43 +++++ 12 files changed, 337 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f54c782..a070d14f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +- Added `HookPeriodicJobsStart` that can be used to run custom logic when a periodic job enqueuer starts up on a new leader. [PR #1084](https://github.com/riverqueue/river/pull/1084). - Added `Client.Notify().RequestResign` and `Client.Notify().RequestResignTx` functions allowing any client to request that the current leader resign. [PR #1085](https://github.com/riverqueue/river/pull/1085). ## [0.28.0] - 2025-11-23 @@ -15,13 +16,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added `riverlog.LoggerSafely` which provides a non-panic variant of `riverlog.Logger` for use when code may or may not have a context logger available. [PR #1093](https://github.com/riverqueue/river/pull/1093). -## [0.27.1] - 2025-11-21 - -### Fixed - -- Unique args: Handle embedded fields that are not structs. [PR #1088](https://github.com/riverqueue/river/pull/1088). -- Fix stack overflow when handling `river:"unique"` annotations on recursive types. [PR #1090](https://github.com/riverqueue/river/pull/1090). - ## [0.27.0] - 2025-11-14 ### Added diff --git a/client.go b/client.go index 7113b6e3..35cbe0c2 100644 --- a/client.go +++ b/client.go @@ -907,6 +907,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client { periodicJobEnqueuer, err := maintenance.NewPeriodicJobEnqueuer(archetype, &maintenance.PeriodicJobEnqueuerConfig{ AdvisoryLockPrefix: config.AdvisoryLockPrefix, + HookLookupGlobal: client.hookLookupGlobal, Insert: client.insertMany, Pilot: client.pilot, Schema: config.Schema, @@ -1042,7 +1043,11 @@ func (c *Client[TTx]) Start(ctx context.Context) error { // We use separate contexts for fetching and working to allow for a graceful // stop. Both inherit from the provided context, so if it's cancelled, a // more aggressive stop will be initiated. - workCtx, workCancel := context.WithCancelCause(withClient(ctx, c)) + workCtx, workCancel := context.WithCancelCause(ctx) + + // Client available to executors and to various service hooks. + fetchCtx := withClient(fetchCtx, c) + workCtx = withClient(workCtx, c) if err := startstop.StartAll(fetchCtx, c.services...); err != nil { workCancel(err) diff --git a/client_test.go b/client_test.go index 7cb13266..eebbf160 100644 --- a/client_test.go +++ b/client_test.go @@ -4884,6 +4884,54 @@ func Test_Client_Maintenance(t *testing.T) { requireJobHasState(jobInFuture3.ID, jobInFuture3.State) }) + t.Run("PeriodicJobEnqueuerStartHook", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + + var periodicJobsStartHookCalled bool + config.Hooks = []rivertype.Hook{ + HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error { + periodicJobsStartHookCalled = true + + client := ClientFromContext[pgx.Tx](ctx) + + client.PeriodicJobs().Clear() + + client.PeriodicJobs().Add( + NewPeriodicJob(cron.Every(15*time.Minute), func() (JobArgs, *InsertOpts) { + return periodicJobArgs{}, nil + }, &PeriodicJobOpts{ID: "new_periodic_job", RunOnStart: true}), + ) + + return nil + }), + } + + worker := &periodicJobWorker{} + AddWorker(config.Workers, worker) + config.PeriodicJobs = []*PeriodicJob{ + NewPeriodicJob(cron.Every(15*time.Minute), func() (JobArgs, *InsertOpts) { + return periodicJobArgs{}, nil + }, &PeriodicJobOpts{ID: "old_periodic_job", RunOnStart: true}), + } + + client, _ := setup(t, config) + + startAndWaitForQueueMaintainer(ctx, t, client) + + svc := maintenance.GetService[*maintenance.PeriodicJobEnqueuer](client.queueMaintainer) + svc.TestSignals.InsertedJobs.WaitOrTimeout() + + require.True(t, periodicJobsStartHookCalled) + + // Use the return value of these to determine that the OnStart callback + // went through successfully. (True if the job was removed and false + // otherwise.) + require.False(t, svc.RemoveByID("old_periodic_job")) + require.True(t, svc.RemoveByID("new_periodic_job")) + }) + t.Run("PeriodicJobEnqueuerWithInsertOpts", func(t *testing.T) { t.Parallel() diff --git a/hook_defaults_funcs.go b/hook_defaults_funcs.go index 8f06fc43..93a7f4fe 100644 --- a/hook_defaults_funcs.go +++ b/hook_defaults_funcs.go @@ -23,6 +23,16 @@ func (f HookInsertBeginFunc) InsertBegin(ctx context.Context, params *rivertype. func (f HookInsertBeginFunc) IsHook() bool { return true } +// HookPeriodicJobsStartFunc is a convenience helper for implementing +// rivertype.HookPeriodicJobsStart using a simple function instead of a struct. +type HookPeriodicJobsStartFunc func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error + +func (f HookPeriodicJobsStartFunc) IsHook() bool { return true } + +func (f HookPeriodicJobsStartFunc) Start(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error { + return f(ctx, params) +} + // HookWorkBeginFunc is a convenience helper for implementing // rivertype.HookWorkBegin using a simple function instead of a struct. type HookWorkBeginFunc func(ctx context.Context, job *rivertype.JobRow) error diff --git a/hook_defaults_funcs_test.go b/hook_defaults_funcs_test.go index 2c64bde2..43a20297 100644 --- a/hook_defaults_funcs_test.go +++ b/hook_defaults_funcs_test.go @@ -11,6 +11,9 @@ var ( _ rivertype.Hook = HookInsertBeginFunc(func(ctx context.Context, params *rivertype.JobInsertParams) error { return nil }) _ rivertype.HookInsertBegin = HookInsertBeginFunc(func(ctx context.Context, params *rivertype.JobInsertParams) error { return nil }) + _ rivertype.Hook = HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error { return nil }) + _ rivertype.HookPeriodicJobsStart = HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error { return nil }) + _ rivertype.Hook = HookWorkBeginFunc(func(ctx context.Context, job *rivertype.JobRow) error { return nil }) _ rivertype.HookWorkBegin = HookWorkBeginFunc(func(ctx context.Context, job *rivertype.JobRow) error { return nil }) ) diff --git a/internal/hooklookup/hook_lookup.go b/internal/hooklookup/hook_lookup.go index 44fe6647..420debb2 100644 --- a/internal/hooklookup/hook_lookup.go +++ b/internal/hooklookup/hook_lookup.go @@ -13,9 +13,10 @@ import ( type HookKind string const ( - HookKindInsertBegin HookKind = "insert_begin" - HookKindWorkBegin HookKind = "work_begin" - HookKindWorkEnd HookKind = "work_end" + HookKindInsertBegin HookKind = "insert_begin" + HookKindPeriodicJobsStart HookKind = "periodic_job_start" + HookKindWorkBegin HookKind = "work_begin" + HookKindWorkEnd HookKind = "work_end" ) // @@ -83,6 +84,12 @@ func (c *hookLookup) ByHookKind(kind HookKind) []rivertype.Hook { c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook) } } + case HookKindPeriodicJobsStart: + for _, hook := range c.hooks { + if typedHook, ok := hook.(rivertype.HookPeriodicJobsStart); ok { + c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook) + } + } case HookKindWorkBegin: for _, hook := range c.hooks { if typedHook, ok := hook.(rivertype.HookWorkBegin); ok { diff --git a/internal/maintenance/periodic_job_enqueuer.go b/internal/maintenance/periodic_job_enqueuer.go index 3be394d3..5c56b5d7 100644 --- a/internal/maintenance/periodic_job_enqueuer.go +++ b/internal/maintenance/periodic_job_enqueuer.go @@ -10,6 +10,7 @@ import ( "github.com/tidwall/sjson" + "github.com/riverqueue/river/internal/hooklookup" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" @@ -88,6 +89,8 @@ type InsertFunc func(ctx context.Context, tx riverdriver.ExecutorTx, insertParam type PeriodicJobEnqueuerConfig struct { AdvisoryLockPrefix int32 + HookLookupGlobal hooklookup.HookLookupInterface + // Insert is the function to call to insert jobs into the database. Insert InsertFunc @@ -147,6 +150,11 @@ func NewPeriodicJobEnqueuer(archetype *baseservice.Archetype, config *PeriodicJo nextHandle++ } + hookLookupGlobal := config.HookLookupGlobal + if hookLookupGlobal == nil { + hookLookupGlobal = hooklookup.NewHookLookup([]rivertype.Hook{}) + } + pilot := config.Pilot if pilot == nil { pilot = &riverpilot.StandardPilot{} @@ -155,6 +163,7 @@ func NewPeriodicJobEnqueuer(archetype *baseservice.Archetype, config *PeriodicJo svc := baseservice.Init(archetype, &PeriodicJobEnqueuer{ Config: (&PeriodicJobEnqueuerConfig{ AdvisoryLockPrefix: config.AdvisoryLockPrefix, + HookLookupGlobal: hookLookupGlobal, Insert: config.Insert, PeriodicJobs: config.PeriodicJobs, Pilot: pilot, @@ -309,6 +318,23 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { s.StaggerStart(ctx) + initialPeriodicJobs, err := s.Config.Pilot.PeriodicJobGetAll(ctx, s.exec, &riverpilot.PeriodicJobGetAllParams{ + Schema: s.Config.Schema, + }) + if err != nil { + return err + } + + for _, hook := range s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindPeriodicJobsStart) { + if err := hook.(rivertype.HookPeriodicJobsStart).Start(ctx, &rivertype.HookPeriodicJobsStartParams{ //nolint:forcetypeassert + DurableJobs: sliceutil.Map(initialPeriodicJobs, func(job *riverpilot.PeriodicJob) *rivertype.DurablePeriodicJob { + return (*rivertype.DurablePeriodicJob)(job) + }), + }); err != nil { + return err + } + } + subServices := []startstop.Service{ startstop.StartStopFunc(s.periodicJobKeepAliveAndReapPeriodically), } @@ -340,18 +366,8 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { // Initial set of periodic job IDs mapped to next run at times fetched // from a configured pilot. Not used in most cases. - initialPeriodicJobsMap := func() map[string]time.Time { - initialPeriodicJobs, err := s.Config.Pilot.PeriodicJobGetAll(ctx, s.exec, &riverpilot.PeriodicJobGetAllParams{ - Schema: s.Config.Schema, - }) - if err != nil { - s.Logger.ErrorContext(ctx, s.Name+": Error fetching initial periodic jobs", "error", err) - return make(map[string]time.Time) - } - - return sliceutil.KeyBy(initialPeriodicJobs, - func(j *riverpilot.PeriodicJob) (string, time.Time) { return j.ID, j.NextRunAt }) - }() + initialPeriodicJobsMap := sliceutil.KeyBy(initialPeriodicJobs, + func(j *riverpilot.PeriodicJob) (string, time.Time) { return j.ID, j.NextRunAt }) var lastHandleSeen rivertype.PeriodicJobHandle = -1 // so handle 0 is considered diff --git a/internal/maintenance/periodic_job_enqueuer_test.go b/internal/maintenance/periodic_job_enqueuer_test.go index 76996616..6f76c995 100644 --- a/internal/maintenance/periodic_job_enqueuer_test.go +++ b/internal/maintenance/periodic_job_enqueuer_test.go @@ -2,7 +2,10 @@ package maintenance import ( "context" + "errors" "fmt" + "regexp" + "strconv" "strings" "sync" "testing" @@ -12,6 +15,7 @@ import ( "github.com/tidwall/gjson" "github.com/riverqueue/river/internal/dbunique" + "github.com/riverqueue/river/internal/hooklookup" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/riverdbtest" "github.com/riverqueue/river/riverdriver" @@ -985,6 +989,154 @@ func TestPeriodicJobEnqueuer(t *testing.T) { _, err = svc.AddManySafely(periodicJobs) require.EqualError(t, err, "periodic job with ID already registered: periodic_job_100ms") }) + + t.Run("PeriodicJobsStartHookNoDurableJobs", func(t *testing.T) { + t.Parallel() + + svc, _ := setup(t) + + _, err := svc.AddManySafely([]*PeriodicJob{ + {ID: "periodic_job_10s", ScheduleFunc: periodicIntervalSchedule(10 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_10s", false)}, + {ID: "periodic_job_50s", ScheduleFunc: periodicIntervalSchedule(50 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_50s", false)}, + }) + require.NoError(t, err) + + var periodicJobsStartHookCalled bool + svc.Config.HookLookupGlobal = hooklookup.NewHookLookup([]rivertype.Hook{ + HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error { + periodicJobsStartHookCalled = true + + require.Empty(t, params.DurableJobs) + + require.True(t, svc.RemoveByID("periodic_job_10s")) + require.True(t, svc.RemoveByID("periodic_job_50s")) + + _, err := svc.AddManySafely([]*PeriodicJob{ + { + ID: "periodic_job_10s", + ConstructorFunc: jobConstructorFunc("periodic_job_10s", false), + RunOnStart: true, + ScheduleFunc: periodicIntervalSchedule(10 * time.Second), + }, + { + ID: "periodic_job_50s", + ConstructorFunc: jobConstructorFunc("periodic_job_50s", false), + ScheduleFunc: periodicIntervalSchedule(50 * time.Second), + }, + }) + require.NoError(t, err) + + return nil + }), + }) + + startService(t, svc) + + svc.TestSignals.InsertedJobs.WaitOrTimeout() // this works because `periodic_job_10s` is RunOnStart + + require.True(t, periodicJobsStartHookCalled) + + require.Contains(t, svc.periodicJobIDs, "periodic_job_10s") + require.Contains(t, svc.periodicJobIDs, "periodic_job_50s") + + require.Len(t, svc.periodicJobIDs, 2) + }) + + t.Run("PeriodicJobsStartHookWithDurableJobs", func(t *testing.T) { + t.Parallel() + + svc, bundle := setup(t) + + _, err := svc.AddManySafely([]*PeriodicJob{ + {ID: "periodic_job_10s", ScheduleFunc: periodicIntervalSchedule(100 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_10s", false)}, + {ID: "periodic_job_50s", ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_50s", false)}, + }) + require.NoError(t, err) + + now := time.Now() + + bundle.pilotMock.PeriodicJobGetAllMock = func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobGetAllParams) ([]*riverpilot.PeriodicJob, error) { + require.Equal(t, bundle.schema, params.Schema) + return []*riverpilot.PeriodicJob{ + {ID: "periodic_job_10s", NextRunAt: now.Add(-1 * time.Second)}, // will run immediately so we can wait on a test signal + {ID: "periodic_job_50s", NextRunAt: now.Add(5 * time.Hour)}, + }, nil + } + + var periodicJobsStartHookCalled bool + svc.Config.HookLookupGlobal = hooklookup.NewHookLookup([]rivertype.Hook{ + HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error { + periodicJobsStartHookCalled = true + + svc.Clear() + + durationRE := regexp.MustCompile(`([0-9]+)s\z`) + + for _, durableJob := range params.DurableJobs { + // Match should look like: [[10s 100]] + matches := durationRE.FindAllStringSubmatch(durableJob.ID, 1) + require.Len(t, matches, 1, "Couldn't extract a second duration from: %s", durableJob.ID) + require.Len(t, matches[0], 2) + + msStr := matches[0][1] + + msInt, err := strconv.Atoi(msStr) + require.NoError(t, err) + + _, err = svc.AddSafely(&PeriodicJob{ + ID: durableJob.ID, + ConstructorFunc: jobConstructorFunc(durableJob.ID, false), + ScheduleFunc: periodicIntervalSchedule(time.Duration(msInt) * time.Second), + }) + require.NoError(t, err) + } + + return nil + }), + }) + + startService(t, svc) + + svc.TestSignals.InsertedJobs.WaitOrTimeout() + + require.True(t, periodicJobsStartHookCalled) + + require.Contains(t, svc.periodicJobIDs, "periodic_job_10s") + require.Contains(t, svc.periodicJobIDs, "periodic_job_50s") + + require.Len(t, svc.periodicJobIDs, 2) + + { + periodicJob := svc.periodicJobs[svc.periodicJobIDs["periodic_job_10s"]] + // This durable job's next one was one second in the past so it'll + // be scheduled immediately, despite no RunOnStart attribute. Don't + // have to use WithinDuration because the previous `next_run_at` is + // used to calculate the next one, meaning we know exactly what its + // value will be by comparing to `now`. + require.Equal(t, now.Add(-1*time.Second).Add(10*time.Second), periodicJob.nextRunAt) + } + + { + periodicJob := svc.periodicJobs[svc.periodicJobIDs["periodic_job_50s"]] + // This should be *exactly* the same as the durable job aove because + // its `next_run_at` was in the future so it wasn't rescheduled. + require.Equal(t, now.Add(5*time.Hour), periodicJob.nextRunAt) + } + }) + + t.Run("PeriodicJobsStartHookCancelsStart", func(t *testing.T) { + t.Parallel() + + svc, _ := setup(t) + + svc.Config.HookLookupGlobal = hooklookup.NewHookLookup([]rivertype.Hook{ + HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error { + return errors.New("hook start error") + }), + }) + + require.EqualError(t, svc.Start(ctx), "hook start error") + }) } type PilotPeriodicJobMock struct { @@ -1018,3 +1170,21 @@ func (p *PilotPeriodicJobMock) PeriodicJobKeepAliveAndReap(ctx context.Context, func (p *PilotPeriodicJobMock) PeriodicJobUpsertMany(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobUpsertManyParams) ([]*riverpilot.PeriodicJob, error) { return p.PeriodicJobUpsertManyMock(ctx, exec, params) } + +// +// *Func types are copied from the top level River package because they can't be +// accessed from here. +// + +type HookPeriodicJobsStartFunc func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error + +func (f HookPeriodicJobsStartFunc) IsHook() bool { return true } + +func (f HookPeriodicJobsStartFunc) Start(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error { + return f(ctx, params) +} + +var ( + _ rivertype.Hook = HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error { return nil }) + _ rivertype.HookPeriodicJobsStart = HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error { return nil }) +) diff --git a/periodic_job.go b/periodic_job.go index c14f1349..a6fd8795 100644 --- a/periodic_job.go +++ b/periodic_job.go @@ -4,6 +4,7 @@ import ( "time" "github.com/riverqueue/river/internal/maintenance" + "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivershared/util/sliceutil" "github.com/riverqueue/river/rivertype" ) @@ -108,13 +109,13 @@ func (s *periodicIntervalSchedule) Next(t time.Time) time.Time { // made accessible through Client, where new periodic jobs can be configured, // and old ones removed. type PeriodicJobBundle struct { - clientConfig *Config + mapper *periodicJobInternalMapper periodicJobEnqueuer *maintenance.PeriodicJobEnqueuer } func newPeriodicJobBundle(config *Config, periodicJobEnqueuer *maintenance.PeriodicJobEnqueuer) *PeriodicJobBundle { return &PeriodicJobBundle{ - clientConfig: config, + mapper: &periodicJobInternalMapper{archetype: &periodicJobEnqueuer.Archetype, config: config}, periodicJobEnqueuer: periodicJobEnqueuer, } } @@ -130,7 +131,7 @@ func newPeriodicJobBundle(config *Config, periodicJobEnqueuer *maintenance.Perio // new periodic job is fully enabled or disabled, it should be added or removed // from _every_ active River client across all processes. func (b *PeriodicJobBundle) Add(periodicJob *PeriodicJob) rivertype.PeriodicJobHandle { - handle, err := b.periodicJobEnqueuer.AddSafely(b.toInternal(periodicJob)) + handle, err := b.periodicJobEnqueuer.AddSafely(b.mapper.toInternal(periodicJob)) if err != nil { panic(err) } @@ -140,7 +141,7 @@ func (b *PeriodicJobBundle) Add(periodicJob *PeriodicJob) rivertype.PeriodicJobH // AddSafely is the same as Add, but it returns an error in the case of a // validation problem or duplicate ID instead of panicking. func (b *PeriodicJobBundle) AddSafely(periodicJob *PeriodicJob) (rivertype.PeriodicJobHandle, error) { - return b.periodicJobEnqueuer.AddSafely(b.toInternal(periodicJob)) + return b.periodicJobEnqueuer.AddSafely(b.mapper.toInternal(periodicJob)) } // AddMany adds many new periodic jobs to the client. The jobs are queued @@ -154,7 +155,7 @@ func (b *PeriodicJobBundle) AddSafely(periodicJob *PeriodicJob) (rivertype.Perio // new periodic job is fully enabled or disabled, it should be added or removed // from _every_ active River client across all processes. func (b *PeriodicJobBundle) AddMany(periodicJobs []*PeriodicJob) []rivertype.PeriodicJobHandle { - handles, err := b.periodicJobEnqueuer.AddManySafely(sliceutil.Map(periodicJobs, b.toInternal)) + handles, err := b.periodicJobEnqueuer.AddManySafely(sliceutil.Map(periodicJobs, b.mapper.toInternal)) if err != nil { panic(err) } @@ -164,7 +165,7 @@ func (b *PeriodicJobBundle) AddMany(periodicJobs []*PeriodicJob) []rivertype.Per // AddManySafely is the same as AddMany, but it returns an error in the case of // a validation problem or duplicate ID instead of panicking. func (b *PeriodicJobBundle) AddManySafely(periodicJobs []*PeriodicJob) ([]rivertype.PeriodicJobHandle, error) { - return b.periodicJobEnqueuer.AddManySafely(sliceutil.Map(periodicJobs, b.toInternal)) + return b.periodicJobEnqueuer.AddManySafely(sliceutil.Map(periodicJobs, b.mapper.toInternal)) } // Clear clears all periodic jobs, cancelling all scheduled runs. @@ -234,11 +235,16 @@ func (b *PeriodicJobBundle) RemoveManyByID(ids []string) { // An empty set of periodic job opts used as a default when none are specified. var periodicJobEmptyOpts PeriodicJobOpts //nolint:gochecknoglobals +type periodicJobInternalMapper struct { + archetype *baseservice.Archetype + config *Config +} + // There are two separate periodic job structs so that the top-level River // package can expose one while still containing most periodic job logic in a // subpackage. This function converts a top-level periodic job struct (used for // configuration) to an internal one. -func (b *PeriodicJobBundle) toInternal(periodicJob *PeriodicJob) *maintenance.PeriodicJob { +func (m *periodicJobInternalMapper) toInternal(periodicJob *PeriodicJob) *maintenance.PeriodicJob { opts := &periodicJobEmptyOpts if periodicJob.opts != nil { opts = periodicJob.opts @@ -250,7 +256,7 @@ func (b *PeriodicJobBundle) toInternal(periodicJob *PeriodicJob) *maintenance.Pe if args == nil { return nil, maintenance.ErrNoJobToInsert } - return insertParamsFromConfigArgsAndOptions(&b.periodicJobEnqueuer.Archetype, b.clientConfig, args, options) + return insertParamsFromConfigArgsAndOptions(m.archetype, m.config, args, options) }, RunOnStart: opts.RunOnStart, ScheduleFunc: periodicJob.scheduleFunc.Next, diff --git a/periodic_job_test.go b/periodic_job_test.go index ffe88743..c5902d58 100644 --- a/periodic_job_test.go +++ b/periodic_job_test.go @@ -69,7 +69,7 @@ func TestPeriodicJobBundle(t *testing.T) { nil, ) - internalPeriodicJob := periodicJobBundle.toInternal(periodicJob) + internalPeriodicJob := periodicJobBundle.mapper.toInternal(periodicJob) insertParams1, err := internalPeriodicJob.ConstructorFunc() require.NoError(t, err) @@ -94,7 +94,7 @@ func TestPeriodicJobBundle(t *testing.T) { nil, ) - internalPeriodicJob := periodicJobBundle.toInternal(periodicJob) + internalPeriodicJob := periodicJobBundle.mapper.toInternal(periodicJob) _, err := internalPeriodicJob.ConstructorFunc() require.ErrorIs(t, err, maintenance.ErrNoJobToInsert) diff --git a/rivershared/riverpilot/pilot.go b/rivershared/riverpilot/pilot.go index ea0aff6b..ee298e5c 100644 --- a/rivershared/riverpilot/pilot.go +++ b/rivershared/riverpilot/pilot.go @@ -112,6 +112,8 @@ type PilotPeriodicJob interface { PeriodicJobUpsertMany(ctx context.Context, exec riverdriver.Executor, params *PeriodicJobUpsertManyParams) ([]*PeriodicJob, error) } +// TODO: Get rid of this in favor of rivertype.PeriodicJob the next time we're +// making River <-> River Pro API contract changes. type PeriodicJob struct { ID string CreatedAt time.Time diff --git a/rivertype/river_type.go b/rivertype/river_type.go index bc2b3b76..13d07f63 100644 --- a/rivertype/river_type.go +++ b/rivertype/river_type.go @@ -323,6 +323,34 @@ type HookInsertBegin interface { InsertBegin(ctx context.Context, params *JobInsertParams) error } +// HookPeriodicJobsStart is an interface to a hook that runs when the periodic +// job enqueuer starts on a newly elected leader. +type HookPeriodicJobsStart interface { + Hook + + // Start is invoked when the periodic job enqueuer starts on a newly elected + // leader. + // + // Returning an error will cancel the periodic job enqueuer's start up + // routine. Be very careful with this because if the error is chronic, it + // will prevent any client from successfully starting as leader, thereby + // effectively disabling all maintenance services. + Start(ctx context.Context, params *HookPeriodicJobsStartParams) error +} + +// HookPeriodicJobsStartParams are parameters for HookPeriodicJobsStart. +type HookPeriodicJobsStartParams struct { + // DurableJobs contains a list of durable periodic job records that + // were found in the database. This includes durable jobs that have been + // recently active in an elected periodic job enqueuer, but may also contain + // jobs that've been previously removed, but for which their database record + // has not yet been reaped. + // + // This property will be empty unless durable jobs (a pro feature) are + // enabled. + DurableJobs []*DurablePeriodicJob +} + // HookWorkBegin is an interface to a hook that runs after a job has been locked // for work and before it's worked. type HookWorkBegin interface { @@ -452,6 +480,21 @@ type WorkerMiddleware interface { Work(ctx context.Context, job *JobRow, doInner func(context.Context) error) error } +// DurablePeriodicJob represents a durable periodic job. +type DurablePeriodicJob struct { + // ID is a unique identifier for the durable periodic job. + ID string + + // CreatedAt is when the database record was created. + CreatedAt time.Time + + // NextRunAt is when the periodic job is next scheduled to run. + NextRunAt time.Time + + // UpdatedAt is when the database record was last updated. + UpdatedAt time.Time +} + // PeriodicJobHandle is a reference to a dynamically added periodic job // (returned by the use of `Client.PeriodicJobs().Add()`) which can be used to // subsequently remove the periodic job with `Remove()`.