From bc8d166b2d6e7152a51b9fde0602429b2622ea9f Mon Sep 17 00:00:00 2001 From: Brandur Date: Sat, 26 Apr 2025 12:51:01 -0700 Subject: [PATCH] Add new `HookWorkEnd` interface that runs after workers finish Here, add a new complimentary pair for `HookWorkBegin`: `HookWorkEnd`, which runs after workers finish, taking in an error result. `HookWorkEnd` hooks may or may not modify the error result, choosing to suppress an error on pass it along the stack unchanged. This is driven by trying to add a new `nilerror` contrib package [1] that helps detect nil error-compliant structs that return non-nil error interfaces, which is a common footgun in Go [2]. [1] https://github.com/riverqueue/rivercontrib/pull/25 [2] https://go.dev/doc/faq#nil_error --- CHANGELOG.md | 1 + client_test.go | 143 ++++++++++++++++++---- hook_defaults_funcs.go | 10 ++ internal/hooklookup/hook_lookup.go | 7 ++ internal/hooklookup/hook_lookup_test.go | 30 ++++- internal/jobexecutor/job_executor.go | 13 +- internal/jobexecutor/job_executor_test.go | 119 ++++++++++++++++++ rivertype/river_type.go | 27 ++++ 8 files changed, 326 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2478a398..55ba7626 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added `river/riverlog` containing middleware that injects a context logger to workers that collates log output and persists it with job metadata. This is paired with a River UI enhancement that shows logs in the UI. [PR #844](https://github.com/riverqueue/river/pull/844). - Added `JobInsertMiddlewareFunc` and `WorkerMiddlewareFunc` to easily implement middleware with a function instead of a struct. [PR #844](https://github.com/riverqueue/river/pull/844). - Added `Config.Schema` which lets a non-default schema be injected explicitly into a River client that'll be used for all database operations. This may be particularly useful for proxies like PgBouncer that may not respect a schema configured in `search_path`. [PR #848](https://github.com/riverqueue/river/pull/848). +- Added `rivertype.HookWorkEnd` hook interface that runs after a job has been worked. [PR #863](https://github.com/riverqueue/river/pull/863). ### Changed diff --git a/client_test.go b/client_test.go index 6cc6a838..bdf9a0b8 100644 --- a/client_test.go +++ b/client_test.go @@ -741,33 +741,110 @@ func Test_Client(t *testing.T) { require.True(t, workBeginHookCalled) }) + t.Run("WithGlobalWorkEndHook", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t) + + workEndHookCalled := false + + bundle.config.Hooks = []rivertype.Hook{ + HookWorkEndFunc(func(ctx context.Context, err error) error { + workEndHookCalled = true + return err + }), + } + + AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[callbackArgs]) error { + return nil + })) + + client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config) + require.NoError(t, err) + + subscribeChan := subscribe(t, client) + startClient(ctx, t, client) + + insertRes, err := client.Insert(ctx, callbackArgs{}, nil) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + + require.True(t, workEndHookCalled) + }) + t.Run("WithInsertBeginHookOnJobArgs", func(t *testing.T) { t.Parallel() _, bundle := setup(t) - AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[jobArgsWithCustomHook]) error { + type JobArgs struct { + JobArgsReflectKind[JobArgs] + hookEmbed[metadataHookInsertBegin] + } + + AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { return nil })) client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config) require.NoError(t, err) - insertRes, err := client.Insert(ctx, jobArgsWithCustomHook{}, nil) + insertRes, err := client.Insert(ctx, JobArgs{}, nil) require.NoError(t, err) var metadataMap map[string]any err = json.Unmarshal(insertRes.Job.Metadata, &metadataMap) require.NoError(t, err) - require.Equal(t, "called", metadataMap["insert_begin_hook"]) + require.Equal(t, metadataHookCalled, metadataMap[metadataHookInsertBeginKey]) + }) + + t.Run("WithWorkBeginHookOnJobArgs", func(t *testing.T) { //nolint:dupl + t.Parallel() + + _, bundle := setup(t) + + type JobArgs struct { + JobArgsReflectKind[JobArgs] + hookEmbed[metadataHookWorkBegin] + } + + AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + return nil + })) + + client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config) + require.NoError(t, err) + + subscribeChan := subscribe(t, client) + startClient(ctx, t, client) + + insertRes, err := client.Insert(ctx, JobArgs{}, nil) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + + var metadataMap map[string]any + err = json.Unmarshal(event.Job.Metadata, &metadataMap) + require.NoError(t, err) + require.Equal(t, metadataHookCalled, metadataMap[metadataHookWorkBeginKey]) }) - t.Run("WithWorkBeginHookOnJobArgs", func(t *testing.T) { + t.Run("WithWorkEndHookOnJobArgs", func(t *testing.T) { //nolint:dupl t.Parallel() _, bundle := setup(t) - AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[jobArgsWithCustomHook]) error { + type JobArgs struct { + JobArgsReflectKind[JobArgs] + hookEmbed[metadataHookWorkEnd] + } + + AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { return nil })) @@ -777,7 +854,7 @@ func Test_Client(t *testing.T) { subscribeChan := subscribe(t, client) startClient(ctx, t, client) - insertRes, err := client.Insert(ctx, jobArgsWithCustomHook{}, nil) + insertRes, err := client.Insert(ctx, JobArgs{}, nil) require.NoError(t, err) event := riversharedtest.WaitOrTimeout(t, subscribeChan) @@ -787,7 +864,7 @@ func Test_Client(t *testing.T) { var metadataMap map[string]any err = json.Unmarshal(event.Job.Metadata, &metadataMap) require.NoError(t, err) - require.Equal(t, "called", metadataMap["work_begin_hook"]) + require.Equal(t, metadataHookCalled, metadataMap[metadataHookWorkEndKey]) }) t.Run("WithGlobalWorkerMiddleware", func(t *testing.T) { @@ -1167,30 +1244,31 @@ func Test_Client(t *testing.T) { }) } -type jobArgsWithCustomHook struct{} - -func (jobArgsWithCustomHook) Kind() string { return "with_custom_hook" } +// hookEmbed can be embedded on a JobArgs to add a hook to it in such a way that +// it can be encapsulated within a test case. +type hookEmbed[T rivertype.Hook] struct{} -func (jobArgsWithCustomHook) Hooks() []rivertype.Hook { - return []rivertype.Hook{ - &testHookInsertAndWorkBegin{}, - } +func (f hookEmbed[T]) Hooks() []rivertype.Hook { + var hook T + return []rivertype.Hook{hook} } -var ( - _ rivertype.HookInsertBegin = &testHookInsertAndWorkBegin{} - _ rivertype.HookWorkBegin = &testHookInsertAndWorkBegin{} +const ( + metadataHookCalled = "called" + metadataHookInsertBeginKey = "insert_begin" + metadataHookWorkBeginKey = "work_begin" + metadataHookWorkEndKey = "work_end" ) -type testHookInsertAndWorkBegin struct{ HookDefaults } +type metadataHookInsertBegin struct{ rivertype.Hook } -func (t *testHookInsertAndWorkBegin) InsertBegin(ctx context.Context, params *rivertype.JobInsertParams) error { +func (metadataHookInsertBegin) InsertBegin(ctx context.Context, params *rivertype.JobInsertParams) error { var metadataMap map[string]any if err := json.Unmarshal(params.Metadata, &metadataMap); err != nil { return err } - metadataMap["insert_begin_hook"] = "called" + metadataMap[metadataHookInsertBeginKey] = metadataHookCalled var err error params.Metadata, err = json.Marshal(metadataMap) @@ -1201,17 +1279,38 @@ func (t *testHookInsertAndWorkBegin) InsertBegin(ctx context.Context, params *ri return nil } -func (t *testHookInsertAndWorkBegin) WorkBegin(ctx context.Context, job *rivertype.JobRow) error { +type metadataHookWorkBegin struct{ rivertype.Hook } + +func (metadataHookWorkBegin) WorkBegin(ctx context.Context, job *rivertype.JobRow) error { metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx) if !hasMetadataUpdates { panic("expected to be called from within job executor") } - metadataUpdates["work_begin_hook"] = "called" + metadataUpdates[metadataHookWorkBeginKey] = metadataHookCalled return nil } +type metadataHookWorkEnd struct{ rivertype.Hook } + +func (metadataHookWorkEnd) WorkEnd(ctx context.Context, err error) error { + metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx) + if !hasMetadataUpdates { + panic("expected to be called from within job executor") + } + + metadataUpdates[metadataHookWorkEndKey] = metadataHookCalled + + return err +} + +var ( + _ rivertype.HookInsertBegin = metadataHookInsertBegin{} + _ rivertype.HookWorkBegin = metadataHookWorkBegin{} + _ rivertype.HookWorkEnd = metadataHookWorkEnd{} +) + type workerWithMiddleware[T JobArgs] struct { WorkerDefaults[T] workFunc func(context.Context, *Job[T]) error diff --git a/hook_defaults_funcs.go b/hook_defaults_funcs.go index 47946da5..1b183ac4 100644 --- a/hook_defaults_funcs.go +++ b/hook_defaults_funcs.go @@ -32,3 +32,13 @@ func (f HookWorkBeginFunc) WorkBegin(ctx context.Context, job *rivertype.JobRow) } func (f HookWorkBeginFunc) IsHook() bool { return true } + +// HookWorkEndFunc is a convenience helper for implementing +// rivertype.HookworkEnd using a simple function instead of a struct. +type HookWorkEndFunc func(ctx context.Context, err error) error + +func (f HookWorkEndFunc) WorkEnd(ctx context.Context, err error) error { + return f(ctx, err) +} + +func (f HookWorkEndFunc) IsHook() bool { return true } diff --git a/internal/hooklookup/hook_lookup.go b/internal/hooklookup/hook_lookup.go index 007ef497..44fe6647 100644 --- a/internal/hooklookup/hook_lookup.go +++ b/internal/hooklookup/hook_lookup.go @@ -15,6 +15,7 @@ type HookKind string const ( HookKindInsertBegin HookKind = "insert_begin" HookKindWorkBegin HookKind = "work_begin" + HookKindWorkEnd HookKind = "work_end" ) // @@ -88,6 +89,12 @@ func (c *hookLookup) ByHookKind(kind HookKind) []rivertype.Hook { c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook) } } + case HookKindWorkEnd: + for _, hook := range c.hooks { + if typedHook, ok := hook.(rivertype.HookWorkEnd); ok { + c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook) + } + } } return c.hooksByKind[kind] diff --git a/internal/hooklookup/hook_lookup_test.go b/internal/hooklookup/hook_lookup_test.go index 90da7e35..71f4622a 100644 --- a/internal/hooklookup/hook_lookup_test.go +++ b/internal/hooklookup/hook_lookup_test.go @@ -22,6 +22,7 @@ func TestHookLookup(t *testing.T) { &testHookInsertAndWorkBegin{}, &testHookInsertBegin{}, &testHookWorkBegin{}, + &testHookWorkEnd{}, }).(*hookLookup), &testBundle{} } @@ -38,8 +39,11 @@ func TestHookLookup(t *testing.T) { &testHookInsertAndWorkBegin{}, &testHookWorkBegin{}, }, hookLookup.ByHookKind(HookKindWorkBegin)) + require.Equal(t, []rivertype.Hook{ + &testHookWorkEnd{}, + }, hookLookup.ByHookKind(HookKindWorkEnd)) - require.Len(t, hookLookup.hooksByKind, 2) + require.Len(t, hookLookup.hooksByKind, 3) // Repeat lookups to make sure we get the same result. require.Equal(t, []rivertype.Hook{ @@ -50,6 +54,9 @@ func TestHookLookup(t *testing.T) { &testHookInsertAndWorkBegin{}, &testHookWorkBegin{}, }, hookLookup.ByHookKind(HookKindWorkBegin)) + require.Equal(t, []rivertype.Hook{ + &testHookWorkEnd{}, + }, hookLookup.ByHookKind(HookKindWorkEnd)) }) t.Run("Stress", func(t *testing.T) { @@ -118,6 +125,7 @@ func TestJobHookLookup(t *testing.T) { require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindInsertBegin)) require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindWorkBegin)) + require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindWorkEnd)) require.Equal(t, []rivertype.Hook{ &testHookInsertAndWorkBegin{}, &testHookInsertBegin{}, @@ -126,12 +134,16 @@ func TestJobHookLookup(t *testing.T) { &testHookInsertAndWorkBegin{}, &testHookWorkBegin{}, }, jobHookLookup.ByJobArgs(&jobArgsWithCustomHooks{}).ByHookKind(HookKindWorkBegin)) + require.Equal(t, []rivertype.Hook{ + &testHookWorkEnd{}, + }, jobHookLookup.ByJobArgs(&jobArgsWithCustomHooks{}).ByHookKind(HookKindWorkEnd)) require.Len(t, jobHookLookup.hookLookupByKind, 2) // Repeat lookups to make sure we get the same result. require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindInsertBegin)) require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindWorkBegin)) + require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindWorkEnd)) require.Equal(t, []rivertype.Hook{ &testHookInsertAndWorkBegin{}, &testHookInsertBegin{}, @@ -140,6 +152,9 @@ func TestJobHookLookup(t *testing.T) { &testHookInsertAndWorkBegin{}, &testHookWorkBegin{}, }, jobHookLookup.ByJobArgs(&jobArgsWithCustomHooks{}).ByHookKind(HookKindWorkBegin)) + require.Equal(t, []rivertype.Hook{ + &testHookWorkEnd{}, + }, jobHookLookup.ByJobArgs(&jobArgsWithCustomHooks{}).ByHookKind(HookKindWorkEnd)) }) t.Run("Stress", func(t *testing.T) { @@ -195,6 +210,7 @@ func (jobArgsWithCustomHooks) Hooks() []rivertype.Hook { &testHookInsertAndWorkBegin{}, &testHookInsertBegin{}, &testHookWorkBegin{}, + &testHookWorkEnd{}, } } @@ -242,3 +258,15 @@ type testHookWorkBegin struct{ rivertype.Hook } func (t *testHookWorkBegin) WorkBegin(ctx context.Context, job *rivertype.JobRow) error { return nil } + +// +// testHookWorkEnd +// + +var _ rivertype.HookWorkEnd = &testHookWorkEnd{} + +type testHookWorkEnd struct{ rivertype.Hook } + +func (t *testHookWorkEnd) WorkEnd(ctx context.Context, err error) error { + return nil +} diff --git a/internal/jobexecutor/job_executor.go b/internal/jobexecutor/job_executor.go index 84e8ca50..640d6ed7 100644 --- a/internal/jobexecutor/job_executor.go +++ b/internal/jobexecutor/job_executor.go @@ -208,7 +208,18 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) { ctx, cancel := execution.MaybeApplyTimeout(ctx, jobTimeout) defer cancel() - return e.WorkUnit.Work(ctx) + err := e.WorkUnit.Work(ctx) + + { + for _, hook := range append( + e.HookLookupGlobal.ByHookKind(hooklookup.HookKindWorkEnd), + e.WorkUnit.HookLookup(e.HookLookupByJob).ByHookKind(hooklookup.HookKindWorkEnd)..., + ) { + err = hook.(rivertype.HookWorkEnd).WorkEnd(ctx, err) //nolint:forcetypeassert + } + } + + return err }) executeFunc := execution.MiddlewareChain( diff --git a/internal/jobexecutor/job_executor_test.go b/internal/jobexecutor/job_executor_test.go index 7049e3b8..281618e5 100644 --- a/internal/jobexecutor/job_executor_test.go +++ b/internal/jobexecutor/job_executor_test.go @@ -804,8 +804,127 @@ func TestJobExecutor_Execute(t *testing.T) { require.Equal(t, rivertype.JobStateCompleted, job.State) require.Empty(t, job.Errors) }) + + t.Run("WorkHooks", func(t *testing.T) { + t.Parallel() + + executor, bundle := setup(t) + + var ( + workBeginCalled bool + workEndCalled bool + ) + executor.HookLookupGlobal = hooklookup.NewHookLookup([]rivertype.Hook{ + HookWorkBeginFunc(func(ctx context.Context, job *rivertype.JobRow) error { + workBeginCalled = true + return nil + }), + HookWorkEndFunc(func(ctx context.Context, err error) error { + workEndCalled = true + return err + }), + }) + executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return nil }, nil).MakeUnit(bundle.jobRow) + + executor.Execute(ctx) + jobsUpdated := riversharedtest.WaitOrTimeout(t, bundle.updateCh) + require.Len(t, jobsUpdated, 1) + require.Empty(t, jobsUpdated[0].Job.Errors) + + require.True(t, workBeginCalled) + require.True(t, workEndCalled) + }) + + t.Run("WorkEndErrorPassThrough", func(t *testing.T) { + t.Parallel() + + executor, bundle := setup(t) + + var ( + workEnd1Called bool + workEnd2Called bool + ) + executor.HookLookupGlobal = hooklookup.NewHookLookup([]rivertype.Hook{ + HookWorkEndFunc(func(ctx context.Context, err error) error { + workEnd1Called = true + require.EqualError(t, err, "job error") + return err + }), + HookWorkEndFunc(func(ctx context.Context, err error) error { + workEnd2Called = true + require.EqualError(t, err, "job error") + return err + }), + }) + executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { + return errors.New("job error") + }, nil).MakeUnit(bundle.jobRow) + + executor.Execute(ctx) + jobsUpdated := riversharedtest.WaitOrTimeout(t, bundle.updateCh) + require.Len(t, jobsUpdated, 1) + require.Equal(t, "job error", jobsUpdated[0].Job.Errors[0].Error) + + require.True(t, workEnd1Called) + require.True(t, workEnd2Called) + }) + + t.Run("WorkEndErrorSuppression", func(t *testing.T) { + t.Parallel() + + executor, bundle := setup(t) + + var ( + workEnd1Called bool + workEnd2Called bool + ) + executor.HookLookupGlobal = hooklookup.NewHookLookup([]rivertype.Hook{ + HookWorkEndFunc(func(ctx context.Context, err error) error { + workEnd1Called = true + require.EqualError(t, err, "job error") + return err + }), + HookWorkEndFunc(func(ctx context.Context, err error) error { + workEnd2Called = true + require.EqualError(t, err, "job error") + return nil // second hook suppresses the error + }), + }) + executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { + return errors.New("job error") + }, nil).MakeUnit(bundle.jobRow) + + executor.Execute(ctx) + jobsUpdated := riversharedtest.WaitOrTimeout(t, bundle.updateCh) + require.Len(t, jobsUpdated, 1) + require.Empty(t, jobsUpdated[0].Job.Errors) + + require.True(t, workEnd1Called) + require.True(t, workEnd2Called) + }) +} + +// +// *Func types are copied from the top level River package because they can't be +// accessed from here. +// + +type HookWorkBeginFunc func(ctx context.Context, job *rivertype.JobRow) error + +func (f HookWorkBeginFunc) WorkBegin(ctx context.Context, job *rivertype.JobRow) error { + return f(ctx, job) +} + +func (f HookWorkBeginFunc) IsHook() bool { return true } + +type HookWorkEndFunc func(ctx context.Context, err error) error + +func (f HookWorkEndFunc) WorkEnd(ctx context.Context, err error) error { + return f(ctx, err) } +func (f HookWorkEndFunc) IsHook() bool { return true } + type testMiddleware struct { work func(ctx context.Context, job *rivertype.JobRow, next func(context.Context) error) error } diff --git a/rivertype/river_type.go b/rivertype/river_type.go index 6cd29718..9b62c0ff 100644 --- a/rivertype/river_type.go +++ b/rivertype/river_type.go @@ -325,6 +325,33 @@ type HookWorkBegin interface { WorkBegin(ctx context.Context, job *JobRow) error } +// HookWorkEnd is an interface to a hook that runs after a job has been worked. +type HookWorkEnd interface { + Hook + + // WorkEnd is invoked after a job has been worked with the error result of + // the worked job. It's invoked after any middleware has already run. + // + // WorkEnd may modify a returned work error or pass it through unchanged. + // Each returned error is passed through to the next hook and the final + // error result is returned from the job executor: + // + // err := e.WorkUnit.Work(ctx) + // for _, hook := range hooks { + // err = hook.(rivertype.HookWorkEnd).WorkEnd(ctx, err) + // } + // return err + // + // If a hook does not want to modify an error result, it should make sure to + // return whatever error value it received as its argument whether that + // error is nil or not. + // + // Will not receive a common context related to HookWorkBegin because + // WorkBegin doesn't return a context. Middleware should be used for this + // sort of shared context instead. + WorkEnd(ctx context.Context, err error) error +} + // Middleware is an arbitrary interface for a struct which will execute some // arbitrary code at a predefined step in the job lifecycle. //