Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
// we'll need to add a config for this.
instanceName := "default"

client.notifier = notifier.New(archetype, driver.GetDBPool().Config().ConnConfig, client.monitor.SetNotifierStatus)
client.notifier = notifier.New(archetype, driver.GetDBPool().Config().ConnConfig, client.monitor.SetNotifierStatus, logger)
var err error
client.elector, err = leadership.NewElector(client.adapter, client.notifier, instanceName, client.id, 5*time.Second, logger)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/maintenance/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func TestScheduler(t *testing.T) {
statusUpdate := func(status componentstatus.Status) {
statusUpdateCh <- status
}
notify := notifier.New(&scheduler.Archetype, dbPool.Config().ConnConfig, statusUpdate)
notify := notifier.New(&scheduler.Archetype, dbPool.Config().ConnConfig, statusUpdate, riverinternaltest.Logger(t))

// Scope in so we can reuse ctx without the cancel embedded.
{
Expand Down
24 changes: 18 additions & 6 deletions internal/notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
"log"
"log/slog"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -53,21 +53,23 @@ type Notifier struct {
connConfig *pgx.ConnConfig
notificationBuf chan *pgconn.Notification
statusChangeFunc func(componentstatus.Status)
logger *slog.Logger

mu sync.Mutex
isConnActive bool
subs map[NotificationTopic][]*Subscription
subChangeCh chan *subscriptionChange
}

func New(archetype *baseservice.Archetype, connConfig *pgx.ConnConfig, statusChangeFunc func(componentstatus.Status)) *Notifier {
func New(archetype *baseservice.Archetype, connConfig *pgx.ConnConfig, statusChangeFunc func(componentstatus.Status), logger *slog.Logger) *Notifier {
copiedConfig := connConfig.Copy()
// Rely on an overall statement timeout instead of setting identical context timeouts on every query:
copiedConfig.RuntimeParams["statement_timeout"] = strconv.Itoa(int(statementTimeout.Milliseconds()))
notifier := baseservice.Init(archetype, &Notifier{
connConfig: copiedConfig,
notificationBuf: make(chan *pgconn.Notification, 1000),
statusChangeFunc: statusChangeFunc,
logger: logger.WithGroup("notifier"),

subs: make(map[NotificationTopic][]*Subscription),
subChangeCh: make(chan *subscriptionChange, 1000),
Expand Down Expand Up @@ -135,7 +137,17 @@ func (n *Notifier) getConnAndRun(ctx context.Context) {
if errors.Is(err, context.Canceled) {
return
}
log.Printf("error establishing connection from pool: %v", err)
// Log at a lower verbosity level in case an error is received when the
// context is already done (probably because the client is stopping).
// Example tests can finish before the notifier connects and starts
// listening, and on client stop may produce a connection error that
// would otherwise pollute output and fail the test.
select {
case <-ctx.Done():
n.logger.Info("error establishing connection from pool", "err", err)
default:
n.logger.Error("error establishing connection from pool", "err", err)
}
return
}
defer func() {
Expand Down Expand Up @@ -198,7 +210,7 @@ func (n *Notifier) runOnce(ctx context.Context, conn *pgx.Conn) error {
err := <-errCh
if err != nil && !errors.Is(err, context.Canceled) {
// A non-cancel error means something went wrong with the conn, so we should bail.
log.Printf("error on draining notification wait: %v", err)
n.logger.Error("error on draining notification wait", "err", err)
return err
}
// If we got a context cancellation error, it means we successfully
Expand Down Expand Up @@ -230,7 +242,7 @@ func (n *Notifier) runOnce(ctx context.Context, conn *pgx.Conn) error {
return nil
}
if err != nil {
log.Printf("error from notification wait: %v", err)
n.logger.Error("error from notification wait", "err", err)
return err
}
case subChange := <-n.subChangeCh:
Expand Down Expand Up @@ -300,7 +312,7 @@ func (n *Notifier) handleNotification(conn *pgconn.PgConn, notification *pgconn.
select {
case n.notificationBuf <- notification:
default:
log.Printf("dropping notification due to full buffer: %s", notification.Payload)
n.logger.Warn("dropping notification due to full buffer", "payload", notification.Payload)
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/notifier/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestNotifierReceivesNotification(t *testing.T) {
statusUpdateCh <- status
}

notifier := New(riverinternaltest.BaseServiceArchetype(t), db.Config().ConnConfig, statusUpdate)
notifier := New(riverinternaltest.BaseServiceArchetype(t), db.Config().ConnConfig, statusUpdate, riverinternaltest.Logger(t))

ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
Expand Down
3 changes: 1 addition & 2 deletions internal/testdb/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package testdb
import (
"context"
"fmt"
"log"
"log/slog"
"os"
"sync"
Expand Down Expand Up @@ -88,7 +87,7 @@ func (m *Manager) allocatePool(ctx context.Context) (*poolWithDBName, error) {
nextDBNum := m.getNextDBNum()
dbName := fmt.Sprintf("%s_%d", m.baseConfig.ConnConfig.Database, nextDBNum)

log.Printf("Using test database: %s", dbName)
m.logger.Debug("Using test database", "name", dbName)

newPoolConfig := m.baseConfig.Copy()
newPoolConfig.ConnConfig.Database = dbName
Expand Down
4 changes: 2 additions & 2 deletions producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) {
}))

ignoreNotifierStatusUpdates := func(componentstatus.Status) {}
notifier := notifier.New(archetype, dbPool.Config().ConnConfig, ignoreNotifierStatusUpdates)
notifier := notifier.New(archetype, dbPool.Config().ConnConfig, ignoreNotifierStatusUpdates, riverinternaltest.Logger(t))

config := &producerConfig{
ErrorHandler: newTestErrorHandler(),
Expand Down Expand Up @@ -172,7 +172,7 @@ func Test_Producer_Run(t *testing.T) {

workers := NewWorkers()

notifier := notifier.New(archetype, dbPool.Config().ConnConfig, func(componentstatus.Status) {})
notifier := notifier.New(archetype, dbPool.Config().ConnConfig, func(componentstatus.Status) {}, riverinternaltest.Logger(t))

config := &producerConfig{
ErrorHandler: newTestErrorHandler(),
Expand Down