From 975ce6c7ee623d5f1da7a6661fc46de6fd070cca Mon Sep 17 00:00:00 2001 From: Brandur Date: Sat, 26 Apr 2025 14:30:06 -0700 Subject: [PATCH] Give `GetListener` a params struct instead of a raw string In #848 I went through and gave all driver functions a params struct instead of whatever they had before. This tends to be more convenient because it's uniform everywhere, and means that we need to change less when a new specific parameter is added henceforth. One I'd intended to do but forgot about was `GetListener` which currently just takes a raw schema string. We haven't cut a release for the change in #848, so it's a good time to change these driver interfaces now, so here we give `GetListener` a params struct as well. This is partly driven by me investigating [1], in which it might be desirable to have a sub-schema leadership namespace so that different River clients configured with different periodic jobs could run inside the same River schema, which would necessitate a new listener parameter. I'm not sure whether this project is a good idea or not yet, so I'll need to investigate a little more. [1] https://github.com/riverqueue/river/issues/742#issuecomment-2819396414 --- client.go | 2 +- internal/leadership/elector_test.go | 2 +- internal/notifier/notifier_test.go | 2 +- .../riverinternaltest/riverdrivertest/riverdrivertest.go | 8 ++++---- producer_test.go | 4 ++-- riverdriver/river_driver_interface.go | 6 +++++- riverdriver/riverdatabasesql/river_database_sql_driver.go | 2 +- riverdriver/riverpgxv5/river_pgx_v5_driver.go | 4 ++-- 8 files changed, 17 insertions(+), 13 deletions(-) diff --git a/client.go b/client.go index 0f485ae3..e5c4c885 100644 --- a/client.go +++ b/client.go @@ -701,7 +701,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client // uses listen/notify. Instead, each service polls for changes it's // interested in. e.g. Elector polls to see if leader has expired. if !config.PollOnly { - client.notifier = notifier.New(archetype, driver.GetListener(config.Schema)) + client.notifier = notifier.New(archetype, driver.GetListener(&riverdriver.GetListenenerParams{Schema: config.Schema})) client.services = append(client.services, client.notifier) } } else { diff --git a/internal/leadership/elector_test.go b/internal/leadership/elector_test.go index d61ba7b2..3aec14c8 100644 --- a/internal/leadership/elector_test.go +++ b/internal/leadership/elector_test.go @@ -93,7 +93,7 @@ func TestElector_WithNotifier(t *testing.T) { archetype = riversharedtest.BaseServiceArchetype(t) ) - notifier := notifier.New(archetype, driver.GetListener(schema)) + notifier := notifier.New(archetype, driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema})) { require.NoError(t, notifier.Start(ctx)) t.Cleanup(notifier.Stop) diff --git a/internal/notifier/notifier_test.go b/internal/notifier/notifier_test.go index 15d7550c..04d5be82 100644 --- a/internal/notifier/notifier_test.go +++ b/internal/notifier/notifier_test.go @@ -60,7 +60,7 @@ func TestNotifier(t *testing.T) { dbPool = cmp.Or(opts.dbPool, riversharedtest.DBPool(ctx, t)) driver = riverpgxv5.New(dbPool) schema = riverdbtest.TestSchema(ctx, t, driver, nil) - listener = driver.GetListener(schema) + listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema}) ) notifier := New(riversharedtest.BaseServiceArchetype(t), listener) diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index 057b6f2f..b6772e62 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -3173,7 +3173,7 @@ func setupListener[TTx any](ctx context.Context, t *testing.T, driverWithPool fu var ( driver, schema = driverWithPool(ctx, t) - listener = driver.GetListener(schema) + listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema}) ) return listener, &testListenerBundle[TTx]{ @@ -3266,7 +3266,7 @@ func exerciseListener[TTx any](ctx context.Context, t *testing.T, driverWithPool var ( driver, _ = driverWithPool(ctx, t) - listener = driver.GetListener("my_custom_schema") + listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: "my_custom_schema"}) ) require.Equal(t, "my_custom_schema", listener.Schema()) @@ -3286,7 +3286,7 @@ func exerciseListener[TTx any](ctx context.Context, t *testing.T, driverWithPool var ( driver, _ = driverWithPool(ctx, t) - listener = driver.GetListener("") + listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: ""}) ) connectListener(ctx, t, listener) @@ -3298,7 +3298,7 @@ func exerciseListener[TTx any](ctx context.Context, t *testing.T, driverWithPool var ( driver, _ = driverWithPool(ctx, t) - listener = driver.GetListener("") + listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: ""}) ) connectListener(ctx, t, listener) diff --git a/producer_test.go b/producer_test.go index 0c55a61e..f219d42e 100644 --- a/producer_test.go +++ b/producer_test.go @@ -59,7 +59,7 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) { driver := riverpgxv5.New(dbPool) exec := driver.GetExecutor() schema := riverdbtest.TestSchema(ctx, t, driver, nil) - listener := driver.GetListener(schema) + listener := driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema}) pilot := &riverpilot.StandardPilot{} subscribeChan := make(chan []jobcompleter.CompleterJobUpdated, 100) @@ -223,7 +223,7 @@ func TestProducer_WithNotifier(t *testing.T) { exec = driver.GetExecutor() jobUpdates = make(chan []jobcompleter.CompleterJobUpdated, 10) schema = riverdbtest.TestSchema(ctx, t, driver, nil) - listener = driver.GetListener(schema) + listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema}) pilot = &riverpilot.StandardPilot{} queueName = fmt.Sprintf("test-producer-with-notifier-%05d", randutil.IntBetween(1, 100_000)) ) diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 5315b911..f5f49e10 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -53,7 +53,7 @@ type Driver[TTx any] interface { // GetListener gets a listener for purposes of receiving notifications. // // API is not stable. DO NOT USE. - GetListener(schema string) Listener + GetListener(params *GetListenenerParams) Listener // GetMigrationDefaultLines gets default migration lines that should be // applied when using this driver. This is mainly used by riverdbtest to @@ -209,6 +209,10 @@ type ExecutorTx interface { Rollback(ctx context.Context) error } +type GetListenenerParams struct { + Schema string +} + // Listener listens for notifications. In Postgres, this is a database // connection where `LISTEN` has been run. // diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index a0cbec11..9ae00b81 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -54,7 +54,7 @@ func (d *Driver) GetExecutor() riverdriver.Executor { return &Executor{d.dbPool, templateReplaceWrapper{d.dbPool, &d.replacer}, d} } -func (d *Driver) GetListener(schema string) riverdriver.Listener { +func (d *Driver) GetListener(params *riverdriver.GetListenenerParams) riverdriver.Listener { panic(riverdriver.ErrNotImplemented) } diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 2808f54a..3523c6e0 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -61,8 +61,8 @@ func (d *Driver) GetExecutor() riverdriver.Executor { return &Executor{templateReplaceWrapper{d.dbPool, &d.replacer}, d} } -func (d *Driver) GetListener(schema string) riverdriver.Listener { - return &Listener{dbPool: d.dbPool, schema: schema} +func (d *Driver) GetListener(params *riverdriver.GetListenenerParams) riverdriver.Listener { + return &Listener{dbPool: d.dbPool, schema: params.Schema} } func (d *Driver) GetMigrationDefaultLines() []string { return []string{riverdriver.MigrationLineMain} }