From 0b68fccf0b5985e12f5f889e3f47222707ba665a Mon Sep 17 00:00:00 2001 From: Brandur Date: Fri, 25 Apr 2025 21:54:45 -0700 Subject: [PATCH] Attempt fetch immediately when a queue is unpaused Here, try to speed up the test suite by triggering a job fetch in the producer when receiving a notice that a queue's been unpaused. Currently, the "queue paused" example is by far the slowest test in the entire project, clocking in at an impressive 1+ second: $ go test . -count 1 -run Example_queuePause -test.v === RUN Example_queuePause Initial: 42ns After start: 102.926042ms Before pause: 109.6845ms After pause: 109.879458ms After insert: 112.385292ms After reliable receive: 208.227042ms After unreliable receive: 1.106444792s After stop: 1.111110333s --- PASS: Example_queuePause (1.11s) PASS ok github.com/riverqueue/river 1.318s (Extra timing information's been added to the test case above, but notice how waiting for the receive from the unreliable queue after unpausing it takes 800 ms+.) The test's written in a way that it's hard to make it really fast, but this change does succeed in more than doubling its speed: $ go test . -count 1 -run Example_queuePause -test.v === RUN Example_queuePause Initial: 41ns After start: 74.874791ms Before pause: 77.679708ms After pause: 83.262416ms After insert: 88.872ms After reliable receive: 177.465125ms After unreliable receive: 276.994541ms After stop: 284.25225ms --- PASS: Example_queuePause (0.28s) PASS ok github.com/riverqueue/river 0.516s Because example tests have to run sequentially, that means we knock about 800 ms off every run of the top level package's tests. --- CHANGELOG.md | 4 ++++ producer.go | 1 + 2 files changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2478a398..0e1e15aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Client no longer returns an error if stopped before startup could complete (previously, it returned the unexported `ErrShutdown`). [PR #841](https://github.com/riverqueue/river/pull/841). +### Fixed + +- A queue unpausing triggers an immediate fetch so that available jobs in the paused queue may be started faster than before. [PR #854](https://github.com/riverqueue/river/pull/854). + ## [0.20.2] - 2025-04-08 ### Added diff --git a/producer.go b/producer.go index 2ead83d7..7cc79c76 100644 --- a/producer.go +++ b/producer.go @@ -535,6 +535,7 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit } p.paused = false p.Logger.DebugContext(workCtx, p.Name+": Resumed", slog.String("queue", p.config.Queue), slog.String("queue_in_message", msg.Queue)) + fetchLimiter.Call() // try another fetch because more jobs may be available to run which were gated behind the paused queue p.testSignals.Resumed.Signal(struct{}{}) if p.config.QueueEventCallback != nil { p.config.QueueEventCallback(&Event{Kind: EventKindQueueResumed, Queue: &rivertype.Queue{Name: p.config.Queue}})