diff --git a/CHANGELOG.md b/CHANGELOG.md index 69342d69..913ee9f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,11 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -⚠️ Version 0.19.0 has minor breaking changes for the `Worker.Middleware`, introduced fairly recently in 0.17.0. We tried not to make this change, but found the existing middleware interface insufficient to provide the necessary range of functionality we wanted, and this is a secondary middleware facility that won't be in use for many users, so it seemed worthwhile. +⚠️ Version 0.19.0 has minor breaking changes for `Worker.Middleware`, introduced fairly recently in 0.17.0: a worker's `Middleware` function now takes a non-generic `JobRow` parameter instead of a generic `Job[T]`. We tried not to make this change, but found the existing middleware interface insufficient to provide the necessary range of functionality we wanted, and this is a secondary middleware facility that won't be in use for many users, so it seemed worthwhile. ### Added - Added a new "hooks" API for tying into River functionality at various points like job inserts or working. Differs from middleware in that it doesn't go on the stack and can't modify context, but in some cases is able to run at a more granular level (e.g. for each job insert rather than each _batch_ of inserts). [PR #789](https://github.com/riverqueue/river/pull/789). +- `river.Config` has a generic `Middleware` setting that can be used as a convenient way to configure middlewares that implement multiple middleware interfaces (e.g. `JobInsertMiddleware` _and_ `WorkerMiddleware`). Use of this setting is preferred over `Config.JobInsertMiddleware` and `Config.WorkerMiddleware`, which have been deprecated. [PR #804](https://github.com/riverqueue/river/pull/804). 
### Changed diff --git a/client.go b/client.go index 2e669c95..3e2389a3 100644 --- a/client.go +++ b/client.go @@ -18,6 +18,7 @@ import ( "github.com/riverqueue/river/internal/jobcompleter" "github.com/riverqueue/river/internal/leadership" "github.com/riverqueue/river/internal/maintenance" + "github.com/riverqueue/river/internal/middlewarelookup" "github.com/riverqueue/river/internal/notifier" "github.com/riverqueue/river/internal/notifylimiter" "github.com/riverqueue/river/internal/rivercommon" @@ -157,6 +158,9 @@ type Config struct { // JobInsertMiddleware are optional functions that can be called around job // insertion. + // + // Deprecated: Prefer the use of Middleware instead (which may contain + // instances of rivertype.JobInsertMiddleware). JobInsertMiddleware []rivertype.JobInsertMiddleware // JobTimeout is the maximum amount of time a job is allowed to run before its @@ -168,8 +172,24 @@ type Config struct { JobTimeout time.Duration // Hooks are functions that may activate at certain points during a job's - // lifecycle (see rivertype.Hook), installed globally. Jobs may have their - // own specific hooks by implementing the JobArgsWithHooks interface. + // lifecycle (see rivertype.Hook), installed globally. + // + // The effect of hooks in this list will depend on the specific hook + // interfaces they implement, so for example implementing + // rivertype.HookInsertBegin will cause the hook to be invoked before a job + // is inserted, or implementing rivertype.HookWorkBegin will cause it to be + // invoked before a job is worked. Hook structs may implement multiple hook + // interfaces. + // + // Order in this list is significant. A hook that appears first will be + // entered before a hook that appears later. For any particular phase, order + // is relevant only for hooks that will run for that phase. 
For example, if + // two rivertype.HookInsertBegin are separated by a rivertype.HookWorkBegin, + // during job insertion those two outer hooks will run one after another, + // and the work hook between them will not run. When a job is worked, the + // work hook runs and the insertion hooks on either side of it are skipped. + // + // Jobs may have their own specific hooks by implementing JobArgsWithHooks. Hooks []rivertype.Hook // Logger is the structured logger to use for logging purposes. If none is @@ -185,6 +205,26 @@ type Config struct { // If not specified, defaults to 25 (MaxAttemptsDefault). MaxAttempts int + // Middleware contains middleware that may activate at certain points during + // a job's lifecycle (see rivertype.Middleware), installed globally. + // + // The effect of middleware in this list will depend on the specific + // middleware interfaces they implement, so for example implementing + // rivertype.JobInsertMiddleware will cause the middleware to be invoked + // when jobs are inserted, and implementing rivertype.WorkerMiddleware will + // cause it to be invoked when a job is worked. Middleware structs may + // implement multiple middleware interfaces. + // + // Order in this list is significant. Middleware that appears first will be + // entered before middleware that appears later. For any particular phase, + // order is relevant only for middlewares that will run for that phase. For + // example, if two rivertype.JobInsertMiddleware are separated by a + // rivertype.WorkerMiddleware, during job insertion those two outer + // middlewares will run one after another, and the work middleware between + // them will not run. When a job is worked, the work middleware runs and the + // insertion middlewares on either side of it are skipped. + Middleware []rivertype.Middleware + // PeriodicJobs are a set of periodic jobs to run at the specified intervals // in the client. 
PeriodicJobs []*PeriodicJob @@ -276,6 +316,9 @@ type Config struct { // WorkerMiddleware are optional functions that can be called around // all job executions. + // + // Deprecated: Prefer the use of Middleware instead (which may contain + // instances of rivertype.WorkerMiddleware). WorkerMiddleware []rivertype.WorkerMiddleware // Scheduler run interval. Shared between the scheduler and producer/job @@ -325,6 +368,7 @@ func (c *Config) WithDefaults() *Config { JobTimeout: valutil.ValOrDefault(c.JobTimeout, JobTimeoutDefault), Logger: logger, MaxAttempts: valutil.ValOrDefault(c.MaxAttempts, MaxAttemptsDefault), + Middleware: c.Middleware, PeriodicJobs: c.PeriodicJobs, PollOnly: c.PollOnly, Queues: c.Queues, @@ -334,8 +378,8 @@ func (c *Config) WithDefaults() *Config { SkipUnknownJobCheck: c.SkipUnknownJobCheck, Test: c.Test, TestOnly: c.TestOnly, - Workers: c.Workers, WorkerMiddleware: c.WorkerMiddleware, + Workers: c.Workers, schedulerInterval: valutil.ValOrDefault(c.schedulerInterval, maintenance.JobSchedulerIntervalDefault), } } @@ -368,6 +412,9 @@ func (c *Config) validate() error { if c.MaxAttempts < 0 { return errors.New("MaxAttempts cannot be less than zero") } + if len(c.Middleware) > 0 && (len(c.JobInsertMiddleware) > 0 || len(c.WorkerMiddleware) > 0) { + return errors.New("only one of the pair JobInsertMiddleware/WorkerMiddleware or Middleware may be provided (Middleware is recommended, and may contain both job insert and worker middleware)") + } if c.RescueStuckJobsAfter < 0 { return errors.New("RescueStuckJobsAfter cannot be less than zero") } @@ -430,23 +477,24 @@ type Client[TTx any] struct { baseService baseservice.BaseService baseStartStop startstop.BaseStartStop - completer jobcompleter.JobCompleter - config *Config - driver riverdriver.Driver[TTx] - elector *leadership.Elector - hookLookupByJob *hooklookup.JobHookLookup - hookLookupGlobal hooklookup.HookLookupInterface - insertNotifyLimiter *notifylimiter.Limiter - notifier *notifier.Notifier // 
may be nil in poll-only mode - periodicJobs *PeriodicJobBundle - pilot riverpilot.Pilot - producersByQueueName map[string]*producer - queueMaintainer *maintenance.QueueMaintainer - queues *QueueBundle - services []startstop.Service - stopped <-chan struct{} - subscriptionManager *subscriptionManager - testSignals clientTestSignals + completer jobcompleter.JobCompleter + config *Config + driver riverdriver.Driver[TTx] + elector *leadership.Elector + hookLookupByJob *hooklookup.JobHookLookup + hookLookupGlobal hooklookup.HookLookupInterface + insertNotifyLimiter *notifylimiter.Limiter + middlewareLookupGlobal middlewarelookup.MiddlewareLookupInterface + notifier *notifier.Notifier // may be nil in poll-only mode + periodicJobs *PeriodicJobBundle + pilot riverpilot.Pilot + producersByQueueName map[string]*producer + queueMaintainer *maintenance.QueueMaintainer + queues *QueueBundle + services []startstop.Service + stopped <-chan struct{} + subscriptionManager *subscriptionManager + testSignals clientTestSignals // workCancel cancels the context used for all work goroutines. Normal Stop // does not cancel that context. @@ -564,6 +612,33 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client client.baseService.Name = "Client" // Have to correct the name because base service isn't embedded like it usually is client.insertNotifyLimiter = notifylimiter.NewLimiter(archetype, config.FetchCooldown) + // Validation ensures that config.JobInsertMiddleware/WorkerMiddleware or + // the more abstract config.Middleware for middleware are set, but not both, + // so in practice we never append all three of these to each other. 
+ { + middleware := config.Middleware + for _, jobInsertMiddleware := range config.JobInsertMiddleware { + middleware = append(middleware, jobInsertMiddleware) + } + outerLoop: + for _, workerMiddleware := range config.WorkerMiddleware { + // Don't add the middleware if it also implements JobInsertMiddleware + // and the instance has been added to config.JobInsertMiddleware. This + // is a hedge to make sure we don't accidentally double add middleware + // as we've converted over to the unified config.Middleware setting. + if workerMiddlewareAsJobInsertMiddleware, ok := workerMiddleware.(rivertype.JobInsertMiddleware); ok { + for _, jobInsertMiddleware := range config.JobInsertMiddleware { + if workerMiddlewareAsJobInsertMiddleware == jobInsertMiddleware { + continue outerLoop + } + } + } + + middleware = append(middleware, workerMiddleware) + } + client.middlewareLookupGlobal = middlewarelookup.NewMiddlewareLookup(middleware) + } + pluginDriver, _ := driver.(driverPlugin[TTx]) if pluginDriver != nil { pluginDriver.PluginInit(archetype) @@ -1559,12 +1634,13 @@ func (c *Client[TTx]) insertManyShared( return results, nil } - if len(c.config.JobInsertMiddleware) > 0 { + jobInsertMiddleware := c.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert) + if len(jobInsertMiddleware) > 0 { // Wrap middlewares in reverse order so the one defined first is wrapped // as the outermost function and is first to receive the operation. 
- for i := len(c.config.JobInsertMiddleware) - 1; i >= 0; i-- { - middlewareItem := c.config.JobInsertMiddleware[i] // capture the current middleware item - previousDoInner := doInner // Capture the current doInner function + for i := len(jobInsertMiddleware) - 1; i >= 0; i-- { + middlewareItem := jobInsertMiddleware[i].(rivertype.JobInsertMiddleware) //nolint:forcetypeassert // capture the current middleware item + previousDoInner := doInner // Capture the current doInner function doInner = func(ctx context.Context) ([]*rivertype.JobInsertResult, error) { return middlewareItem.InsertMany(ctx, insertParams, previousDoInner) } @@ -1778,22 +1854,22 @@ func (c *Client[TTx]) validateJobArgs(args JobArgs) error { func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) *producer { producer := newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{ - ClientID: c.config.ID, - Completer: c.completer, - ErrorHandler: c.config.ErrorHandler, - FetchCooldown: c.config.FetchCooldown, - FetchPollInterval: c.config.FetchPollInterval, - HookLookupByJob: c.hookLookupByJob, - HookLookupGlobal: c.hookLookupGlobal, - JobTimeout: c.config.JobTimeout, - MaxWorkers: queueConfig.MaxWorkers, - Notifier: c.notifier, - Queue: queueName, - QueueEventCallback: c.subscriptionManager.distributeQueueEvent, - RetryPolicy: c.config.RetryPolicy, - SchedulerInterval: c.config.schedulerInterval, - Workers: c.config.Workers, - WorkerMiddleware: c.config.WorkerMiddleware, + ClientID: c.config.ID, + Completer: c.completer, + ErrorHandler: c.config.ErrorHandler, + FetchCooldown: c.config.FetchCooldown, + FetchPollInterval: c.config.FetchPollInterval, + HookLookupByJob: c.hookLookupByJob, + HookLookupGlobal: c.hookLookupGlobal, + JobTimeout: c.config.JobTimeout, + MaxWorkers: queueConfig.MaxWorkers, + MiddlewareLookupGlobal: c.middlewareLookupGlobal, + Notifier: c.notifier, + Queue: queueName, + QueueEventCallback: c.subscriptionManager.distributeQueueEvent, + 
RetryPolicy: c.config.RetryPolicy, + SchedulerInterval: c.config.schedulerInterval, + Workers: c.config.Workers, }) c.producersByQueueName[queueName] = producer return producer diff --git a/client_test.go b/client_test.go index 256c0d12..c0f004e0 100644 --- a/client_test.go +++ b/client_test.go @@ -26,6 +26,7 @@ import ( "github.com/riverqueue/river/internal/dbunique" "github.com/riverqueue/river/internal/jobexecutor" "github.com/riverqueue/river/internal/maintenance" + "github.com/riverqueue/river/internal/middlewarelookup" "github.com/riverqueue/river/internal/notifier" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/riverinternaltest" @@ -736,7 +737,7 @@ func Test_Client(t *testing.T) { return doInner(ctx) }, } - bundle.config.WorkerMiddleware = []rivertype.WorkerMiddleware{middleware} + bundle.config.Middleware = []rivertype.Middleware{middleware} AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[callbackArgs]) error { require.Equal(t, "called", ctx.Value(privateKey("middleware"))) @@ -5464,6 +5465,55 @@ func Test_NewClient_Validations(t *testing.T) { require.Equal(t, MaxAttemptsDefault, client.config.MaxAttempts) }, }, + { + name: "Middleware can be configured independently", + configFunc: func(config *Config) { + config.Middleware = []rivertype.Middleware{&overridableJobMiddleware{}} + }, + validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper + require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert), 1) + require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1) + }, + }, + { + name: "JobInsertMiddleware and WorkerMiddleware may be configured together with separate middlewares", + configFunc: func(config *Config) { + config.JobInsertMiddleware = []rivertype.JobInsertMiddleware{&overridableJobMiddleware{}} + config.WorkerMiddleware = 
[]rivertype.WorkerMiddleware{&overridableJobMiddleware{}} + }, + validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper + require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert), 2) + require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 2) + }, + }, + { + name: "JobInsertMiddleware and WorkerMiddleware may be configured together with same middleware", + configFunc: func(config *Config) { + middleware := &overridableJobMiddleware{} + config.JobInsertMiddleware = []rivertype.JobInsertMiddleware{middleware} + config.WorkerMiddleware = []rivertype.WorkerMiddleware{middleware} + }, + validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper + require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert), 1) + require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1) + }, + }, + { + name: "Middleware not allowed with JobInsertMiddleware", + configFunc: func(config *Config) { + config.JobInsertMiddleware = []rivertype.JobInsertMiddleware{&overridableJobMiddleware{}} + config.Middleware = []rivertype.Middleware{&overridableJobMiddleware{}} + }, + wantErr: errors.New("only one of the pair JobInsertMiddleware/WorkerMiddleware or Middleware may be provided (Middleware is recommended, and may contain both job insert and worker middleware)"), + }, + { + name: "Middleware not allowed with WorkerMiddleware", + configFunc: func(config *Config) { + config.Middleware = []rivertype.Middleware{&overridableJobMiddleware{}} + config.WorkerMiddleware = []rivertype.WorkerMiddleware{&overridableJobMiddleware{}} + }, + wantErr: errors.New("only one of the pair JobInsertMiddleware/WorkerMiddleware or Middleware may be provided (Middleware is recommended, and may contain both job insert and worker middleware)"), + }, { name: "RescueStuckJobsAfter may be overridden", 
configFunc: func(config *Config) { diff --git a/common_test.go b/common_test.go index 30b374f6..eeb7fbc8 100644 --- a/common_test.go +++ b/common_test.go @@ -1,6 +1,7 @@ package river_test import ( + "context" "fmt" "time" @@ -13,6 +14,19 @@ import ( // helpers aren't included in Godoc and keep each example more succinct. // +type NoOpArgs struct{} + +func (NoOpArgs) Kind() string { return "no_op" } + +type NoOpWorker struct { + river.WorkerDefaults[NoOpArgs] +} + +func (w *NoOpWorker) Work(ctx context.Context, job *river.Job[NoOpArgs]) error { + fmt.Printf("NoOpWorker.Work ran\n") + return nil +} + // Wait on the given subscription channel for numJobs. Times out with a panic if // jobs take too long to be received. func waitForNJobs(subscribeChan <-chan *river.Event, numJobs int) { diff --git a/example_global_hooks_test.go b/example_global_hooks_test.go new file mode 100644 index 00000000..980ba3f0 --- /dev/null +++ b/example_global_hooks_test.go @@ -0,0 +1,115 @@ +package river_test + +import ( + "context" + "fmt" + "log/slog" + + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/internal/riverinternaltest" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/util/slogutil" + "github.com/riverqueue/river/rivertype" +) + +type BothInsertAndWorkBeginHook struct{ river.HookDefaults } + +func (BothInsertAndWorkBeginHook) InsertBegin(ctx context.Context, params *rivertype.JobInsertParams) error { + fmt.Printf("BothInsertAndWorkBeginHook.InsertBegin ran\n") + return nil +} + +func (BothInsertAndWorkBeginHook) WorkBegin(ctx context.Context, job *rivertype.JobRow) error { + fmt.Printf("BothInsertAndWorkBeginHook.WorkBegin ran\n") + return nil +} + +type InsertBeginHook struct{ river.HookDefaults } + +func (InsertBeginHook) InsertBegin(ctx context.Context, params *rivertype.JobInsertParams) error { + fmt.Printf("InsertBeginHook.InsertBegin ran\n") + return nil +} + +type 
WorkBeginHook struct{ river.HookDefaults } + +func (WorkBeginHook) WorkBegin(ctx context.Context, job *rivertype.JobRow) error { + fmt.Printf("WorkBeginHook.WorkBegin ran\n") + return nil +} + +// Verify interface compliance. It's recommended that these are included in your +// test suite to make sure that your hooks implement the specific hook +// interfaces that you expect them to. +var ( + _ rivertype.HookInsertBegin = &BothInsertAndWorkBeginHook{} + _ rivertype.HookWorkBegin = &BothInsertAndWorkBeginHook{} + _ rivertype.HookInsertBegin = &InsertBeginHook{} + _ rivertype.HookWorkBegin = &WorkBeginHook{} +) + +// Example_globalHooks demonstrates the use of hooks, installed globally on a +// River client, to modify River behavior. +func Example_globalHooks() { + ctx := context.Background() + + dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_test_example")) + if err != nil { + panic(err) + } + defer dbPool.Close() + + // Required for the purpose of this test, but not necessary in real usage. + if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { + panic(err) + } + + workers := river.NewWorkers() + river.AddWorker(workers, &NoOpWorker{}) + + riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + // Order is significant. See output below. + Hooks: []rivertype.Hook{ + &BothInsertAndWorkBeginHook{}, + &InsertBeginHook{}, + &WorkBeginHook{}, + }, + Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), + Queues: map[string]river.QueueConfig{ + river.QueueDefault: {MaxWorkers: 100}, + }, + TestOnly: true, // suitable only for use in tests; remove for live environments + Workers: workers, + }) + if err != nil { + panic(err) + } + + // Out of example scope, but used to wait until a job is worked. 
+ subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted) + defer subscribeCancel() + + if err := riverClient.Start(ctx); err != nil { + panic(err) + } + + _, err = riverClient.Insert(ctx, NoOpArgs{}, nil) + if err != nil { + panic(err) + } + + waitForNJobs(subscribeChan, 1) + + if err := riverClient.Stop(ctx); err != nil { + panic(err) + } + + // Output: + // BothInsertAndWorkBeginHook.InsertBegin ran + // InsertBeginHook.InsertBegin ran + // BothInsertAndWorkBeginHook.WorkBegin ran + // WorkBeginHook.WorkBegin ran + // NoOpWorker.Work ran +} diff --git a/example_global_middleware_test.go b/example_global_middleware_test.go new file mode 100644 index 00000000..7b3d14fb --- /dev/null +++ b/example_global_middleware_test.go @@ -0,0 +1,115 @@ +package river_test + +import ( + "context" + "fmt" + "log/slog" + + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/internal/riverinternaltest" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/util/slogutil" + "github.com/riverqueue/river/rivertype" +) + +type JobBothInsertAndWorkMiddleware struct{ river.MiddlewareDefaults } + +func (JobBothInsertAndWorkMiddleware) InsertMany(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(ctx context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error) { + fmt.Printf("JobBothInsertAndWorkMiddleware.InsertMany ran\n") + return doInner(ctx) +} + +func (JobBothInsertAndWorkMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error { + fmt.Printf("JobBothInsertAndWorkMiddleware.Work ran\n") + return doInner(ctx) +} + +type JobInsertMiddleware struct{ river.MiddlewareDefaults } + +func (JobInsertMiddleware) InsertMany(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(ctx context.Context) ([]*rivertype.JobInsertResult, error)) 
([]*rivertype.JobInsertResult, error) { + fmt.Printf("JobInsertMiddleware.InsertMany ran\n") + return doInner(ctx) +} + +type WorkerMiddleware struct{ river.MiddlewareDefaults } + +func (WorkerMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error { + fmt.Printf("WorkerMiddleware.Work ran\n") + return doInner(ctx) +} + +// Verify interface compliance. It's recommended that these are included in your +// test suite to make sure that your middlewares implement the specific +// middleware interfaces that you expect them to. +var ( + _ rivertype.JobInsertMiddleware = &JobBothInsertAndWorkMiddleware{} + _ rivertype.WorkerMiddleware = &JobBothInsertAndWorkMiddleware{} + _ rivertype.JobInsertMiddleware = &JobInsertMiddleware{} + _ rivertype.WorkerMiddleware = &WorkerMiddleware{} +) + +// Example_globalMiddleware demonstrates the use of middleware, installed +// globally on a River client, to modify River behavior. +func Example_globalMiddleware() { + ctx := context.Background() + + dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_test_example")) + if err != nil { + panic(err) + } + defer dbPool.Close() + + // Required for the purpose of this test, but not necessary in real usage. + if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { + panic(err) + } + + workers := river.NewWorkers() + river.AddWorker(workers, &NoOpWorker{}) + + riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + // Order is significant. See output below. 
+ Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), + Middleware: []rivertype.Middleware{ + &JobBothInsertAndWorkMiddleware{}, + &JobInsertMiddleware{}, + &WorkerMiddleware{}, + }, + Queues: map[string]river.QueueConfig{ + river.QueueDefault: {MaxWorkers: 100}, + }, + TestOnly: true, // suitable only for use in tests; remove for live environments + Workers: workers, + }) + if err != nil { + panic(err) + } + + // Out of example scope, but used to wait until a job is worked. + subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted) + defer subscribeCancel() + + if err := riverClient.Start(ctx); err != nil { + panic(err) + } + + _, err = riverClient.Insert(ctx, NoOpArgs{}, nil) + if err != nil { + panic(err) + } + + waitForNJobs(subscribeChan, 1) + + if err := riverClient.Stop(ctx); err != nil { + panic(err) + } + + // Output: + // JobBothInsertAndWorkMiddleware.InsertMany ran + // JobInsertMiddleware.InsertMany ran + // JobBothInsertAndWorkMiddleware.Work ran + // WorkerMiddleware.Work ran + // NoOpWorker.Work ran +} diff --git a/example_job_args_hooks_test.go b/example_job_args_hooks_test.go new file mode 100644 index 00000000..8ff0277a --- /dev/null +++ b/example_job_args_hooks_test.go @@ -0,0 +1,133 @@ +package river_test + +import ( + "context" + "fmt" + "log/slog" + + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/internal/riverinternaltest" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/util/slogutil" + "github.com/riverqueue/river/rivertype" +) + +type JobWithHooksArgs struct{} + +func (JobWithHooksArgs) Kind() string { return "job_with_hooks" } + +// Warning: Hooks is only called once per job insert or work and its return +// value is memoized. It should not vary based on the contents of any particular +// args because changes will be ignored. 
+func (JobWithHooksArgs) Hooks() []rivertype.Hook { + // Order is significant. See output below. + return []rivertype.Hook{ + &JobWithHooksBothInsertAndWorkBeginHook{}, + &JobWithHooksInsertBeginHook{}, + &JobWithHooksWorkBeginHook{}, + } +} + +type JobWithHooksWorker struct { + river.WorkerDefaults[JobWithHooksArgs] +} + +func (w *JobWithHooksWorker) Work(ctx context.Context, job *river.Job[JobWithHooksArgs]) error { + fmt.Printf("JobWithHooksWorker.Work ran\n") + return nil +} + +type JobWithHooksBothInsertAndWorkBeginHook struct{ river.HookDefaults } + +func (JobWithHooksBothInsertAndWorkBeginHook) InsertBegin(ctx context.Context, params *rivertype.JobInsertParams) error { + fmt.Printf("JobWithHooksInsertAndWorkBeginHook.InsertBegin ran\n") + return nil +} + +func (JobWithHooksBothInsertAndWorkBeginHook) WorkBegin(ctx context.Context, job *rivertype.JobRow) error { + fmt.Printf("JobWithHooksInsertAndWorkBeginHook.WorkBegin ran\n") + return nil +} + +type JobWithHooksInsertBeginHook struct{ river.HookDefaults } + +func (JobWithHooksInsertBeginHook) InsertBegin(ctx context.Context, params *rivertype.JobInsertParams) error { + fmt.Printf("JobWithHooksInsertBeginHook.InsertBegin ran\n") + return nil +} + +type JobWithHooksWorkBeginHook struct{ river.HookDefaults } + +func (JobWithHooksWorkBeginHook) WorkBegin(ctx context.Context, job *rivertype.JobRow) error { + fmt.Printf("JobWithHooksWorkBeginHook.WorkBegin ran\n") + return nil +} + +// Verify interface compliance. It's recommended that these are included in your +// test suite to make sure that your hooks implement the specific hook +// interfaces that you expect them to. 
+var ( + _ rivertype.HookInsertBegin = &JobWithHooksBothInsertAndWorkBeginHook{} + _ rivertype.HookWorkBegin = &JobWithHooksBothInsertAndWorkBeginHook{} + _ rivertype.HookInsertBegin = &JobWithHooksInsertBeginHook{} + _ rivertype.HookWorkBegin = &JobWithHooksWorkBeginHook{} +) + +// Example_jobArgsHooks demonstrates the use of hooks to modify River behavior. 
+func Example_jobArgsHooks() { + ctx := context.Background() + + dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_test_example")) + if err != nil { + panic(err) + } + defer dbPool.Close() + + // Required for the purpose of this test, but not necessary in real usage. + if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { + panic(err) + } + + workers := river.NewWorkers() + river.AddWorker(workers, &JobWithHooksWorker{}) + + riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), + Queues: map[string]river.QueueConfig{ + river.QueueDefault: {MaxWorkers: 100}, + }, + TestOnly: true, // suitable only for use in tests; remove for live environments + Workers: workers, + }) + if err != nil { + panic(err) + } + + // Out of example scope, but used to wait until a job is worked. + subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted) + defer subscribeCancel() + + if err := riverClient.Start(ctx); err != nil { + panic(err) + } + + _, err = riverClient.Insert(ctx, JobWithHooksArgs{}, nil) + if err != nil { + panic(err) + } + + waitForNJobs(subscribeChan, 1) + + if err := riverClient.Stop(ctx); err != nil { + panic(err) + } + + // Output: + // JobWithHooksInsertAndWorkBeginHook.InsertBegin ran + // JobWithHooksInsertBeginHook.InsertBegin ran + // JobWithHooksInsertAndWorkBeginHook.WorkBegin ran + // JobWithHooksWorkBeginHook.WorkBegin ran + // JobWithHooksWorker.Work ran +} diff --git a/internal/execution/execution.go b/internal/execution/execution.go index 7c8e90d8..63608cb1 100644 --- a/internal/execution/execution.go +++ b/internal/execution/execution.go @@ -28,24 +28,25 @@ func MaybeApplyTimeout(ctx context.Context, timeout time.Duration) (context.Cont // MiddlewareChain chains together the given middleware functions, returning a // single function that applies them all in reverse order. 
-func MiddlewareChain(global, worker []rivertype.WorkerMiddleware, doInner Func, jobRow *rivertype.JobRow) Func { +func MiddlewareChain(globalMiddleware []rivertype.Middleware, workerMiddleware []rivertype.WorkerMiddleware, doInner Func, jobRow *rivertype.JobRow) Func { // Quick return for no middleware, which will often be the case. - if len(global) < 1 && len(worker) < 1 { + if len(globalMiddleware) < 1 && len(workerMiddleware) < 1 { return doInner } - // Write this so as to avoid a new slice allocation in cases where there is - // no worker specific middleware (which will be the common case). - allMiddleware := global - if len(worker) > 0 { - allMiddleware = append(allMiddleware, worker...) - } - // Wrap middlewares in reverse order so the one defined first is wrapped // as the outermost function and is first to receive the operation. - for i := len(allMiddleware) - 1; i >= 0; i-- { - middlewareItem := allMiddleware[i] // capture the current middleware item - previousDoInner := doInner // capture the current doInner function + for i := len(globalMiddleware) - 1; i >= 0; i-- { + middlewareItem := globalMiddleware[i].(rivertype.WorkerMiddleware) //nolint:forcetypeassert // capture the current middleware item + previousDoInner := doInner // capture the current doInner function + doInner = func(ctx context.Context) error { + return middlewareItem.Work(ctx, jobRow, previousDoInner) + } + } + + for i := len(workerMiddleware) - 1; i >= 0; i-- { + middlewareItem := workerMiddleware[i] // capture the current middleware item + previousDoInner := doInner // capture the current doInner function doInner = func(ctx context.Context) error { return middlewareItem.Work(ctx, jobRow, previousDoInner) } diff --git a/internal/jobexecutor/job_executor.go b/internal/jobexecutor/job_executor.go index a5c3b62c..290babbf 100644 --- a/internal/jobexecutor/job_executor.go +++ b/internal/jobexecutor/job_executor.go @@ -16,6 +16,7 @@ import ( "github.com/riverqueue/river/internal/hooklookup" 
"github.com/riverqueue/river/internal/jobcompleter" "github.com/riverqueue/river/internal/jobstats" + "github.com/riverqueue/river/internal/middlewarelookup" "github.com/riverqueue/river/internal/workunit" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" @@ -113,7 +114,7 @@ type JobExecutor struct { HookLookupGlobal hooklookup.HookLookupInterface InformProducerDoneFunc func(jobRow *rivertype.JobRow) JobRow *rivertype.JobRow - WorkerMiddleware []rivertype.WorkerMiddleware + MiddlewareLookupGlobal middlewarelookup.MiddlewareLookupInterface SchedulerInterval time.Duration WorkUnit workunit.WorkUnit @@ -216,7 +217,12 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) { return e.WorkUnit.Work(ctx) }) - executeFunc := execution.MiddlewareChain(e.WorkerMiddleware, e.WorkUnit.Middleware(), doInner, e.JobRow) + executeFunc := execution.MiddlewareChain( + e.MiddlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), + e.WorkUnit.Middleware(), + doInner, + e.JobRow, + ) return &jobExecutorResult{Err: executeFunc(ctx), MetadataUpdates: metadataUpdates} } diff --git a/internal/jobexecutor/job_executor_test.go b/internal/jobexecutor/job_executor_test.go index 788e5364..694986d5 100644 --- a/internal/jobexecutor/job_executor_test.go +++ b/internal/jobexecutor/job_executor_test.go @@ -11,6 +11,7 @@ import ( "github.com/riverqueue/river/internal/hooklookup" "github.com/riverqueue/river/internal/jobcompleter" + "github.com/riverqueue/river/internal/middlewarelookup" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/riverinternaltest/retrypolicytest" @@ -185,6 +186,7 @@ func TestJobExecutor_Execute(t *testing.T) { HookLookupGlobal: hooklookup.NewHookLookup(nil), InformProducerDoneFunc: func(job *rivertype.JobRow) {}, JobRow: bundle.jobRow, + MiddlewareLookupGlobal: 
middlewarelookup.NewMiddlewareLookup(nil), SchedulerInterval: riverinternaltest.SchedulerShortInterval, WorkUnit: workUnitFactory.MakeUnit(bundle.jobRow), }) @@ -582,13 +584,13 @@ func TestJobExecutor_Execute(t *testing.T) { executor, bundle := setup(t) // Add a middleware so we can verify it's in the trace too: - executor.WorkerMiddleware = []rivertype.WorkerMiddleware{ + executor.MiddlewareLookupGlobal = middlewarelookup.NewMiddlewareLookup([]rivertype.Middleware{ &testMiddleware{ work: func(ctx context.Context, job *rivertype.JobRow, next func(context.Context) error) error { return next(ctx) }, }, - } + }) executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { panic("panic val") @@ -737,6 +739,8 @@ type testMiddleware struct { work func(ctx context.Context, job *rivertype.JobRow, next func(context.Context) error) error } +func (m *testMiddleware) IsMiddleware() bool { return true } + func (m *testMiddleware) Work(ctx context.Context, job *rivertype.JobRow, next func(context.Context) error) error { return m.work(ctx, job, next) } diff --git a/internal/middlewarelookup/middleware_lookup.go b/internal/middlewarelookup/middleware_lookup.go new file mode 100644 index 00000000..4b45265f --- /dev/null +++ b/internal/middlewarelookup/middleware_lookup.go @@ -0,0 +1,108 @@ +package middlewarelookup + +import ( + "sync" + + "github.com/riverqueue/river/rivertype" +) + +// +// MiddlewareKind +// + +type MiddlewareKind string + +const ( + MiddlewareKindJobInsert MiddlewareKind = "job_insert" + MiddlewareKindWorker MiddlewareKind = "worker" +) + +// +// MiddlewareLookupInterface +// + +// MiddlewareLookupInterface is an interface to look up middlewares by +// middleware kind. It's commonly implemented by MiddlewareLookup, but may also +// be EmptyMiddlewareLookup as a memory allocation optimization for bundles +// where no middlewares are present. 
+type MiddlewareLookupInterface interface { + ByMiddlewareKind(kind MiddlewareKind) []rivertype.Middleware +} + +// NewMiddlewareLookup returns a new middleware lookup interface based on the given middlewares +// that satisfies MiddlewareLookupInterface. This is often middlewareLookup, but may be +// emptyMiddlewareLookup as an optimization for the common case of an empty middleware +// bundle. +func NewMiddlewareLookup(middlewares []rivertype.Middleware) MiddlewareLookupInterface { + if len(middlewares) < 1 { + return &emptyMiddlewareLookup{} + } + + return &middlewareLookup{ + middlewares: middlewares, + middlewaresByKind: make(map[MiddlewareKind][]rivertype.Middleware), + mu: &sync.RWMutex{}, + } +} + +// +// middlewareLookup +// + +// middlewareLookup looks up and caches middlewares based on a MiddlewareKind, saving work when +// looking up middlewares for specific operations, a common operation that gets +// repeated over and over again. This struct may be used as a lookup for +// globally installed middlewares or middlewares for specific job kinds through the use of +// JobMiddlewareLookup. +type middlewareLookup struct { + middlewares []rivertype.Middleware + middlewaresByKind map[MiddlewareKind][]rivertype.Middleware + mu *sync.RWMutex +} + +func (c *middlewareLookup) ByMiddlewareKind(kind MiddlewareKind) []rivertype.Middleware { + c.mu.RLock() + cache, ok := c.middlewaresByKind[kind] + c.mu.RUnlock() + if ok { + return cache + } + + c.mu.Lock() + defer c.mu.Unlock() + + // Even if this ends up being empty, make sure there's an entry for the next + // time the cache gets invoked for this kind. + c.middlewaresByKind[kind] = nil + + // Rely on exhaustlint to find any missing middleware kinds here. 
+ switch kind { + case MiddlewareKindJobInsert: + for _, middleware := range c.middlewares { + if typedMiddleware, ok := middleware.(rivertype.JobInsertMiddleware); ok { + c.middlewaresByKind[kind] = append(c.middlewaresByKind[kind], typedMiddleware) + } + } + case MiddlewareKindWorker: + for _, middleware := range c.middlewares { + if typedMiddleware, ok := middleware.(rivertype.WorkerMiddleware); ok { + c.middlewaresByKind[kind] = append(c.middlewaresByKind[kind], typedMiddleware) + } + } + } + + return c.middlewaresByKind[kind] +} + +// +// emptyMiddlewareLookup +// + +// emptyMiddlewareLookup is an empty version of MiddlewareLookup that's zero allocation. For +// most applications, most job args won't have middlewares, so this prevents us from +// allocating dozens/hundreds of small MiddlewareLookup objects that go unused. +type emptyMiddlewareLookup struct{} + +func (c *emptyMiddlewareLookup) ByMiddlewareKind(kind MiddlewareKind) []rivertype.Middleware { + return nil +} diff --git a/internal/middlewarelookup/middleware_lookup_test.go b/internal/middlewarelookup/middleware_lookup_test.go new file mode 100644 index 00000000..806f28c8 --- /dev/null +++ b/internal/middlewarelookup/middleware_lookup_test.go @@ -0,0 +1,144 @@ +package middlewarelookup + +import ( + "context" + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/riverqueue/river/rivertype" +) + +func TestMiddlewareLookup(t *testing.T) { + t.Parallel() + + type testBundle struct{} + + setup := func(t *testing.T) (*middlewareLookup, *testBundle) { //nolint:unparam + t.Helper() + + return NewMiddlewareLookup([]rivertype.Middleware{ //nolint:forcetypeassert + &testMiddlewareJobInsertAndWorker{}, + &testMiddlewareJobInsert{}, + &testMiddlewareWorker{}, + }).(*middlewareLookup), &testBundle{} + } + + t.Run("LooksUpMiddleware", func(t *testing.T) { + t.Parallel() + + middlewareLookup, _ := setup(t) + + require.Equal(t, []rivertype.Middleware{ + &testMiddlewareJobInsertAndWorker{}, + 
&testMiddlewareJobInsert{}, + }, middlewareLookup.ByMiddlewareKind(MiddlewareKindJobInsert)) + require.Equal(t, []rivertype.Middleware{ + &testMiddlewareJobInsertAndWorker{}, + &testMiddlewareWorker{}, + }, middlewareLookup.ByMiddlewareKind(MiddlewareKindWorker)) + + require.Len(t, middlewareLookup.middlewaresByKind, 2) + + // Repeat lookups to make sure we get the same result. + require.Equal(t, []rivertype.Middleware{ + &testMiddlewareJobInsertAndWorker{}, + &testMiddlewareJobInsert{}, + }, middlewareLookup.ByMiddlewareKind(MiddlewareKindJobInsert)) + require.Equal(t, []rivertype.Middleware{ + &testMiddlewareJobInsertAndWorker{}, + &testMiddlewareWorker{}, + }, middlewareLookup.ByMiddlewareKind(MiddlewareKindWorker)) + }) + + t.Run("Stress", func(t *testing.T) { + t.Parallel() + + middlewareLookup, _ := setup(t) + + var wg sync.WaitGroup + + parallelLookupLoop := func(kind MiddlewareKind) { + wg.Add(1) + go func() { + defer wg.Done() + + for range 50 { + middlewareLookup.ByMiddlewareKind(kind) + } + }() + } + + parallelLookupLoop(MiddlewareKindJobInsert) + parallelLookupLoop(MiddlewareKindWorker) + parallelLookupLoop(MiddlewareKindJobInsert) + parallelLookupLoop(MiddlewareKindWorker) + + wg.Wait() + }) +} + +func TestEmptyMiddlewareLookup(t *testing.T) { + t.Parallel() + + type testBundle struct{} + + setup := func(t *testing.T) (*emptyMiddlewareLookup, *testBundle) { + t.Helper() + + return NewMiddlewareLookup(nil).(*emptyMiddlewareLookup), &testBundle{} //nolint:forcetypeassert + } + + t.Run("AlwaysReturnsNil", func(t *testing.T) { + t.Parallel() + + middlewareLookup, _ := setup(t) + + require.Nil(t, middlewareLookup.ByMiddlewareKind(MiddlewareKindJobInsert)) + require.Nil(t, middlewareLookup.ByMiddlewareKind(MiddlewareKindWorker)) + }) +} + +// +// testMiddlewareJobInsertAndWorker +// + +var ( + _ rivertype.JobInsertMiddleware = &testMiddlewareJobInsertAndWorker{} + _ rivertype.WorkerMiddleware = &testMiddlewareJobInsertAndWorker{} +) + +type 
testMiddlewareJobInsertAndWorker struct{ rivertype.Middleware } + +func (t *testMiddlewareJobInsertAndWorker) InsertMany(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error) { + return doInner(ctx) +} + +func (t *testMiddlewareJobInsertAndWorker) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error { + return doInner(ctx) +} + +// +// testMiddlewareJobInsert +// + +var _ rivertype.JobInsertMiddleware = &testMiddlewareJobInsert{} + +type testMiddlewareJobInsert struct{ rivertype.Middleware } + +func (t *testMiddlewareJobInsert) InsertMany(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error) { + return doInner(ctx) +} + +// +// testMiddlewareWorker +// + +var _ rivertype.WorkerMiddleware = &testMiddlewareWorker{} + +type testMiddlewareWorker struct{ rivertype.Middleware } + +func (t *testMiddlewareWorker) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error { + return doInner(ctx) +} diff --git a/job.go b/job.go index 0868abef..fb2aa2ef 100644 --- a/job.go +++ b/job.go @@ -24,6 +24,9 @@ type JobArgs interface { Kind() string } +// JobArgsWithHooks is an interface that job args can implement to attach +// specific hooks (i.e. other than those globally installed to a client) to +// certain kinds of jobs. type JobArgsWithHooks interface { // Hooks returns specific hooks to run for this job type. These will run // after the global hooks configured on the client. 
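Reviewer note: the two mechanisms this patch relies on (filtering a mixed middleware list by type assertion, and wrapping in reverse so the first entry ends up outermost) can be sketched in isolation. This is a minimal sketch, not River's implementation. The types `defaults`, `tracer`, `insertOnly` and the helpers `workerOnly`, `chain`, `traceOrder` are hypothetical, and the job is reduced to a string ID instead of `*rivertype.JobRow`.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Middleware mirrors the sentinel-interface idea from the patch: the marker
// method keeps arbitrary values from being passed as middleware by accident.
type Middleware interface{ IsMiddleware() bool }

// WorkerMiddleware is a simplified stand-in (the job is just a string ID here).
type WorkerMiddleware interface {
	Middleware
	Work(ctx context.Context, jobID string, doInner func(context.Context) error) error
}

// defaults is a hypothetical analogue of an embeddable defaults struct.
type defaults struct{}

func (defaults) IsMiddleware() bool { return true }

// tracer records when it is entered and exited around the inner call.
type tracer struct {
	defaults
	name  string
	trace *[]string
}

func (t *tracer) Work(ctx context.Context, jobID string, doInner func(context.Context) error) error {
	*t.trace = append(*t.trace, t.name+":before")
	err := doInner(ctx)
	*t.trace = append(*t.trace, t.name+":after")
	return err
}

// insertOnly implements only the sentinel, so the worker filter skips it.
type insertOnly struct{ defaults }

// workerOnly filters a mixed list down to worker middleware, preserving order.
func workerOnly(all []Middleware) []WorkerMiddleware {
	var out []WorkerMiddleware
	for _, m := range all {
		if wm, ok := m.(WorkerMiddleware); ok {
			out = append(out, wm)
		}
	}
	return out
}

// chain wraps doInner in reverse order so the first middleware is outermost.
func chain(mws []WorkerMiddleware, jobID string, doInner func(context.Context) error) func(context.Context) error {
	for i := len(mws) - 1; i >= 0; i-- {
		mw, next := mws[i], doInner // per-iteration copies captured by the closure
		doInner = func(ctx context.Context) error { return mw.Work(ctx, jobID, next) }
	}
	return doInner
}

// traceOrder runs a small chain and reports the order in which steps ran.
func traceOrder() string {
	var trace []string
	all := []Middleware{
		&tracer{name: "a", trace: &trace},
		&insertOnly{},
		&tracer{name: "b", trace: &trace},
	}
	run := chain(workerOnly(all), "job-1", func(context.Context) error {
		trace = append(trace, "work")
		return nil
	})
	if err := run(context.Background()); err != nil {
		return "error: " + err.Error()
	}
	return strings.Join(trace, ",")
}

func main() {
	fmt.Println(traceOrder()) // a:before,b:before,work,b:after,a:after
}
```

The entry that implements only the sentinel is dropped by the filter, and the first worker middleware in the list is the first to be entered and the last to exit.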
diff --git a/middleware_defaults.go b/middleware_defaults.go index e2d39276..acdc0d76 100644 --- a/middleware_defaults.go +++ b/middleware_defaults.go @@ -6,11 +6,15 @@ import ( "github.com/riverqueue/river/rivertype" ) +type MiddlewareDefaults struct{} + +func (d *MiddlewareDefaults) IsMiddleware() bool { return true } + // JobInsertMiddlewareDefaults is an embeddable struct that provides default // implementations for the rivertype.JobInsertMiddleware. Use of this struct is // recommended in case rivertype.JobInsertMiddleware is expanded in the future // so that existing code isn't unexpectedly broken during an upgrade. -type JobInsertMiddlewareDefaults struct{} +type JobInsertMiddlewareDefaults struct{ MiddlewareDefaults } func (d *JobInsertMiddlewareDefaults) InsertMany(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(ctx context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error) { return doInner(ctx) @@ -20,7 +24,7 @@ func (d *JobInsertMiddlewareDefaults) InsertMany(ctx context.Context, manyParams // implementations for the rivertype.WorkerMiddleware. Use of this struct is // recommended in case rivertype.WorkerMiddleware is expanded in the future so // that existing code isn't unexpectedly broken during an upgrade. 
-type WorkerMiddlewareDefaults struct{} +type WorkerMiddlewareDefaults struct{ MiddlewareDefaults } func (d *WorkerMiddlewareDefaults) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error { return doInner(ctx) diff --git a/middleware_test.go b/middleware_test.go index 4e9b17dc..a54bdde3 100644 --- a/middleware_test.go +++ b/middleware_test.go @@ -7,8 +7,7 @@ import ( ) type overridableJobMiddleware struct { - JobInsertMiddlewareDefaults - WorkerMiddlewareDefaults + MiddlewareDefaults insertManyFunc func(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(ctx context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error) workFunc func(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error @@ -18,12 +17,12 @@ func (m *overridableJobMiddleware) InsertMany(ctx context.Context, manyParams [] if m.insertManyFunc != nil { return m.insertManyFunc(ctx, manyParams, doInner) } - return m.JobInsertMiddlewareDefaults.InsertMany(ctx, manyParams, doInner) + return doInner(ctx) } func (m *overridableJobMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error { if m.workFunc != nil { return m.workFunc(ctx, job, doInner) } - return m.WorkerMiddlewareDefaults.Work(ctx, job, doInner) + return doInner(ctx) } diff --git a/producer.go b/producer.go index 8700fb24..bec1657f 100644 --- a/producer.go +++ b/producer.go @@ -12,6 +12,7 @@ import ( "github.com/riverqueue/river/internal/hooklookup" "github.com/riverqueue/river/internal/jobcompleter" "github.com/riverqueue/river/internal/jobexecutor" + "github.com/riverqueue/river/internal/middlewarelookup" "github.com/riverqueue/river/internal/notifier" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/util/chanutil" @@ -65,10 +66,11 @@ type producerConfig struct { // LISTEN/NOTIFY, but this provides a fallback. 
FetchPollInterval time.Duration - HookLookupByJob *hooklookup.JobHookLookup - HookLookupGlobal hooklookup.HookLookupInterface - JobTimeout time.Duration - MaxWorkers int + HookLookupByJob *hooklookup.JobHookLookup + HookLookupGlobal hooklookup.HookLookupInterface + JobTimeout time.Duration + MaxWorkers int + MiddlewareLookupGlobal middlewarelookup.MiddlewareLookupInterface // Notifier is a notifier for subscribing to new job inserts and job // control. If nil, the producer will operate in poll-only mode. @@ -89,7 +91,6 @@ type producerConfig struct { RetryPolicy ClientRetryPolicy SchedulerInterval time.Duration Workers *Workers - WorkerMiddleware []rivertype.WorkerMiddleware } func (c *producerConfig) mustValidate() *producerConfig { @@ -621,10 +622,10 @@ func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype. ErrorHandler: p.errorHandler, HookLookupByJob: p.config.HookLookupByJob, HookLookupGlobal: p.config.HookLookupGlobal, + MiddlewareLookupGlobal: p.config.MiddlewareLookupGlobal, InformProducerDoneFunc: p.handleWorkerDone, JobRow: job, SchedulerInterval: p.config.SchedulerInterval, - WorkerMiddleware: p.config.WorkerMiddleware, WorkUnit: workUnit, }) p.addActiveJob(job.ID, executor) diff --git a/producer_test.go b/producer_test.go index bd7df759..059e0478 100644 --- a/producer_test.go +++ b/producer_test.go @@ -13,6 +13,7 @@ import ( "github.com/riverqueue/river/internal/hooklookup" "github.com/riverqueue/river/internal/jobcompleter" "github.com/riverqueue/river/internal/maintenance" + "github.com/riverqueue/river/internal/middlewarelookup" "github.com/riverqueue/river/internal/notifier" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/riverinternaltest" @@ -89,19 +90,20 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) { Completer: completer, ErrorHandler: newTestErrorHandler(), // Fetch constantly to more aggressively trigger the potential data race: - FetchCooldown: 
time.Millisecond, - FetchPollInterval: time.Millisecond, - HookLookupByJob: hooklookup.NewJobHookLookup(), - HookLookupGlobal: hooklookup.NewHookLookup(nil), - JobTimeout: JobTimeoutDefault, - MaxWorkers: 1000, - Notifier: notifier, - Queue: rivercommon.QueueDefault, - QueuePollInterval: queuePollIntervalDefault, - QueueReportInterval: queueReportIntervalDefault, - RetryPolicy: &DefaultClientRetryPolicy{}, - SchedulerInterval: maintenance.JobSchedulerIntervalDefault, - Workers: workers, + FetchCooldown: time.Millisecond, + FetchPollInterval: time.Millisecond, + HookLookupByJob: hooklookup.NewJobHookLookup(), + HookLookupGlobal: hooklookup.NewHookLookup(nil), + JobTimeout: JobTimeoutDefault, + MaxWorkers: 1000, + MiddlewareLookupGlobal: middlewarelookup.NewMiddlewareLookup(nil), + Notifier: notifier, + Queue: rivercommon.QueueDefault, + QueuePollInterval: queuePollIntervalDefault, + QueueReportInterval: queueReportIntervalDefault, + RetryPolicy: &DefaultClientRetryPolicy{}, + SchedulerInterval: maintenance.JobSchedulerIntervalDefault, + Workers: workers, }) params := make([]*riverdriver.JobInsertFastParams, maxJobCount) @@ -174,22 +176,23 @@ func TestProducer_PollOnly(t *testing.T) { } return newProducer(archetype, exec, &producerConfig{ - ClientID: testClientID, - Completer: completer, - ErrorHandler: newTestErrorHandler(), - FetchCooldown: FetchCooldownDefault, - FetchPollInterval: 50 * time.Millisecond, // more aggressive than normal because we have no notifier - HookLookupByJob: hooklookup.NewJobHookLookup(), - HookLookupGlobal: hooklookup.NewHookLookup(nil), - JobTimeout: JobTimeoutDefault, - MaxWorkers: 1_000, - Notifier: nil, // no notifier - Queue: rivercommon.QueueDefault, - QueuePollInterval: queuePollIntervalDefault, - QueueReportInterval: queueReportIntervalDefault, - RetryPolicy: &DefaultClientRetryPolicy{}, - SchedulerInterval: riverinternaltest.SchedulerShortInterval, - Workers: NewWorkers(), + ClientID: testClientID, + Completer: completer, + 
ErrorHandler: newTestErrorHandler(), + FetchCooldown: FetchCooldownDefault, + FetchPollInterval: 50 * time.Millisecond, // more aggressive than normal because we have no notifier + HookLookupByJob: hooklookup.NewJobHookLookup(), + HookLookupGlobal: hooklookup.NewHookLookup(nil), + JobTimeout: JobTimeoutDefault, + MaxWorkers: 1_000, + MiddlewareLookupGlobal: middlewarelookup.NewMiddlewareLookup(nil), + Notifier: nil, // no notifier + Queue: rivercommon.QueueDefault, + QueuePollInterval: queuePollIntervalDefault, + QueueReportInterval: queueReportIntervalDefault, + RetryPolicy: &DefaultClientRetryPolicy{}, + SchedulerInterval: riverinternaltest.SchedulerShortInterval, + Workers: NewWorkers(), }), jobUpdates }) } @@ -222,22 +225,23 @@ func TestProducer_WithNotifier(t *testing.T) { } return newProducer(archetype, exec, &producerConfig{ - ClientID: testClientID, - Completer: completer, - ErrorHandler: newTestErrorHandler(), - FetchCooldown: FetchCooldownDefault, - FetchPollInterval: 50 * time.Millisecond, // more aggressive than normal so in case we miss the event, tests still pass quickly - HookLookupByJob: hooklookup.NewJobHookLookup(), - HookLookupGlobal: hooklookup.NewHookLookup(nil), - JobTimeout: JobTimeoutDefault, - MaxWorkers: 1_000, - Notifier: notifier, - Queue: rivercommon.QueueDefault, - QueuePollInterval: queuePollIntervalDefault, - QueueReportInterval: queueReportIntervalDefault, - RetryPolicy: &DefaultClientRetryPolicy{}, - SchedulerInterval: riverinternaltest.SchedulerShortInterval, - Workers: NewWorkers(), + ClientID: testClientID, + Completer: completer, + ErrorHandler: newTestErrorHandler(), + FetchCooldown: FetchCooldownDefault, + FetchPollInterval: 50 * time.Millisecond, // more aggressive than normal so in case we miss the event, tests still pass quickly + HookLookupByJob: hooklookup.NewJobHookLookup(), + HookLookupGlobal: hooklookup.NewHookLookup(nil), + JobTimeout: JobTimeoutDefault, + MaxWorkers: 1_000, + MiddlewareLookupGlobal: 
middlewarelookup.NewMiddlewareLookup(nil), + Notifier: notifier, + Queue: rivercommon.QueueDefault, + QueuePollInterval: queuePollIntervalDefault, + QueueReportInterval: queueReportIntervalDefault, + RetryPolicy: &DefaultClientRetryPolicy{}, + SchedulerInterval: riverinternaltest.SchedulerShortInterval, + Workers: NewWorkers(), }), jobUpdates }) } diff --git a/rivertest/worker.go b/rivertest/worker.go index d1254fe1..d8f877c2 100644 --- a/rivertest/worker.go +++ b/rivertest/worker.go @@ -14,6 +14,7 @@ import ( "github.com/riverqueue/river/internal/jobcompleter" "github.com/riverqueue/river/internal/jobexecutor" "github.com/riverqueue/river/internal/maintenance" + "github.com/riverqueue/river/internal/middlewarelookup" "github.com/riverqueue/river/internal/workunit" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" @@ -194,8 +195,8 @@ func (w *Worker[T, TTx]) workJob(ctx context.Context, tb testing.TB, tx TTx, job InformProducerDoneFunc: func(job *rivertype.JobRow) { close(executionDone) }, HookLookupGlobal: hooklookup.NewHookLookup(w.config.Hooks), HookLookupByJob: hooklookup.NewJobHookLookup(), - WorkerMiddleware: w.config.WorkerMiddleware, JobRow: job, + MiddlewareLookupGlobal: middlewarelookup.NewMiddlewareLookup(w.config.Middleware), SchedulerInterval: maintenance.JobSchedulerIntervalDefault, WorkUnit: workUnit, }) diff --git a/rivertype/river_type.go b/rivertype/river_type.go index a5863ce6..ee416c04 100644 --- a/rivertype/river_type.go +++ b/rivertype/river_type.go @@ -290,12 +290,16 @@ type JobInsertParams struct { // they don't add anything to the call stack. Call stacks that get overly deep // can become a bit of an operational nightmare because they get hard to read. // -// In a language with more specific type capabilities, this should be a union -// type. In Go we implement it somewhat awkwardly so that we can get future -// extensibility, but also some typing guarantees to prevent misuse (i.e. 
if -// Hook was an empty interface, then any object could be passed as a hook, but -// having a single function to implement forces the caller to make some token -// motions in the direction of implementing hooks). +// In a language with more specific type capabilities, this interface would be a +// union type. In Go we implement it somewhat awkwardly so that we can get +// future extensibility, but also some typing guarantees to prevent misuse (i.e. +// if Hook was an empty interface, then any object could be passed as a hook, +// but having a single function to implement forces the caller to make some +// token motions in the direction of implementing hooks). +// +// List of hook interfaces that may be implemented: +// - HookInsertBegin +// - HookWorkBegin type Hook interface { // IsHook is a sentinel function to check that a type is implementing Hook // on purpose and not by accident (Hook would otherwise be an empty @@ -319,13 +323,60 @@ type HookWorkBegin interface { WorkBegin(ctx context.Context, job *JobRow) error } -// JobInsertMiddleware provides an interface for middleware that integrations can -// use to encapsulate common logic around job insertion. +// Middleware is an arbitrary interface for a struct which will execute some +// arbitrary code at a predefined step in the job lifecycle. +// +// This interface is left purposely non-specific. Middleware structs should +// embed river.MiddlewareDefaults to inherit an IsMiddleware implementation, +// then implement a more specific hook interface like JobInsertMiddleware or +// WorkerMiddleware. A middleware struct may also implement multiple specific +// hook interfaces which are logically related and benefit from being grouped +// together. +// +// Hooks differ from middleware in that they're invoked at a specific lifecycle +// phase, but finish immediately instead of wrapping an inner call like a +// middleware does. 
One of the main ramifications of this difference is that a +// hook cannot modify context in any useful way to pass down into the stack. +// Like a normal function, any changes it makes to its context are discarded on +// return. +// +// Middleware differs from hooks in that they wrap a specific lifecycle phase, +// staying on the callstack for the duration of the step while they call into a +// doInner function that executes the step and the rest of the middleware stack. +// The main ramification of this difference is that middleware can modify +// context for the step and any other middleware inner relative to it. +// +// All else equal, hooks should generally be preferred over middleware because +// they don't add anything to the call stack. Call stacks that get overly deep +// can become a bit of an operational nightmare because they get hard to read. +// +// In a language with more specific type capabilities, this interface would be a +// union type. In Go we implement it somewhat awkwardly so that we can get +// future extensibility, but also some typing guarantees to prevent misuse (i.e. +// if Middleware was an empty interface, then any object could be passed as +// middleware, but having a single function to implement forces the caller to +// make some token motions in the direction of implementing middleware). +// +// List of middleware interfaces that may be implemented: +// - JobInsertMiddleware +// - WorkerMiddleware +type Middleware interface { + // IsMiddleware is a sentinel function to check that a type is implementing + // Middleware on purpose and not by accident (Middleware would otherwise be + // an empty interface). Middleware should embed river.MiddlewareDefaults to + // pick up an implementation for this function automatically. + IsMiddleware() bool +} + +// JobInsertMiddleware provides an interface for middleware that integrations +// can use to encapsulate common logic around job insertion. 
// // Implementations should embed river.JobMiddlewareDefaults to inherit default // implementations for phases where no custom code is needed, and for forward // compatibility in case new functions are added to this interface. type JobInsertMiddleware interface { + Middleware + // InsertMany is invoked around a batch insert operation. Implementations // must always include a call to doInner to call down the middleware stack // and perform the batch insertion, and may run custom code before and after. @@ -335,7 +386,11 @@ type JobInsertMiddleware interface { InsertMany(ctx context.Context, manyParams []*JobInsertParams, doInner func(context.Context) ([]*JobInsertResult, error)) ([]*JobInsertResult, error) } +// WorkerMiddleware provides an interface for middleware that integrations can +// use to encapsulate common logic when a job is worked. type WorkerMiddleware interface { + Middleware + // Work is invoked after a job's JSON args being unmarshaled and before the // job is worked. Implementations must always include a call to doInner to // call down the middleware stack and perform the batch insertion, and may run