diff --git a/CHANGELOG.md b/CHANGELOG.md index a8978d81..4649c82b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed a memory leak caused by allocating a new random source on every job execution. Thank you @shawnstephens for reporting ❤️ [PR #240](https://github.com/riverqueue/river/pull/240). - Fix a problem where `JobListParams.Queues()` didn't filter correctly based on its arguments. [PR #212](https://github.com/riverqueue/river/pull/212). - Fix a problem in `DebouncedChan` where it would fire on its "out" channel too often when it was being signaled continuousy on its "in" channel. This would have caused work to be fetched more often than intended in busy systems. [PR #222](https://github.com/riverqueue/river/pull/222). diff --git a/client.go b/client.go index 395b4d06..1f88ba16 100644 --- a/client.go +++ b/client.go @@ -24,6 +24,7 @@ import ( "github.com/riverqueue/river/internal/maintenance" "github.com/riverqueue/river/internal/notifier" "github.com/riverqueue/river/internal/rivercommon" + "github.com/riverqueue/river/internal/util/randutil" "github.com/riverqueue/river/internal/util/sliceutil" "github.com/riverqueue/river/internal/util/valutil" "github.com/riverqueue/river/internal/workunit" @@ -433,6 +434,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client archetype := &baseservice.Archetype{ DisableSleep: config.disableSleep, Logger: config.Logger, + Rand: randutil.NewCryptoSeededConcurrentSafeRand(), TimeNowUTC: func() time.Time { return time.Now().UTC() }, } diff --git a/internal/baseservice/base_service.go b/internal/baseservice/base_service.go index d41fb7a3..c0052362 100644 --- a/internal/baseservice/base_service.go +++ b/internal/baseservice/base_service.go @@ -30,6 +30,15 @@ type Archetype struct { // Logger is a structured logger. Logger *slog.Logger + // Rand is a random source safe for concurrent access and seeded with a + // cryptographically random seed to ensure good distribution between nodes + // and services. The random source itself is _not_ cryptographically secure, + // and therefore should not be used anywhere security-related. This is a + // deliberate choice because Go's non-crypto rand source is about twenty + // times faster, and so far none of our uses of random require cryptographic + // secure randomness. + Rand *rand.Rand + // TimeNowUTC returns the current time as UTC. Normally it's implemented as // a call to `time.Now().UTC()`, but may be overridden in tests for time // injection. Services should try to use this function instead of the @@ -116,7 +125,7 @@ func Init[TService withBaseService](archetype *Archetype, service TService) TSer baseService.DisableSleep = archetype.DisableSleep baseService.Logger = archetype.Logger baseService.Name = reflect.TypeOf(service).Elem().Name() - baseService.Rand = randutil.NewCryptoSeededConcurrentSafeRand() + baseService.Rand = archetype.Rand baseService.TimeNowUTC = archetype.TimeNowUTC return service diff --git a/internal/baseservice/base_service_test.go b/internal/baseservice/base_service_test.go index 71cd6f61..80145c89 100644 --- a/internal/baseservice/base_service_test.go +++ b/internal/baseservice/base_service_test.go @@ -8,6 +8,8 @@ import ( "time" "github.com/stretchr/testify/require" + + "github.com/riverqueue/river/internal/util/randutil" ) func TestArchetype_WithSleepDisabled(t *testing.T) { @@ -69,6 +71,7 @@ func archetype() *Archetype { return &Archetype{ DisableSleep: true, Logger: slog.New(slog.NewTextHandler(os.Stdout, nil)), + Rand: randutil.NewCryptoSeededConcurrentSafeRand(), TimeNowUTC: func() time.Time { return time.Now().UTC() }, } } diff --git a/internal/riverinternaltest/riverinternaltest.go b/internal/riverinternaltest/riverinternaltest.go index 5b6a155c..7b8e0040 100644 --- a/internal/riverinternaltest/riverinternaltest.go +++ b/internal/riverinternaltest/riverinternaltest.go @@ -24,6 +24,7 @@ import ( "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/riverinternaltest/slogtest" //nolint:depguard "github.com/riverqueue/river/internal/testdb" + "github.com/riverqueue/river/internal/util/randutil" "github.com/riverqueue/river/internal/util/valutil" ) @@ -57,6 +58,7 @@ func BaseServiceArchetype(tb testing.TB) *baseservice.Archetype { return &baseservice.Archetype{ Logger: Logger(tb), + Rand: randutil.NewCryptoSeededConcurrentSafeRand(), TimeNowUTC: func() time.Time { return time.Now().UTC() }, } } diff --git a/rivermigrate/river_migrate.go b/rivermigrate/river_migrate.go index 6d3a8ae6..10128130 100644 --- a/rivermigrate/river_migrate.go +++ b/rivermigrate/river_migrate.go @@ -19,6 +19,7 @@ import ( "github.com/riverqueue/river/internal/baseservice" "github.com/riverqueue/river/internal/util/dbutil" "github.com/riverqueue/river/internal/util/maputil" + "github.com/riverqueue/river/internal/util/randutil" "github.com/riverqueue/river/internal/util/sliceutil" "github.com/riverqueue/river/riverdriver" ) @@ -94,6 +95,7 @@ func New[TTx any](driver riverdriver.Driver[TTx], config *Config) *Migrator[TTx] archetype := &baseservice.Archetype{ Logger: logger, + Rand: randutil.NewCryptoSeededConcurrentSafeRand(), TimeNowUTC: func() time.Time { return time.Now().UTC() }, }