From 28de9c5b4ff9f84ba8d81472bfaa4bd36880a7e6 Mon Sep 17 00:00:00 2001 From: Martin Englund Date: Sat, 30 Dec 2023 22:23:56 -0800 Subject: [PATCH 1/4] replace log with slog --- client.go | 2 +- internal/notifier/notifier.go | 14 ++++++++------ internal/testdb/manager.go | 3 +-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/client.go b/client.go index 919014fe..78d02f15 100644 --- a/client.go +++ b/client.go @@ -448,7 +448,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client // we'll need to add a config for this. instanceName := "default" - client.notifier = notifier.New(archetype, driver.GetDBPool().Config().ConnConfig, client.monitor.SetNotifierStatus) + client.notifier = notifier.New(archetype, driver.GetDBPool().Config().ConnConfig, client.monitor.SetNotifierStatus, logger) var err error client.elector, err = leadership.NewElector(client.adapter, client.notifier, instanceName, client.id, 5*time.Second, logger) if err != nil { diff --git a/internal/notifier/notifier.go b/internal/notifier/notifier.go index 33242e45..a45902fd 100644 --- a/internal/notifier/notifier.go +++ b/internal/notifier/notifier.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "log" + "log/slog" "strconv" "sync" "time" @@ -53,6 +53,7 @@ type Notifier struct { connConfig *pgx.ConnConfig notificationBuf chan *pgconn.Notification statusChangeFunc func(componentstatus.Status) + logger *slog.Logger mu sync.Mutex isConnActive bool @@ -60,7 +61,7 @@ type Notifier struct { subChangeCh chan *subscriptionChange } -func New(archetype *baseservice.Archetype, connConfig *pgx.ConnConfig, statusChangeFunc func(componentstatus.Status)) *Notifier { +func New(archetype *baseservice.Archetype, connConfig *pgx.ConnConfig, statusChangeFunc func(componentstatus.Status), logger *slog.Logger) *Notifier { copiedConfig := connConfig.Copy() // Rely on an overall statement timeout instead of setting identical context timeouts on every query: copiedConfig.RuntimeParams["statement_timeout"] = strconv.Itoa(int(statementTimeout.Milliseconds())) @@ -68,6 +69,7 @@ func New(archetype *baseservice.Archetype, connConfig *pgx.ConnConfig, statusCha connConfig: copiedConfig, notificationBuf: make(chan *pgconn.Notification, 1000), statusChangeFunc: statusChangeFunc, + logger: logger.WithGroup("notifier"), subs: make(map[NotificationTopic][]*Subscription), subChangeCh: make(chan *subscriptionChange, 1000), @@ -135,7 +137,7 @@ func (n *Notifier) getConnAndRun(ctx context.Context) { if errors.Is(err, context.Canceled) { return } - log.Printf("error establishing connection from pool: %v", err) + slog.Error("error establishing connection from pool", "err", err) return } defer func() { @@ -198,7 +200,7 @@ func (n *Notifier) runOnce(ctx context.Context, conn *pgx.Conn) error { err := <-errCh if err != nil && !errors.Is(err, context.Canceled) { // A non-cancel error means something went wrong with the conn, so we should bail. - log.Printf("error on draining notification wait: %v", err) + slog.Error("error on draining notification wait", "err", err) return err } // If we got a context cancellation error, it means we successfully @@ -230,7 +232,7 @@ func (n *Notifier) runOnce(ctx context.Context, conn *pgx.Conn) error { return nil } if err != nil { - log.Printf("error from notification wait: %v", err) + slog.Error("error from notification wait", "err", err) return err } case subChange := <-n.subChangeCh: @@ -300,7 +302,7 @@ func (n *Notifier) handleNotification(conn *pgconn.PgConn, notification *pgconn. select { case n.notificationBuf <- notification: default: - log.Printf("dropping notification due to full buffer: %s", notification.Payload) + slog.Warn("dropping notification due to full buffer", "payload", notification.Payload) } } diff --git a/internal/testdb/manager.go b/internal/testdb/manager.go index 3da578d7..44c29e7b 100644 --- a/internal/testdb/manager.go +++ b/internal/testdb/manager.go @@ -3,7 +3,6 @@ package testdb import ( "context" "fmt" - "log" "log/slog" "os" "sync" @@ -88,7 +87,7 @@ func (m *Manager) allocatePool(ctx context.Context) (*poolWithDBName, error) { nextDBNum := m.getNextDBNum() dbName := fmt.Sprintf("%s_%d", m.baseConfig.ConnConfig.Database, nextDBNum) - log.Printf("Using test database: %s", dbName) + slog.Debug("Using test database", "name", dbName) newPoolConfig := m.baseConfig.Copy() newPoolConfig.ConnConfig.Database = dbName From d73452a71fb1192290d17e58d5c2cf4b4d10e16a Mon Sep 17 00:00:00 2001 From: Martin Englund Date: Sun, 31 Dec 2023 00:13:15 -0800 Subject: [PATCH 2/4] use struct logger --- internal/maintenance/scheduler_test.go | 4 +++- internal/notifier/notifier.go | 8 ++++---- internal/notifier/notifier_test.go | 4 +++- internal/testdb/manager.go | 2 +- producer_test.go | 6 ++++-- 5 files changed, 15 insertions(+), 9 deletions(-) diff --git a/internal/maintenance/scheduler_test.go b/internal/maintenance/scheduler_test.go index 5b7324a9..fb1c8f05 100644 --- a/internal/maintenance/scheduler_test.go +++ b/internal/maintenance/scheduler_test.go @@ -3,6 +3,7 @@ package maintenance import ( "context" "encoding/json" + "log/slog" "sort" "testing" "time" @@ -16,6 +17,7 @@ import ( "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/dbutil" "github.com/riverqueue/river/internal/util/ptrutil" + "github.com/riverqueue/river/internal/util/slogutil" "github.com/riverqueue/river/internal/util/valutil" ) @@ -265,7 +267,7 @@ func TestScheduler(t *testing.T) { statusUpdate := func(status componentstatus.Status) { statusUpdateCh <- status } - notify := notifier.New(&scheduler.Archetype, dbPool.Config().ConnConfig, statusUpdate) + notify := notifier.New(&scheduler.Archetype, dbPool.Config().ConnConfig, statusUpdate, slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn})) // Scope in so we can reuse ctx without the cancel embedded. { diff --git a/internal/notifier/notifier.go b/internal/notifier/notifier.go index a45902fd..8e1373b9 100644 --- a/internal/notifier/notifier.go +++ b/internal/notifier/notifier.go @@ -137,7 +137,7 @@ func (n *Notifier) getConnAndRun(ctx context.Context) { if errors.Is(err, context.Canceled) { return } - slog.Error("error establishing connection from pool", "err", err) + n.logger.Error("error establishing connection from pool", "err", err) return } defer func() { @@ -200,7 +200,7 @@ func (n *Notifier) runOnce(ctx context.Context, conn *pgx.Conn) error { err := <-errCh if err != nil && !errors.Is(err, context.Canceled) { // A non-cancel error means something went wrong with the conn, so we should bail. - slog.Error("error on draining notification wait", "err", err) + n.logger.Error("error on draining notification wait", "err", err) return err } // If we got a context cancellation error, it means we successfully @@ -232,7 +232,7 @@ func (n *Notifier) runOnce(ctx context.Context, conn *pgx.Conn) error { return nil } if err != nil { - slog.Error("error from notification wait", "err", err) + n.logger.Error("error from notification wait", "err", err) return err } case subChange := <-n.subChangeCh: @@ -302,7 +302,7 @@ func (n *Notifier) handleNotification(conn *pgconn.PgConn, notification *pgconn. select { case n.notificationBuf <- notification: default: - slog.Warn("dropping notification due to full buffer", "payload", notification.Payload) + n.logger.Warn("dropping notification due to full buffer", "payload", notification.Payload) } } diff --git a/internal/notifier/notifier_test.go b/internal/notifier/notifier_test.go index e51a6a78..565c430c 100644 --- a/internal/notifier/notifier_test.go +++ b/internal/notifier/notifier_test.go @@ -2,6 +2,7 @@ package notifier import ( "context" + "log/slog" "testing" "time" @@ -10,6 +11,7 @@ import ( "github.com/riverqueue/river/internal/componentstatus" "github.com/riverqueue/river/internal/dbsqlc" "github.com/riverqueue/river/internal/riverinternaltest" + "github.com/riverqueue/river/internal/util/slogutil" ) func expectReceiveStatus(t *testing.T, statusCh <-chan componentstatus.Status, expected componentstatus.Status) { @@ -34,7 +36,7 @@ func TestNotifierReceivesNotification(t *testing.T) { statusUpdateCh <- status } - notifier := New(riverinternaltest.BaseServiceArchetype(t), db.Config().ConnConfig, statusUpdate) + notifier := New(riverinternaltest.BaseServiceArchetype(t), db.Config().ConnConfig, statusUpdate, slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn})) ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() diff --git a/internal/testdb/manager.go b/internal/testdb/manager.go index 44c29e7b..c2335021 100644 --- a/internal/testdb/manager.go +++ b/internal/testdb/manager.go @@ -87,7 +87,7 @@ func (m *Manager) allocatePool(ctx context.Context) (*poolWithDBName, error) { nextDBNum := m.getNextDBNum() dbName := fmt.Sprintf("%s_%d", m.baseConfig.ConnConfig.Database, nextDBNum) - slog.Debug("Using test database", "name", dbName) + m.logger.Debug("Using test database", "name", dbName) newPoolConfig := m.baseConfig.Copy() newPoolConfig.ConnConfig.Database = dbName diff --git a/producer_test.go b/producer_test.go index 81ea4ab7..d0b30ae6 100644 --- a/producer_test.go +++ b/producer_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "log/slog" "slices" "sync" "testing" @@ -19,6 +20,7 @@ import ( "github.com/riverqueue/river/internal/notifier" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/riverinternaltest" + "github.com/riverqueue/river/internal/util/slogutil" ) func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) { @@ -71,7 +73,7 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) { })) ignoreNotifierStatusUpdates := func(componentstatus.Status) {} - notifier := notifier.New(archetype, dbPool.Config().ConnConfig, ignoreNotifierStatusUpdates) + notifier := notifier.New(archetype, dbPool.Config().ConnConfig, ignoreNotifierStatusUpdates, slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn})) config := &producerConfig{ ErrorHandler: newTestErrorHandler(), @@ -172,7 +174,7 @@ func Test_Producer_Run(t *testing.T) { workers := NewWorkers() - notifier := notifier.New(archetype, dbPool.Config().ConnConfig, func(componentstatus.Status) {}) + notifier := notifier.New(archetype, dbPool.Config().ConnConfig, func(componentstatus.Status) {}, slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn})) config := &producerConfig{ ErrorHandler: newTestErrorHandler(), From f4595dfa711a1326d8e63b6739e718db9fb9e6a6 Mon Sep 17 00:00:00 2001 From: Martin Englund Date: Mon, 1 Jan 2024 17:04:22 -0800 Subject: [PATCH 3/4] send test logs to the right place --- internal/maintenance/scheduler_test.go | 4 +--- internal/notifier/notifier.go | 12 +++++++++++- internal/notifier/notifier_test.go | 4 +--- producer_test.go | 2 +- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/internal/maintenance/scheduler_test.go b/internal/maintenance/scheduler_test.go index fb1c8f05..40057e33 100644 --- a/internal/maintenance/scheduler_test.go +++ b/internal/maintenance/scheduler_test.go @@ -3,7 +3,6 @@ package maintenance import ( "context" "encoding/json" - "log/slog" "sort" "testing" "time" @@ -17,7 +16,6 @@ import ( "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/dbutil" "github.com/riverqueue/river/internal/util/ptrutil" - "github.com/riverqueue/river/internal/util/slogutil" "github.com/riverqueue/river/internal/util/valutil" ) @@ -267,7 +265,7 @@ func TestScheduler(t *testing.T) { statusUpdate := func(status componentstatus.Status) { statusUpdateCh <- status } - notify := notifier.New(&scheduler.Archetype, dbPool.Config().ConnConfig, statusUpdate, slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn})) + notify := notifier.New(&scheduler.Archetype, dbPool.Config().ConnConfig, statusUpdate, riverinternaltest.Logger(t)) // Scope in so we can reuse ctx without the cancel embedded. { diff --git a/internal/notifier/notifier.go b/internal/notifier/notifier.go index 8e1373b9..e95603b5 100644 --- a/internal/notifier/notifier.go +++ b/internal/notifier/notifier.go @@ -137,7 +137,17 @@ func (n *Notifier) getConnAndRun(ctx context.Context) { if errors.Is(err, context.Canceled) { return } - n.logger.Error("error establishing connection from pool", "err", err) + // Log at a lower verbosity level in case an error is received when the + // context is already done (probably because the client is stopping). + // Example tests can finish before the notifier connects and starts + // listening, and on client stop may produce a connection error that + // would otherwise pollute output and fail the test. + select { + case <-ctx.Done(): + n.logger.Info("error establishing connection from pool", "err", err) + default: + n.logger.Error("error establishing connection from pool", "err", err) + } return } defer func() { diff --git a/internal/notifier/notifier_test.go b/internal/notifier/notifier_test.go index 565c430c..ddd1dfc0 100644 --- a/internal/notifier/notifier_test.go +++ b/internal/notifier/notifier_test.go @@ -2,7 +2,6 @@ package notifier import ( "context" - "log/slog" "testing" "time" @@ -11,7 +10,6 @@ import ( "github.com/riverqueue/river/internal/componentstatus" "github.com/riverqueue/river/internal/dbsqlc" "github.com/riverqueue/river/internal/riverinternaltest" - "github.com/riverqueue/river/internal/util/slogutil" ) func expectReceiveStatus(t *testing.T, statusCh <-chan componentstatus.Status, expected componentstatus.Status) { @@ -36,7 +34,7 @@ func TestNotifierReceivesNotification(t *testing.T) { statusUpdateCh <- status } - notifier := New(riverinternaltest.BaseServiceArchetype(t), db.Config().ConnConfig, statusUpdate, slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn})) + notifier := New(riverinternaltest.BaseServiceArchetype(t), db.Config().ConnConfig, statusUpdate, riverinternaltest.Logger(t)) ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() diff --git a/producer_test.go b/producer_test.go index d0b30ae6..34c7e612 100644 --- a/producer_test.go +++ b/producer_test.go @@ -73,7 +73,7 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) { })) ignoreNotifierStatusUpdates := func(componentstatus.Status) {} - notifier := notifier.New(archetype, dbPool.Config().ConnConfig, ignoreNotifierStatusUpdates, slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn})) + notifier := notifier.New(archetype, dbPool.Config().ConnConfig, ignoreNotifierStatusUpdates, riverinternaltest.Logger(t)) config := &producerConfig{ ErrorHandler: newTestErrorHandler(), From da689e8aa7f09a5d2d4ee2fba4f69c3ef05b00e5 Mon Sep 17 00:00:00 2001 From: Martin Englund Date: Wed, 3 Jan 2024 13:05:26 -0800 Subject: [PATCH 4/4] use riverinternaltest.Logger(t) --- producer_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/producer_test.go b/producer_test.go index 34c7e612..6473706c 100644 --- a/producer_test.go +++ b/producer_test.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "log/slog" "slices" "sync" "testing" @@ -20,7 +19,6 @@ import ( "github.com/riverqueue/river/internal/notifier" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/riverinternaltest" - "github.com/riverqueue/river/internal/util/slogutil" ) func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) { @@ -174,7 +172,7 @@ func Test_Producer_Run(t *testing.T) { workers := NewWorkers() - notifier := notifier.New(archetype, dbPool.Config().ConnConfig, func(componentstatus.Status) {}, slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn})) + notifier := notifier.New(archetype, dbPool.Config().ConnConfig, func(componentstatus.Status) {}, riverinternaltest.Logger(t)) config := &producerConfig{ ErrorHandler: newTestErrorHandler(),