diff --git a/common_test.go b/common_test.go deleted file mode 100644 index 8394bccb..00000000 --- a/common_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package river_test - -import ( - "context" - "fmt" - "time" - - "github.com/riverqueue/river" - "github.com/riverqueue/river/rivershared/riversharedtest" - "github.com/riverqueue/river/rivershared/util/sliceutil" - "github.com/riverqueue/river/rivertype" -) - -// -// This file used as a holding place for test helpers for examples so that the -// helpers aren't included in Godoc and keep each example more succinct. -// - -type NoOpArgs struct{} - -func (NoOpArgs) Kind() string { return "no_op" } - -type NoOpWorker struct { - river.WorkerDefaults[NoOpArgs] -} - -func (w *NoOpWorker) Work(ctx context.Context, job *river.Job[NoOpArgs]) error { - fmt.Printf("NoOpWorker.Work ran\n") - return nil -} - -// Wait on the given subscription channel for numJobs. Times out with a panic if -// jobs take too long to be received. -func waitForNJobs(subscribeChan <-chan *river.Event, numJobs int) []*rivertype.JobRow { //nolint:unparam - var ( - timeout = riversharedtest.WaitTimeout() - deadline = time.Now().Add(timeout) - events = make([]*river.Event, 0, numJobs) - ) - - for { - select { - case event := <-subscribeChan: - events = append(events, event) - - if len(events) >= numJobs { - return sliceutil.Map(events, func(e *river.Event) *rivertype.JobRow { return e.Job }) - } - - case <-time.After(time.Until(deadline)): - panic(fmt.Sprintf("waitForNJobs timed out after waiting %s (received %d job(s), wanted %d)", - timeout, len(events), numJobs)) - } - } -} diff --git a/example_batch_insert_test.go b/example/example_batch_insert/example_batch_insert_test.go similarity index 93% rename from example_batch_insert_test.go rename to example/example_batch_insert/example_batch_insert_test.go index 0146240e..f5e7bc73 100644 --- a/example_batch_insert_test.go +++ b/example/example_batch_insert/example_batch_insert_test.go @@ -1,4 +1,4 @@ -package river_test +package example_batch_insert import ( "context" @@ -77,7 +77,8 @@ func Example_batchInsert() { } fmt.Printf("Inserted %d jobs\n", len(results)) - waitForNJobs(subscribeChan, 5) + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 5) if err := riverClient.Stop(ctx); err != nil { panic(err) diff --git a/example_client_from_context_test.go b/example/example_client_from_context/example_client_from_context_test.go similarity index 93% rename from example_client_from_context_test.go rename to example/example_client_from_context/example_client_from_context_test.go index e4de2045..d5102eba 100644 --- a/example_client_from_context_test.go +++ b/example/example_client_from_context/example_client_from_context_test.go @@ -1,4 +1,4 @@ -package river_test +package example_client_from_context import ( "context" @@ -76,7 +76,8 @@ func ExampleClientFromContext_pgx() { panic(err) } - waitForNJobs(subscribeChan, 1) + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) diff --git a/example_client_from_context_database_sql_test.go b/example/example_client_from_context_database_sql/example_client_from_context_database_sql_test.go similarity index 93% rename from example_client_from_context_database_sql_test.go rename to example/example_client_from_context_database_sql/example_client_from_context_database_sql_test.go index 3cca0977..c0c469a8 100644 --- a/example_client_from_context_database_sql_test.go +++ b/example/example_client_from_context_database_sql/example_client_from_context_database_sql_test.go @@ -1,4 +1,4 @@ -package river_test +package example_client_from_context_database_sql import ( "context" @@ -80,7 +80,8 @@ func ExampleClientFromContext_databaseSQL() { panic(err) } - waitForNJobs(subscribeChan, 1) + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) diff --git a/example_complete_job_within_tx_test.go b/example/example_complete_job_within_tx/example_complete_job_within_tx_test.go similarity index 94% rename from example_complete_job_within_tx_test.go rename to example/example_complete_job_within_tx/example_complete_job_within_tx_test.go index 093280de..69f9f709 100644 --- a/example_complete_job_within_tx_test.go +++ b/example/example_complete_job_within_tx/example_complete_job_within_tx_test.go @@ -1,4 +1,4 @@ -package river_test +package example_complete_job_within_tx import ( "context" @@ -94,7 +94,8 @@ func Example_completeJobWithinTx() { panic(err) } - waitForNJobs(subscribeChan, 1) + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) diff --git a/example_cron_job_test.go b/example/example_cron_job/example_cron_job_test.go similarity index 94% rename from example_cron_job_test.go rename to example/example_cron_job/example_cron_job_test.go index fd62102a..d72ef9f6 100644 --- a/example_cron_job_test.go +++ b/example/example_cron_job/example_cron_job_test.go @@ -1,4 +1,4 @@ -package river_test +package example_cron_job import ( "context" @@ -83,7 +83,8 @@ func Example_cronJob() { panic(err) } - waitForNJobs(subscribeChan, 1) + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) diff --git a/example_custom_insert_opts_test.go b/example/example_custom_insert_opts/example_custom_insert_opts_test.go similarity index 95% rename from example_custom_insert_opts_test.go rename to example/example_custom_insert_opts/example_custom_insert_opts_test.go index 20151a60..b4592d3b 100644 --- a/example_custom_insert_opts_test.go +++ b/example/example_custom_insert_opts/example_custom_insert_opts_test.go @@ -1,4 +1,4 @@ -package river_test +package example_custom_insert_opts import ( "context" @@ -106,7 +106,8 @@ func Example_customInsertOpts() { panic(err) } - waitForNJobs(subscribeChan, 2) + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 2) if err := riverClient.Stop(ctx); err != nil { panic(err) diff --git a/example_error_handler_test.go b/example/example_error_handler/example_error_handler_test.go similarity index 91% rename from example_error_handler_test.go rename to example/example_error_handler/example_error_handler_test.go index b9a27b34..79929c14 100644 --- a/example_error_handler_test.go +++ b/example/example_error_handler/example_error_handler_test.go @@ -1,4 +1,4 @@ -package river_test +package example_error_handler import ( "context" @@ -100,13 +100,15 @@ func Example_errorHandler() { // Wait for the first job before inserting another to guarantee test output // is ordered correctly. - waitForNJobs(subscribeChan, 1) + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) if _, err = riverClient.Insert(ctx, ErroringArgs{ShouldPanic: true}, nil); err != nil { panic(err) } - waitForNJobs(subscribeChan, 1) + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) diff --git a/example_global_hooks_test.go b/example/example_global_hooks/example_global_hooks_test.go similarity index 95% rename from example_global_hooks_test.go rename to example/example_global_hooks/example_global_hooks_test.go index 8470d995..86c26138 100644 --- a/example_global_hooks_test.go +++ b/example/example_global_hooks/example_global_hooks_test.go @@ -1,4 +1,4 @@ -package river_test +package example_global_hooks import ( "context" @@ -98,7 +98,8 @@ func Example_globalHooks() { panic(err) } - waitForNJobs(subscribeChan, 1) + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) diff --git a/example/example_global_hooks/no_op_worker.go b/example/example_global_hooks/no_op_worker.go new file mode 100644 index 00000000..c61968a1 --- /dev/null +++ b/example/example_global_hooks/no_op_worker.go @@ -0,0 +1,26 @@ +package example_global_hooks + +import ( + "context" + "fmt" + + "github.com/riverqueue/river" +) + +// +// This file used as a holding place for test helpers for examples so that the +// helpers aren't included in Godoc and keep each example more succinct. +// + +type NoOpArgs struct{} + +func (NoOpArgs) Kind() string { return "no_op" } + +type NoOpWorker struct { + river.WorkerDefaults[NoOpArgs] +} + +func (w *NoOpWorker) Work(ctx context.Context, job *river.Job[NoOpArgs]) error { + fmt.Printf("NoOpWorker.Work ran\n") + return nil +} diff --git a/example_global_middleware_test.go b/example/example_global_middleware/example_global_middleware_test.go similarity index 95% rename from example_global_middleware_test.go rename to example/example_global_middleware/example_global_middleware_test.go index 3f8f543c..16dedab0 100644 --- a/example_global_middleware_test.go +++ b/example/example_global_middleware/example_global_middleware_test.go @@ -1,4 +1,4 @@ -package river_test +package example_global_middleware import ( "context" @@ -98,7 +98,8 @@ func Example_globalMiddleware() { panic(err) } - waitForNJobs(subscribeChan, 1) + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) diff --git a/example/example_global_middleware/no_op_worker.go b/example/example_global_middleware/no_op_worker.go new file mode 100644 index 00000000..66b61b0a --- /dev/null +++ b/example/example_global_middleware/no_op_worker.go @@ -0,0 +1,26 @@ +package example_global_middleware + +import ( + "context" + "fmt" + + "github.com/riverqueue/river" +) + +// +// This file used as a holding place for test helpers for examples so that the +// helpers aren't included in Godoc and keep each example more succinct. +// + +type NoOpArgs struct{} + +func (NoOpArgs) Kind() string { return "no_op" } + +type NoOpWorker struct { + river.WorkerDefaults[NoOpArgs] +} + +func (w *NoOpWorker) Work(ctx context.Context, job *river.Job[NoOpArgs]) error { + fmt.Printf("NoOpWorker.Work ran\n") + return nil +} diff --git a/example_graceful_shutdown_test.go b/example/example_graceful_shutdown/example_graceful_shutdown_test.go similarity index 99% rename from example_graceful_shutdown_test.go rename to example/example_graceful_shutdown/example_graceful_shutdown_test.go index f9e77983..a59c9f95 100644 --- a/example_graceful_shutdown_test.go +++ b/example/example_graceful_shutdown/example_graceful_shutdown_test.go @@ -1,4 +1,4 @@ -package river_test +package example_graceful_shutdown import ( "context" diff --git a/example_insert_and_work_test.go b/example/example_insert_and_work/example_insert_and_work_test.go similarity index 93% rename from example_insert_and_work_test.go rename to example/example_insert_and_work/example_insert_and_work_test.go index 904142ef..d9c37bde 100644 --- a/example_insert_and_work_test.go +++ b/example/example_insert_and_work/example_insert_and_work_test.go @@ -1,4 +1,4 @@ -package river_test +package example_insert_and_work import ( "context" @@ -92,7 +92,8 @@ func Example_insertAndWork() { panic(err) } - waitForNJobs(subscribeChan, 1) + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) diff --git a/example_job_args_hooks_test.go b/example/example_job_args_hooks/example_job_args_hooks_test.go similarity index 89% rename from example_job_args_hooks_test.go rename to example/example_job_args_hooks/example_job_args_hooks_test.go index 00251ca2..15fe6ee0 100644 --- a/example_job_args_hooks_test.go +++ b/example/example_job_args_hooks/example_job_args_hooks_test.go @@ -1,4 +1,4 @@ -package river_test +package example_job_args_hooks import ( "context" @@ -71,10 +71,10 @@ func (JobWithHooksWorkBeginHook) WorkBegin(ctx context.Context, job *rivertype.J // test suite to make sure that your hooks are complying to the specific // interface hooks that you expected them to be. var ( - _ rivertype.HookInsertBegin = &BothInsertAndWorkBeginHook{} - _ rivertype.HookWorkBegin = &BothInsertAndWorkBeginHook{} - _ rivertype.HookInsertBegin = &InsertBeginHook{} - _ rivertype.HookWorkBegin = &WorkBeginHook{} + _ rivertype.HookInsertBegin = &JobWithHooksBothInsertAndWorkBeginHook{} + _ rivertype.HookWorkBegin = &JobWithHooksBothInsertAndWorkBeginHook{} + _ rivertype.HookInsertBegin = &JobWithHooksInsertBeginHook{} + _ rivertype.HookWorkBegin = &JobWithHooksWorkBeginHook{} ) // Example_jobArgsHooks demonstrates the use of hooks to modify River behavior. @@ -116,7 +116,8 @@ func Example_jobArgsHooks() { panic(err) } - waitForNJobs(subscribeChan, 1) + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) diff --git a/example_job_cancel_test.go b/example/example_job_cancel/example_job_cancel_test.go similarity index 91% rename from example_job_cancel_test.go rename to example/example_job_cancel/example_job_cancel_test.go index 05474d58..b0dd7060 100644 --- a/example_job_cancel_test.go +++ b/example/example_job_cancel/example_job_cancel_test.go @@ -1,4 +1,4 @@ -package river_test +package example_job_cancel import ( "context" @@ -36,7 +36,7 @@ func (w *CancellingWorker) Work(ctx context.Context, job *river.Job[CancellingAr // Example_jobCancel demonstrates how to permanently cancel a job from within // Work using JobCancel. -func Example_jobCancel() { //nolint:dupl +func Example_jobCancel() { ctx := context.Background() dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL()) @@ -71,7 +71,8 @@ func Example_jobCancel() { //nolint:dupl if _, err = riverClient.Insert(ctx, CancellingArgs{ShouldCancel: true}, nil); err != nil { panic(err) } - waitForNJobs(subscribeChan, 1) + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) diff --git a/example_job_cancel_from_client_test.go b/example/example_job_cancel_from_client/example_job_cancel_from_client_test.go similarity index 89% rename from example_job_cancel_from_client_test.go rename to example/example_job_cancel_from_client/example_job_cancel_from_client_test.go index 2a12c671..201c59f4 100644 --- a/example_job_cancel_from_client_test.go +++ b/example/example_job_cancel_from_client/example_job_cancel_from_client_test.go @@ -1,4 +1,4 @@ -package river_test +package example_job_cancel_from_client import ( "context" @@ -21,11 +21,11 @@ type SleepingArgs struct{} func (args SleepingArgs) Kind() string { return "SleepingWorker" } type SleepingWorker struct { - river.WorkerDefaults[CancellingArgs] + river.WorkerDefaults[SleepingArgs] jobChan chan int64 } -func (w *SleepingWorker) Work(ctx context.Context, job *river.Job[CancellingArgs]) error { +func (w *SleepingWorker) Work(ctx context.Context, job *river.Job[SleepingArgs]) error { w.jobChan <- job.ID select { case <-ctx.Done(): @@ -71,7 +71,7 @@ func Example_jobCancelFromClient() { if err := riverClient.Start(ctx); err != nil { panic(err) } - insertRes, err := riverClient.Insert(ctx, CancellingArgs{ShouldCancel: true}, nil) + insertRes, err := riverClient.Insert(ctx, SleepingArgs{}, nil) if err != nil { panic(err) } @@ -90,7 +90,8 @@ func Example_jobCancelFromClient() { if _, err = riverClient.JobCancel(ctx, insertRes.Job.ID); err != nil { panic(err) } - waitForNJobs(subscribeChan, 1) + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) diff --git a/example_job_snooze_test.go b/example/example_job_snooze/example_job_snooze_test.go similarity index 92% rename from example_job_snooze_test.go rename to example/example_job_snooze/example_job_snooze_test.go index f14537dc..edd3f6f7 100644 --- a/example_job_snooze_test.go +++ b/example/example_job_snooze/example_job_snooze_test.go @@ -1,4 +1,4 @@ -package river_test +package example_job_snooze import ( "context" @@ -38,7 +38,7 @@ func (w *SnoozingWorker) Work(ctx context.Context, job *river.Job[SnoozingArgs]) // JobSnooze. The job will be run again after 5 minutes and the snooze attempt // will decrement the job's attempt count, ensuring that one can snooze as many // times as desired without being impacted by the max attempts. -func Example_jobSnooze() { //nolint:dupl +func Example_jobSnooze() { ctx := context.Background() dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL()) @@ -74,7 +74,8 @@ func Example_jobSnooze() { //nolint:dupl if _, err = riverClient.Insert(ctx, SnoozingArgs{ShouldSnooze: true}, nil); err != nil { panic(err) } - waitForNJobs(subscribeChan, 1) + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) diff --git a/example_periodic_job_test.go b/example/example_periodic_job/example_periodic_job_test.go similarity index 94% rename from example_periodic_job_test.go rename to example/example_periodic_job/example_periodic_job_test.go index 3876170e..8e91f6ad 100644 --- a/example_periodic_job_test.go +++ b/example/example_periodic_job/example_periodic_job_test.go @@ -1,4 +1,4 @@ -package river_test +package example_periodic_job import ( "context" @@ -76,7 +76,8 @@ func Example_periodicJob() { panic(err) } - waitForNJobs(subscribeChan, 1) + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) // Periodic jobs can also be configured dynamically after a client has // already started. Added jobs are scheduled for run immediately. diff --git a/example_queue_pause_test.go b/example/example_queue_pause/example_queue_pause_test.go similarity index 99% rename from example_queue_pause_test.go rename to example/example_queue_pause/example_queue_pause_test.go index 22bf216b..0db8470f 100644 --- a/example_queue_pause_test.go +++ b/example/example_queue_pause/example_queue_pause_test.go @@ -1,4 +1,4 @@ -package river_test +package example_queue_pause import ( "context" diff --git a/example_scheduled_job_test.go b/example/example_scheduled_job/example_scheduled_job_test.go similarity index 98% rename from example_scheduled_job_test.go rename to example/example_scheduled_job/example_scheduled_job_test.go index ec43ba0b..52dc5b79 100644 --- a/example_scheduled_job_test.go +++ b/example/example_scheduled_job/example_scheduled_job_test.go @@ -1,4 +1,4 @@ -package river_test +package example_scheduled_job import ( "context" diff --git a/example_subscription_test.go b/example/example_subscription/example_subscription_test.go similarity index 99% rename from example_subscription_test.go rename to example/example_subscription/example_subscription_test.go index 8e94436d..1cde6d43 100644 --- a/example_subscription_test.go +++ b/example/example_subscription/example_subscription_test.go @@ -1,4 +1,4 @@ -package river_test +package example_subscription import ( "context" diff --git a/example_unique_job_test.go b/example/example_unique_job/example_unique_job_test.go similarity index 92% rename from example_unique_job_test.go rename to example/example_unique_job/example_unique_job_test.go index b454b7dd..965d2fd4 100644 --- a/example_unique_job_test.go +++ b/example/example_unique_job/example_unique_job_test.go @@ -1,4 +1,4 @@ -package river_test +package example_unique_job import ( "context" @@ -112,7 +112,8 @@ func Example_uniqueJob() { // Cheat a little by waiting for the first job to come back so we can // guarantee that this example's output comes out in order. - waitForNJobs(subscribeChan, 1) + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) // Because the job is unique ByArgs, another job for account 2 is allowed. _, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 2}, nil) @@ -120,7 +121,8 @@ func Example_uniqueJob() { panic(err) } - waitForNJobs(subscribeChan, 1) + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) diff --git a/example_work_func_test.go b/example/example_work_func/example_work_func_test.go similarity index 92% rename from example_work_func_test.go rename to example/example_work_func/example_work_func_test.go index 5bd4b81c..67a2f3cd 100644 --- a/example_work_func_test.go +++ b/example/example_work_func/example_work_func_test.go @@ -1,4 +1,4 @@ -package river_test +package example_work_func import ( "context" @@ -67,7 +67,8 @@ func Example_workFunc() { panic(err) } - waitForNJobs(subscribeChan, 1) + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) diff --git a/riverdbtest/riverdbtest.go b/riverdbtest/riverdbtest.go index efd6fdb9..f12004a8 100644 --- a/riverdbtest/riverdbtest.go +++ b/riverdbtest/riverdbtest.go @@ -10,6 +10,7 @@ import ( "context" "errors" "fmt" + "math/rand/v2" "runtime" "slices" "strings" @@ -32,14 +33,16 @@ import ( const schemaDateFormat = "2006_01_02t15_04_05" // everything here needs to be lowercase because Postgres forces schema names to lowercase var ( - genSchemaBase sync.Once //nolint:gochecknoglobals - idleSchemas = make(map[string][]string) //nolint:gochecknoglobals - idleSchemasMu sync.Mutex //nolint:gochecknoglobals - initialCleanup sync.Once //nolint:gochecknoglobals - nextSchemaNum atomic.Int32 //nolint:gochecknoglobals - packageName string //nolint:gochecknoglobals - schemaBaseName string //nolint:gochecknoglobals - stats struct { //nolint:gochecknoglobals + genSchemaBase sync.Once //nolint:gochecknoglobals + idleSchemas = make(map[string][]string) //nolint:gochecknoglobals + idleSchemasMu sync.Mutex //nolint:gochecknoglobals + initialCleanup sync.Once //nolint:gochecknoglobals + nextSchemaNum atomic.Int32 //nolint:gochecknoglobals + packageName string //nolint:gochecknoglobals + schemaBaseName string //nolint:gochecknoglobals + schemaExpireHorizon string //nolint:gochecknoglobals + schemaRandSuffix string //nolint:gochecknoglobals + stats struct { //nolint:gochecknoglobals numGenerated atomic.Int32 numReused atomic.Int32 } @@ -129,10 +132,22 @@ func TestSchema[TTx any](ctx context.Context, tb testutil.TestingTB, driver rive // to trim "_schema_" to an abbreviation or shorten "river_leadership". const maxLength = 63 - len("_2025_04_20t16_00_20_schema_01.river_leadership") - 1 if len(packageName) > maxLength { - packageName = packageName[0:maxLength] + // Where truncation is necessary, we also end up with a high + // possibility of contention between schema names. For example, + // `example_job_cancel` and `example_job_cancel_from_client` resolve + // to exactly the same thing and may run in parallel. + // + // Correct this so that if truncation was necessary, generate a + // random suffix. This is added to the schema name separately so + // that cleanup still works correctly (if a random suffix was + // included in cleanup names, it wouldn't match anything). + const numRandChars = 5 + packageName = packageName[0 : maxLength-numRandChars-1] + schemaRandSuffix = randBase62(numRandChars) + "_" } schemaBaseName = packageName + "_" + time.Now().Format(schemaDateFormat) + "_schema_" + schemaExpireHorizon = packageName + "_" + time.Now().Add(-1*time.Minute).Format(schemaDateFormat) + "_schema_" }) exec := driver.GetExecutor() @@ -148,7 +163,7 @@ func TestSchema[TTx any](ctx context.Context, tb testutil.TestingTB, driver rive // end up contending with them as they also try to clean their old // schemas. expiredSchemas, err := driver.GetExecutor().SchemaGetExpired(ctx, &riverdriver.SchemaGetExpiredParams{ - BeforeName: schemaBaseName, + BeforeName: schemaExpireHorizon, Prefix: packageName + "_%", }) require.NoError(tb, err) @@ -228,7 +243,12 @@ func TestSchema[TTx any](ctx context.Context, tb testutil.TestingTB, driver rive } // e.g. river_2025_04_14t22_13_58_schema_10 - schema := schemaBaseName + fmt.Sprintf("%02d", nextSchemaNum.Add(1)) + // + // Or where a package name had to truncated so a random suffix was + // generated: + // + // e.g. river_2025_04_14t22_13_58_schema_kwo78x_10 + schema := schemaBaseName + schemaRandSuffix + fmt.Sprintf("%02d", nextSchemaNum.Add(1)) _, err := exec.Exec(ctx, "CREATE SCHEMA "+schema) require.NoError(tb, err) @@ -305,6 +325,16 @@ func packageFromFunc(funcName string) string { return packageName } +var base62Runes = []rune("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789") //nolint:gochecknoglobals + +func randBase62(numRandChars int) string { + randChars := make([]rune, numRandChars) + for i := range numRandChars { + randChars[i] = base62Runes[rand.IntN(len(base62Runes))] + } + return string(randChars) +} + // TestTx starts a test transaction that's rolled back automatically as the test // case is cleaning itself up. // diff --git a/riverlog/common_test.go b/riverlog/common_test.go deleted file mode 100644 index c1b755c2..00000000 --- a/riverlog/common_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package riverlog_test - -import ( - "fmt" - "time" - - "github.com/riverqueue/river" - "github.com/riverqueue/river/rivershared/riversharedtest" - "github.com/riverqueue/river/rivershared/util/sliceutil" - "github.com/riverqueue/river/rivertype" -) - -// -// This file used as a holding place for test helpers for examples so that the -// helpers aren't included in Godoc and keep each example more succinct. -// - -// Wait on the given subscription channel for numJobs. Times out with a panic if -// jobs take too long to be received. -func waitForNJobs(subscribeChan <-chan *river.Event, numJobs int) []*rivertype.JobRow { - var ( - timeout = riversharedtest.WaitTimeout() - deadline = time.Now().Add(timeout) - events = make([]*river.Event, 0, numJobs) - ) - - for { - select { - case event := <-subscribeChan: - events = append(events, event) - - if len(events) >= numJobs { - return sliceutil.Map(events, func(e *river.Event) *rivertype.JobRow { return e.Job }) - } - - case <-time.After(time.Until(deadline)): - panic(fmt.Sprintf("waitForNJobs timed out after waiting %s (received %d job(s), wanted %d)", - timeout, len(events), numJobs)) - } - } -} diff --git a/riverlog/example_middleware_test.go b/riverlog/example_middleware_test.go index ebb809e7..da76f6b4 100644 --- a/riverlog/example_middleware_test.go +++ b/riverlog/example_middleware_test.go @@ -90,9 +90,9 @@ func Example_middleware() { } // Wait for job to complete, extract log data out of metadata, and print it. - for _, job := range waitForNJobs(subscribeChan, 1) { + for _, event := range riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) { var metadataWithLog metadataWithLog - if err := json.Unmarshal(job.Metadata, &metadataWithLog); err != nil { + if err := json.Unmarshal(event.Job.Metadata, &metadataWithLog); err != nil { panic(err) } for _, logAttempt := range metadataWithLog.RiverLog { diff --git a/rivershared/riversharedtest/riversharedtest.go b/rivershared/riversharedtest/riversharedtest.go index e34d93c7..eff77b19 100644 --- a/rivershared/riversharedtest/riversharedtest.go +++ b/rivershared/riversharedtest/riversharedtest.go @@ -199,7 +199,7 @@ func WaitOrTimeout[T any](tb testing.TB, waitChan <-chan T) T { // through, and returns it if they do, but times out after a reasonable amount // of time. Useful to guarantee that test cases don't hang forever, even in the // event of something wrong. -func WaitOrTimeoutN[T any](tb testing.TB, waitChan <-chan T, numValues int) []T { +func WaitOrTimeoutN[T any](tb testutil.TestingTB, waitChan <-chan T, numValues int) []T { tb.Helper() var (