From af8ea59e91b7a3514bb3baa4225d5c2628b5ea3b Mon Sep 17 00:00:00 2001 From: Brandur Date: Fri, 25 Apr 2025 22:46:38 -0700 Subject: [PATCH] Consolidate `pollForSettingChanges` with other producer subroutines I made some changes to the producer in #848 to fix an intermittent test introduced by queue pause/unpause by moving a goroutine into the producer's main start goroutine and making sure to wait on it finishing before leaving the producer. I didn't notice at the time that there's already a place where some other producer subroutines are started, and we can clean things up somewhat by moving `pollForSettingChanges` down to where the other subroutines are booted and consolidating the wait group. --- producer.go | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/producer.go b/producer.go index 2ead83d7..5ffafa69 100644 --- a/producer.go +++ b/producer.go @@ -370,18 +370,6 @@ func (p *producer) StartWorkContext(fetchCtx, workCtx context.Context) error { p.Logger.DebugContext(fetchCtx, p.Name+": Run loop stopped", slog.String("queue", p.config.Queue), slog.Uint64("num_completed_jobs", p.numJobsRan.Load())) }() - var wg sync.WaitGroup - defer wg.Wait() - if p.config.Notifier == nil { - p.Logger.DebugContext(fetchCtx, p.Name+": No notifier configured; starting in poll mode", "client_id", p.config.ClientID) - - wg.Add(1) - go func() { - defer wg.Done() - p.pollForSettingChanges(fetchCtx, initiallyPaused, initialMetadata) - }() - } - if insertSub != nil { defer insertSub.Unlisten(fetchCtx) } @@ -390,12 +378,19 @@ func (p *producer) StartWorkContext(fetchCtx, workCtx context.Context) error { defer controlSub.Unlisten(fetchCtx) } - var subroutineWg sync.WaitGroup - subroutineWg.Add(3) + var subroutineWG sync.WaitGroup + subroutineWG.Add(3) subroutineCtx, cancelSubroutines := context.WithCancelCause(context.WithoutCancel(fetchCtx)) - go p.heartbeatLogLoop(subroutineCtx, &subroutineWg) - go p.reportQueueStatusLoop(subroutineCtx, &subroutineWg) - go p.reportProducerStatusLoop(subroutineCtx, &subroutineWg) + go p.heartbeatLogLoop(subroutineCtx, &subroutineWG) + go p.reportQueueStatusLoop(subroutineCtx, &subroutineWG) + go p.reportProducerStatusLoop(subroutineCtx, &subroutineWG) + + if p.config.Notifier == nil { + p.Logger.DebugContext(subroutineCtx, p.Name+": No notifier configured; starting in poll mode", "client_id", p.config.ClientID) + + subroutineWG.Add(1) + go p.pollForSettingChanges(subroutineCtx, &subroutineWG, initiallyPaused, initialMetadata) + } p.fetchAndRunLoop(fetchCtx, workCtx, fetchLimiter) p.Logger.Debug(p.Name+": Entering shutdown loop", slog.String("queue", p.config.Queue), slog.Int64("id", p.id.Load())) @@ -403,7 +398,7 @@ func (p *producer) StartWorkContext(fetchCtx, workCtx context.Context) error { p.Logger.Debug(p.Name+": Shutdown loop exited, awaiting subroutines", slog.String("queue", p.config.Queue), slog.Int64("id", p.id.Load())) cancelSubroutines(errors.New("producer stopped")) - subroutineWg.Wait() + subroutineWG.Wait() p.Logger.Debug(p.Name+": Shutdown subroutines completed, finalizing", slog.String("queue", p.config.Queue), slog.Int64("id", p.id.Load())) p.finalizeShutdown(context.WithoutCancel(fetchCtx)) @@ -790,7 +785,9 @@ func (p *producer) handleWorkerDone(job *rivertype.JobRow) { p.jobResultCh <- job } -func (p *producer) pollForSettingChanges(ctx context.Context, lastPaused bool, lastMetadata []byte) { +func (p *producer) pollForSettingChanges(ctx context.Context, wg *sync.WaitGroup, lastPaused bool, lastMetadata []byte) { + defer wg.Done() + ticker := time.NewTicker(p.config.QueuePollInterval) for { select {