diff --git a/client.go b/client.go index 919014fe..78d02f15 100644 --- a/client.go +++ b/client.go @@ -448,7 +448,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client // we'll need to add a config for this. instanceName := "default" - client.notifier = notifier.New(archetype, driver.GetDBPool().Config().ConnConfig, client.monitor.SetNotifierStatus) + client.notifier = notifier.New(archetype, driver.GetDBPool().Config().ConnConfig, client.monitor.SetNotifierStatus, logger) var err error client.elector, err = leadership.NewElector(client.adapter, client.notifier, instanceName, client.id, 5*time.Second, logger) if err != nil { diff --git a/internal/maintenance/scheduler_test.go b/internal/maintenance/scheduler_test.go index 5b7324a9..40057e33 100644 --- a/internal/maintenance/scheduler_test.go +++ b/internal/maintenance/scheduler_test.go @@ -265,7 +265,7 @@ func TestScheduler(t *testing.T) { statusUpdate := func(status componentstatus.Status) { statusUpdateCh <- status } - notify := notifier.New(&scheduler.Archetype, dbPool.Config().ConnConfig, statusUpdate) + notify := notifier.New(&scheduler.Archetype, dbPool.Config().ConnConfig, statusUpdate, riverinternaltest.Logger(t)) // Scope in so we can reuse ctx without the cancel embedded. { diff --git a/internal/notifier/notifier.go b/internal/notifier/notifier.go index 33242e45..e95603b5 100644 --- a/internal/notifier/notifier.go +++ b/internal/notifier/notifier.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "log" + "log/slog" "strconv" "sync" "time" @@ -53,6 +53,7 @@ type Notifier struct { connConfig *pgx.ConnConfig notificationBuf chan *pgconn.Notification statusChangeFunc func(componentstatus.Status) + logger *slog.Logger mu sync.Mutex isConnActive bool @@ -60,7 +61,7 @@ type Notifier struct { subChangeCh chan *subscriptionChange } -func New(archetype *baseservice.Archetype, connConfig *pgx.ConnConfig, statusChangeFunc func(componentstatus.Status)) *Notifier { +func New(archetype *baseservice.Archetype, connConfig *pgx.ConnConfig, statusChangeFunc func(componentstatus.Status), logger *slog.Logger) *Notifier { copiedConfig := connConfig.Copy() // Rely on an overall statement timeout instead of setting identical context timeouts on every query: copiedConfig.RuntimeParams["statement_timeout"] = strconv.Itoa(int(statementTimeout.Milliseconds())) @@ -68,6 +69,7 @@ func New(archetype *baseservice.Archetype, connConfig *pgx.ConnConfig, statusCha connConfig: copiedConfig, notificationBuf: make(chan *pgconn.Notification, 1000), statusChangeFunc: statusChangeFunc, + logger: logger.WithGroup("notifier"), subs: make(map[NotificationTopic][]*Subscription), subChangeCh: make(chan *subscriptionChange, 1000), @@ -135,7 +137,17 @@ func (n *Notifier) getConnAndRun(ctx context.Context) { if errors.Is(err, context.Canceled) { return } - log.Printf("error establishing connection from pool: %v", err) + // Log at a lower verbosity level in case an error is received when the + // context is already done (probably because the client is stopping). + // Example tests can finish before the notifier connects and starts + // listening, and on client stop may produce a connection error that + // would otherwise pollute output and fail the test. + select { + case <-ctx.Done(): + n.logger.Info("error establishing connection from pool", "err", err) + default: + n.logger.Error("error establishing connection from pool", "err", err) + } return } defer func() { @@ -198,7 +210,7 @@ func (n *Notifier) runOnce(ctx context.Context, conn *pgx.Conn) error { err := <-errCh if err != nil && !errors.Is(err, context.Canceled) { // A non-cancel error means something went wrong with the conn, so we should bail. - log.Printf("error on draining notification wait: %v", err) + n.logger.Error("error on draining notification wait", "err", err) return err } // If we got a context cancellation error, it means we successfully @@ -230,7 +242,7 @@ func (n *Notifier) runOnce(ctx context.Context, conn *pgx.Conn) error { return nil } if err != nil { - log.Printf("error from notification wait: %v", err) + n.logger.Error("error from notification wait", "err", err) return err } case subChange := <-n.subChangeCh: @@ -300,7 +312,7 @@ func (n *Notifier) handleNotification(conn *pgconn.PgConn, notification *pgconn. select { case n.notificationBuf <- notification: default: - log.Printf("dropping notification due to full buffer: %s", notification.Payload) + n.logger.Warn("dropping notification due to full buffer", "payload", notification.Payload) } } diff --git a/internal/notifier/notifier_test.go b/internal/notifier/notifier_test.go index e51a6a78..ddd1dfc0 100644 --- a/internal/notifier/notifier_test.go +++ b/internal/notifier/notifier_test.go @@ -34,7 +34,7 @@ func TestNotifierReceivesNotification(t *testing.T) { statusUpdateCh <- status } - notifier := New(riverinternaltest.BaseServiceArchetype(t), db.Config().ConnConfig, statusUpdate) + notifier := New(riverinternaltest.BaseServiceArchetype(t), db.Config().ConnConfig, statusUpdate, riverinternaltest.Logger(t)) ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() diff --git a/internal/testdb/manager.go b/internal/testdb/manager.go index 3da578d7..c2335021 100644 --- a/internal/testdb/manager.go +++ b/internal/testdb/manager.go @@ -3,7 +3,6 @@ package testdb import ( "context" "fmt" - "log" "log/slog" "os" "sync" @@ -88,7 +87,7 @@ func (m *Manager) allocatePool(ctx context.Context) (*poolWithDBName, error) { nextDBNum := m.getNextDBNum() dbName := fmt.Sprintf("%s_%d", m.baseConfig.ConnConfig.Database, nextDBNum) - log.Printf("Using test database: %s", dbName) + m.logger.Debug("Using test database", "name", dbName) newPoolConfig := m.baseConfig.Copy() newPoolConfig.ConnConfig.Database = dbName diff --git a/producer_test.go b/producer_test.go index 81ea4ab7..6473706c 100644 --- a/producer_test.go +++ b/producer_test.go @@ -71,7 +71,7 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) { })) ignoreNotifierStatusUpdates := func(componentstatus.Status) {} - notifier := notifier.New(archetype, dbPool.Config().ConnConfig, ignoreNotifierStatusUpdates) + notifier := notifier.New(archetype, dbPool.Config().ConnConfig, ignoreNotifierStatusUpdates, riverinternaltest.Logger(t)) config := &producerConfig{ ErrorHandler: newTestErrorHandler(), @@ -172,7 +172,7 @@ func Test_Producer_Run(t *testing.T) { workers := NewWorkers() - notifier := notifier.New(archetype, dbPool.Config().ConnConfig, func(componentstatus.Status) {}) + notifier := notifier.New(archetype, dbPool.Config().ConnConfig, func(componentstatus.Status) {}, riverinternaltest.Logger(t)) config := &producerConfig{ ErrorHandler: newTestErrorHandler(),