Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Fixed a memory leak caused by allocating a new random source on every job execution. Thank you @shawnstephens for reporting ❤️ [PR #240](https://github.com/riverqueue/river/pull/240).
- Fix a problem where `JobListParams.Queues()` didn't filter correctly based on its arguments. [PR #212](https://github.com/riverqueue/river/pull/212).
- Fix a problem in `DebouncedChan` where it would fire on its "out" channel too often when it was being signaled continuousy on its "in" channel. This would have caused work to be fetched more often than intended in busy systems. [PR #222](https://github.com/riverqueue/river/pull/222).

Expand Down
2 changes: 2 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/riverqueue/river/internal/maintenance"
"github.com/riverqueue/river/internal/notifier"
"github.com/riverqueue/river/internal/rivercommon"
"github.com/riverqueue/river/internal/util/randutil"
"github.com/riverqueue/river/internal/util/sliceutil"
"github.com/riverqueue/river/internal/util/valutil"
"github.com/riverqueue/river/internal/workunit"
Expand Down Expand Up @@ -433,6 +434,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
archetype := &baseservice.Archetype{
DisableSleep: config.disableSleep,
Logger: config.Logger,
Rand: randutil.NewCryptoSeededConcurrentSafeRand(),
TimeNowUTC: func() time.Time { return time.Now().UTC() },
}

Expand Down
11 changes: 10 additions & 1 deletion internal/baseservice/base_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ type Archetype struct {
// Logger is a structured logger.
Logger *slog.Logger

// Rand is a random source safe for concurrent access and seeded with a
// cryptographically random seed to ensure good distribution between nodes
// and services. The random source itself is _not_ cryptographically secure,
// and therefore should not be used anywhere security-related. This is a
// deliberate choice because Go's non-crypto rand source is about twenty
// times faster, and so far none of our uses of random require cryptographic
// secure randomness.
Rand *rand.Rand

// TimeNowUTC returns the current time as UTC. Normally it's implemented as
// a call to `time.Now().UTC()`, but may be overridden in tests for time
// injection. Services should try to use this function instead of the
Expand Down Expand Up @@ -116,7 +125,7 @@ func Init[TService withBaseService](archetype *Archetype, service TService) TSer
baseService.DisableSleep = archetype.DisableSleep
baseService.Logger = archetype.Logger
baseService.Name = reflect.TypeOf(service).Elem().Name()
baseService.Rand = randutil.NewCryptoSeededConcurrentSafeRand()
baseService.Rand = archetype.Rand
baseService.TimeNowUTC = archetype.TimeNowUTC

return service
Expand Down
3 changes: 3 additions & 0 deletions internal/baseservice/base_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"

"github.com/stretchr/testify/require"

"github.com/riverqueue/river/internal/util/randutil"
)

func TestArchetype_WithSleepDisabled(t *testing.T) {
Expand Down Expand Up @@ -69,6 +71,7 @@ func archetype() *Archetype {
return &Archetype{
DisableSleep: true,
Logger: slog.New(slog.NewTextHandler(os.Stdout, nil)),
Rand: randutil.NewCryptoSeededConcurrentSafeRand(),
TimeNowUTC: func() time.Time { return time.Now().UTC() },
}
}
2 changes: 2 additions & 0 deletions internal/riverinternaltest/riverinternaltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/riverqueue/river/internal/rivercommon"
"github.com/riverqueue/river/internal/riverinternaltest/slogtest" //nolint:depguard
"github.com/riverqueue/river/internal/testdb"
"github.com/riverqueue/river/internal/util/randutil"
"github.com/riverqueue/river/internal/util/valutil"
)

Expand Down Expand Up @@ -57,6 +58,7 @@ func BaseServiceArchetype(tb testing.TB) *baseservice.Archetype {

return &baseservice.Archetype{
Logger: Logger(tb),
Rand: randutil.NewCryptoSeededConcurrentSafeRand(),
TimeNowUTC: func() time.Time { return time.Now().UTC() },
}
}
Expand Down
2 changes: 2 additions & 0 deletions rivermigrate/river_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/riverqueue/river/internal/baseservice"
"github.com/riverqueue/river/internal/util/dbutil"
"github.com/riverqueue/river/internal/util/maputil"
"github.com/riverqueue/river/internal/util/randutil"
"github.com/riverqueue/river/internal/util/sliceutil"
"github.com/riverqueue/river/riverdriver"
)
Expand Down Expand Up @@ -94,6 +95,7 @@ func New[TTx any](driver riverdriver.Driver[TTx], config *Config) *Migrator[TTx]

archetype := &baseservice.Archetype{
Logger: logger,
Rand: randutil.NewCryptoSeededConcurrentSafeRand(),
TimeNowUTC: func() time.Time { return time.Now().UTC() },
}

Expand Down