Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
// uses listen/notify. Instead, each service polls for changes it's
// interested in. e.g. Elector polls to see if leader has expired.
if !config.PollOnly {
client.notifier = notifier.New(archetype, driver.GetListener(config.Schema))
client.notifier = notifier.New(archetype, driver.GetListener(&riverdriver.GetListenenerParams{Schema: config.Schema}))
client.services = append(client.services, client.notifier)
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion internal/leadership/elector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestElector_WithNotifier(t *testing.T) {
archetype = riversharedtest.BaseServiceArchetype(t)
)

notifier := notifier.New(archetype, driver.GetListener(schema))
notifier := notifier.New(archetype, driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema}))
{
require.NoError(t, notifier.Start(ctx))
t.Cleanup(notifier.Stop)
Expand Down
2 changes: 1 addition & 1 deletion internal/notifier/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestNotifier(t *testing.T) {
dbPool = cmp.Or(opts.dbPool, riversharedtest.DBPool(ctx, t))
driver = riverpgxv5.New(dbPool)
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
listener = driver.GetListener(schema)
listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema})
)

notifier := New(riversharedtest.BaseServiceArchetype(t), listener)
Expand Down
8 changes: 4 additions & 4 deletions internal/riverinternaltest/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -3173,7 +3173,7 @@ func setupListener[TTx any](ctx context.Context, t *testing.T, driverWithPool fu

var (
driver, schema = driverWithPool(ctx, t)
listener = driver.GetListener(schema)
listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema})
)

return listener, &testListenerBundle[TTx]{
Expand Down Expand Up @@ -3266,7 +3266,7 @@ func exerciseListener[TTx any](ctx context.Context, t *testing.T, driverWithPool

var (
driver, _ = driverWithPool(ctx, t)
listener = driver.GetListener("my_custom_schema")
listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: "my_custom_schema"})
)

require.Equal(t, "my_custom_schema", listener.Schema())
Expand All @@ -3286,7 +3286,7 @@ func exerciseListener[TTx any](ctx context.Context, t *testing.T, driverWithPool

var (
driver, _ = driverWithPool(ctx, t)
listener = driver.GetListener("")
listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: ""})
)

connectListener(ctx, t, listener)
Expand All @@ -3298,7 +3298,7 @@ func exerciseListener[TTx any](ctx context.Context, t *testing.T, driverWithPool

var (
driver, _ = driverWithPool(ctx, t)
listener = driver.GetListener("")
listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: ""})
)

connectListener(ctx, t, listener)
Expand Down
4 changes: 2 additions & 2 deletions producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) {
driver := riverpgxv5.New(dbPool)
exec := driver.GetExecutor()
schema := riverdbtest.TestSchema(ctx, t, driver, nil)
listener := driver.GetListener(schema)
listener := driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema})
pilot := &riverpilot.StandardPilot{}

subscribeChan := make(chan []jobcompleter.CompleterJobUpdated, 100)
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestProducer_WithNotifier(t *testing.T) {
exec = driver.GetExecutor()
jobUpdates = make(chan []jobcompleter.CompleterJobUpdated, 10)
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
listener = driver.GetListener(schema)
listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema})
pilot = &riverpilot.StandardPilot{}
queueName = fmt.Sprintf("test-producer-with-notifier-%05d", randutil.IntBetween(1, 100_000))
)
Expand Down
6 changes: 5 additions & 1 deletion riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Driver[TTx any] interface {
// GetListener gets a listener for purposes of receiving notifications.
//
// API is not stable. DO NOT USE.
GetListener(schema string) Listener
GetListener(params *GetListenenerParams) Listener

// GetMigrationDefaultLines gets default migration lines that should be
// applied when using this driver. This is mainly used by riverdbtest to
Expand Down Expand Up @@ -209,6 +209,10 @@ type ExecutorTx interface {
Rollback(ctx context.Context) error
}

type GetListenenerParams struct {
Schema string
}

// Listener listens for notifications. In Postgres, this is a database
// connection where `LISTEN` has been run.
//
Expand Down
2 changes: 1 addition & 1 deletion riverdriver/riverdatabasesql/river_database_sql_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (d *Driver) GetExecutor() riverdriver.Executor {
return &Executor{d.dbPool, templateReplaceWrapper{d.dbPool, &d.replacer}, d}
}

func (d *Driver) GetListener(schema string) riverdriver.Listener {
func (d *Driver) GetListener(params *riverdriver.GetListenenerParams) riverdriver.Listener {
panic(riverdriver.ErrNotImplemented)
}

Expand Down
4 changes: 2 additions & 2 deletions riverdriver/riverpgxv5/river_pgx_v5_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func (d *Driver) GetExecutor() riverdriver.Executor {
return &Executor{templateReplaceWrapper{d.dbPool, &d.replacer}, d}
}

func (d *Driver) GetListener(schema string) riverdriver.Listener {
return &Listener{dbPool: d.dbPool, schema: schema}
func (d *Driver) GetListener(params *riverdriver.GetListenenerParams) riverdriver.Listener {
return &Listener{dbPool: d.dbPool, schema: params.Schema}
}

func (d *Driver) GetMigrationDefaultLines() []string { return []string{riverdriver.MigrationLineMain} }
Expand Down