Merged
184 changes: 15 additions & 169 deletions client.go
@@ -9,15 +9,13 @@ import (
     "os"
     "regexp"
     "strings"
-    "sync"
     "time"
 
     "github.com/riverqueue/river/internal/baseservice"
     "github.com/riverqueue/river/internal/componentstatus"
     "github.com/riverqueue/river/internal/dblist"
     "github.com/riverqueue/river/internal/dbunique"
     "github.com/riverqueue/river/internal/jobcompleter"
-    "github.com/riverqueue/river/internal/jobstats"
     "github.com/riverqueue/river/internal/leadership"
     "github.com/riverqueue/river/internal/maintenance"
     "github.com/riverqueue/river/internal/maintenance/startstop"
@@ -304,6 +302,7 @@ type Client[TTx any] struct {
     baseStartStop startstop.BaseStartStop
 
     completer jobcompleter.JobCompleter
+    completerSubscribeCh chan []jobcompleter.CompleterJobUpdated
     config *Config
     driver riverdriver.Driver[TTx]
     elector *leadership.Elector
@@ -314,12 +313,7 @@
     producersByQueueName map[string]*producer
     queueMaintainer *maintenance.QueueMaintainer
     services []startstop.Service
-    subscriptions map[int]*eventSubscription
-    subscriptionsMu sync.Mutex
-    subscriptionsSeq int // used for generating simple IDs
-    statsAggregate jobstats.JobStatistics
-    statsMu sync.Mutex
-    statsNumJobs int
+    subscriptionManager *subscriptionManager
     stopped chan struct{}
     testSignals clientTestSignals
     uniqueInserter *dbunique.UniqueInserter
@@ -471,7 +465,6 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
         driver: driver,
         monitor: newClientMonitor(),
         producersByQueueName: make(map[string]*producer),
-        subscriptions: make(map[int]*eventSubscription),
         testSignals: clientTestSignals{},
         uniqueInserter: baseservice.Init(archetype, &dbunique.UniqueInserter{
             AdvisoryLockPrefix: config.AdvisoryLockPrefix,
@@ -490,8 +483,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
         return nil, errMissingDatabasePoolWithQueues
     }
 
-    client.completer = jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor())
-    client.services = append(client.services, client.completer)
+    client.completer = jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor(), nil)
+    client.subscriptionManager = newSubscriptionManager(archetype, nil)
+    client.services = append(client.services, client.completer, client.subscriptionManager)
 
     // In poll only mode, we don't try to initialize a notifier that uses
     // listen/notify. Instead, each service polls for changes it's
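
With this change the subscription manager joins the completer in `client.services`, so both share the client's start/stop lifecycle. A minimal sketch of that services-slice pattern, with a simplified `service` interface standing in for river's internal `startstop.Service` (hypothetical, for illustration only):

package main

import (
    "context"
    "fmt"
)

// service is a simplified stand-in for river's internal startstop.Service.
type service interface {
    Start(ctx context.Context) error
    Stop()
}

type namedService struct{ name string }

func (s *namedService) Start(ctx context.Context) error {
    fmt.Println(s.name, "started")
    return nil
}

func (s *namedService) Stop() { fmt.Println(s.name, "stopped") }

func main() {
    // Registering every component in one slice lets the client start and
    // stop them uniformly, which is why the subscription manager is
    // appended alongside the completer.
    services := []service{&namedService{"completer"}, &namedService{"subscriptionManager"}}

    for _, s := range services {
        if err := s.Start(context.Background()); err != nil {
            panic(err)
        }
    }
    for i := len(services) - 1; i >= 0; i-- { // stop in reverse order
        services[i].Stop()
    }
}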
@@ -517,7 +511,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
             MaxWorkers: queueConfig.MaxWorkers,
             Notifier: client.notifier,
             Queue: queue,
-            QueueEventCallback: client.distributeQueueEvent,
+            QueueEventCallback: client.subscriptionManager.distributeQueueEvent,
             RetryPolicy: config.RetryPolicy,
             SchedulerInterval: config.schedulerInterval,
             StatusFunc: client.monitor.SetProducerStatus,
@@ -666,6 +660,13 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
         return fmt.Errorf("error making initial connection to database: %w", err)
     }
 
+    // Each time we start, we need a fresh completer subscribe channel to
+    // send job completion events on, because the completer will close it
+    // each time it shuts down.
+    c.completerSubscribeCh = make(chan []jobcompleter.CompleterJobUpdated, 10)
+    c.completer.ResetSubscribeChan(c.completerSubscribeCh)
+    c.subscriptionManager.ResetSubscribeChan(c.completerSubscribeCh)
+
     // In case of error, stop any services that might have started. This
     // is safe because even services that were never started will still
     // tolerate being stopped.
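
The new comment states the contract precisely: the completer closes its subscribe channel on shutdown, and a closed channel can't be reused, so each call to Start must allocate a fresh one and hand it to both sides. A runnable sketch of that reset pattern, using a hypothetical `consumer` in place of the completer/subscription-manager pair:

package main

import "fmt"

// consumer stands in for the receiving side: its run loop exits when the
// producer closes the channel at shutdown.
type consumer struct{ ch chan int }

func (c *consumer) resetSubscribeChan(ch chan int) { c.ch = ch }

func (c *consumer) run() {
    for v := range c.ch { // exits once the channel is closed
        fmt.Println("received:", v)
    }
}

func main() {
    c := &consumer{}

    for restart := 0; restart < 2; restart++ {
        // A closed channel can't carry new values, so allocate a fresh
        // buffered channel on every start, mirroring ResetSubscribeChan.
        ch := make(chan int, 10)
        c.resetSubscribeChan(ch)

        done := make(chan struct{})
        go func() { c.run(); close(done) }()

        ch <- restart
        close(ch) // simulates the completer closing the channel at shutdown
        <-done
    }
}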
@@ -695,10 +696,6 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
         return err
     }
 
-    // Receives job complete notifications from the completer and
-    // distributes them to any subscriptions.
-    c.completer.Subscribe(c.distributeJobCompleterCallback)
-
     // We use separate contexts for fetching and working to allow for a graceful
     // stop. Both inherit from the provided context, so if it's cancelled, a
     // more aggressive stop will be initiated.
@@ -758,17 +755,6 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
             c.queueMaintainer,
         ))
 
-        // Remove all subscriptions and close corresponding channels.
-        func() {
-            c.subscriptionsMu.Lock()
-            defer c.subscriptionsMu.Unlock()
-
-            for subID, sub := range c.subscriptions {
-                close(sub.Chan)
-                delete(c.subscriptions, subID)
-            }
-        }()
-
         // Shut down the monitor last so it can broadcast final status updates:
         c.monitor.Stop()
     }()
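
The inline shutdown closure deleted here is the close-and-delete sweep that presumably now lives in the subscription manager's own stop path. A self-contained sketch of the pattern (illustrative names, not river's actual types):

package main

import (
    "fmt"
    "sync"
)

// subscriptionSet sketches the bookkeeping that moved out of the client: a
// map of channels guarded by a mutex, each closed exactly once on stop.
type subscriptionSet struct {
    mu   sync.Mutex
    subs map[int]chan string
}

func (s *subscriptionSet) stop() {
    s.mu.Lock()
    defer s.mu.Unlock()

    // Closing each channel unblocks any receiver ranging over it; deleting
    // the map entry makes a later cancel() call a harmless no-op.
    for id, ch := range s.subs {
        close(ch)
        delete(s.subs, id)
    }
}

func main() {
    s := &subscriptionSet{subs: map[int]chan string{0: make(chan string, 1)}}
    ch := s.subs[0]
    s.stop()
    _, ok := <-ch
    fmt.Println("channel open after stop:", ok) // false
}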
@@ -881,154 +867,14 @@ type SubscribeConfig struct {
 
 // Special internal variant that lets us inject an overridden size.
 func (c *Client[TTx]) SubscribeConfig(config *SubscribeConfig) (<-chan *Event, func()) {
-    if config.ChanSize < 0 {
-        panic("SubscribeConfig.ChanSize must be greater or equal to 1")
-    }
-    if config.ChanSize == 0 {
-        config.ChanSize = subscribeChanSizeDefault
-    }
-
-    for _, kind := range config.Kinds {
-        if _, ok := allKinds[kind]; !ok {
-            panic(fmt.Errorf("unknown event kind: %s", kind))
-        }
-    }
-
-    c.subscriptionsMu.Lock()
-    defer c.subscriptionsMu.Unlock()
-
-    subChan := make(chan *Event, config.ChanSize)
-
-    // Just gives us an easy way of removing the subscription again later.
-    subID := c.subscriptionsSeq
-    c.subscriptionsSeq++
-
-    c.subscriptions[subID] = &eventSubscription{
-        Chan:  subChan,
-        Kinds: sliceutil.KeyBy(config.Kinds, func(k EventKind) (EventKind, struct{}) { return k, struct{}{} }),
-    }
-
-    cancel := func() {
-        c.subscriptionsMu.Lock()
-        defer c.subscriptionsMu.Unlock()
-
-        // May no longer be present in case this was called after a stop.
-        sub, ok := c.subscriptions[subID]
-        if !ok {
-            return
-        }
-
-        close(sub.Chan)
-
-        delete(c.subscriptions, subID)
-    }
-
-    return subChan, cancel
-}
-
-// Distribute a single event into any listening subscriber channels.
-//
-// Job events should specify the job and stats, while queue events should only specify
-// the queue.
-func (c *Client[TTx]) distributeJobEvent(job *rivertype.JobRow, stats *JobStatistics) {
-    c.subscriptionsMu.Lock()
-    defer c.subscriptionsMu.Unlock()
-
-    // Quick path so we don't need to allocate anything if no one is listening.
-    if len(c.subscriptions) < 1 {
-        return
-    }
-
-    var event *Event
-    switch job.State {
-    case rivertype.JobStateCancelled:
-        event = &Event{Kind: EventKindJobCancelled, Job: job, JobStats: stats}
-    case rivertype.JobStateCompleted:
-        event = &Event{Kind: EventKindJobCompleted, Job: job, JobStats: stats}
-    case rivertype.JobStateScheduled:
-        event = &Event{Kind: EventKindJobSnoozed, Job: job, JobStats: stats}
-    case rivertype.JobStateAvailable, rivertype.JobStateDiscarded, rivertype.JobStateRetryable, rivertype.JobStateRunning:
-        event = &Event{Kind: EventKindJobFailed, Job: job, JobStats: stats}
-    case rivertype.JobStatePending:
-        panic("completion subscriber unexpectedly received job in pending state, river bug")
-    default:
-        // linter exhaustive rule prevents this from being reached
-        panic("unreachable state to distribute, river bug")
-    }
-
-    // All subscription channels are non-blocking so this is always fast and
-    // there's no risk of falling behind what producers are sending.
-    for _, sub := range c.subscriptions {
-        if sub.ListensFor(event.Kind) {
-            select {
-            case sub.Chan <- event:
-            default:
-            }
-        }
-    }
-}
-
-func (c *Client[TTx]) distributeQueueEvent(event *Event) {
-    c.subscriptionsMu.Lock()
-    defer c.subscriptionsMu.Unlock()
-
-    // All subscription channels are non-blocking so this is always fast and
-    // there's no risk of falling behind what producers are sending.
-    for _, sub := range c.subscriptions {
-        if sub.ListensFor(event.Kind) {
-            select {
-            case sub.Chan <- event:
-            default:
-            }
-        }
-    }
-}
-
-// Callback invoked by the completer and which prompts the client to update
-// statistics and distribute jobs into any listening subscriber channels.
-// (Subscriber channels are non-blocking so this should be quite fast.)
-func (c *Client[TTx]) distributeJobCompleterCallback(update jobcompleter.CompleterJobUpdated) {
-    func() {
-        c.statsMu.Lock()
-        defer c.statsMu.Unlock()
-
-        stats := update.JobStats
-        c.statsAggregate.CompleteDuration += stats.CompleteDuration
-        c.statsAggregate.QueueWaitDuration += stats.QueueWaitDuration
-        c.statsAggregate.RunDuration += stats.RunDuration
-        c.statsNumJobs++
-    }()
-
-    c.distributeJobEvent(update.Job, jobStatisticsFromInternal(update.JobStats))
+    return c.subscriptionManager.SubscribeConfig(config)
 }
 
 // Dump aggregate stats from job completions to logs periodically. These
 // numbers don't mean much in themselves, but can give a rough idea of the
 // proportions of each compared to each other, and may help flag outlying values
 // indicative of a problem.
 func (c *Client[TTx]) logStatsLoop(ctx context.Context, shouldStart bool, stopped chan struct{}) error {
-    // Handles a potential divide by zero.
-    safeDurationAverage := func(d time.Duration, n int) time.Duration {
-        if n == 0 {
-            return 0
-        }
-        return d / time.Duration(n)
-    }
-
-    logStats := func() {
-        c.statsMu.Lock()
-        defer c.statsMu.Unlock()
-
-        c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Job stats (since last stats line)",
-            "num_jobs_run", c.statsNumJobs,
-            "average_complete_duration", safeDurationAverage(c.statsAggregate.CompleteDuration, c.statsNumJobs),
-            "average_queue_wait_duration", safeDurationAverage(c.statsAggregate.QueueWaitDuration, c.statsNumJobs),
-            "average_run_duration", safeDurationAverage(c.statsAggregate.RunDuration, c.statsNumJobs))
-
-        c.statsAggregate = jobstats.JobStatistics{}
-        c.statsNumJobs = 0
-    }
-
     if !shouldStart {
         return nil
     }
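
The deleted bodies document the machinery that moves into the subscription manager: per-subscriber buffered channels keyed by a sequence number, a cancel closure that tolerates running after a stop has already closed the channel, and strictly non-blocking sends so a slow subscriber drops events rather than stalling the completer. A condensed, runnable sketch of that design, using simplified types rather than river's actual subscriptionManager:

package main

import (
    "fmt"
    "sync"
)

type eventKind string

type event struct{ Kind eventKind }

// manager sketches the subscribe/distribute pattern this PR consolidates.
type manager struct {
    mu   sync.Mutex
    seq  int
    subs map[int]chan *event
}

func (m *manager) subscribe(chanSize int) (<-chan *event, func()) {
    m.mu.Lock()
    defer m.mu.Unlock()

    ch := make(chan *event, chanSize)
    id := m.seq // sequence number doubles as the subscription ID
    m.seq++
    m.subs[id] = ch

    cancel := func() {
        m.mu.Lock()
        defer m.mu.Unlock()
        if sub, ok := m.subs[id]; ok { // may already be gone after a stop
            close(sub)
            delete(m.subs, id)
        }
    }
    return ch, cancel
}

func (m *manager) distribute(e *event) {
    m.mu.Lock()
    defer m.mu.Unlock()
    for _, ch := range m.subs {
        select {
        case ch <- e: // buffered, normally succeeds
        default: // never block the sender on a full subscriber
        }
    }
}

func main() {
    m := &manager{subs: make(map[int]chan *event)}
    ch, cancel := m.subscribe(8)
    defer cancel()

    m.distribute(&event{Kind: "job_completed"})
    fmt.Println("got event:", (<-ch).Kind)
}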
@@ -1047,7 +893,7 @@ func (c *Client[TTx]) logStatsLoop(ctx context.Context, shouldStart bool, stoppe
                 return
 
             case <-ticker.C:
-                logStats()
+                c.subscriptionManager.logStats(ctx, c.baseService.Name)
             }
         }
     }()
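
The removed safeDurationAverage helper guards the per-tick averages against a zero job count between stats lines; logStats itself now runs inside the subscription manager. The guard reproduced as a standalone example:

package main

import (
    "fmt"
    "time"
)

// safeDurationAverage mirrors the helper removed from client.go: averaging
// durations over a job count that may be zero between stats ticks.
func safeDurationAverage(d time.Duration, n int) time.Duration {
    if n == 0 {
        return 0 // no jobs ran since the last stats line
    }
    return d / time.Duration(n)
}

func main() {
    fmt.Println(safeDurationAverage(900*time.Millisecond, 3)) // 300ms
    fmt.Println(safeDurationAverage(0, 0))                    // 0s, no panic
}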
2 changes: 1 addition & 1 deletion client_test.go
@@ -2957,7 +2957,7 @@ func Test_Client_Subscribe(t *testing.T) {
         // Drops through immediately because the channel is closed.
         riverinternaltest.WaitOrTimeout(t, subscribeChan)
 
-        require.Empty(t, client.subscriptions)
+        require.Empty(t, client.subscriptionManager.subscriptions)
     })
 }
 
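The test's "drops through immediately because the channel is closed" relies on a Go guarantee: a receive from a closed channel never blocks and yields the zero value. A minimal demonstration:

package main

import "fmt"

func main() {
    ch := make(chan int)
    close(ch)

    // A receive from a closed channel returns the zero value at once,
    // which is what lets WaitOrTimeout in the test return immediately.
    v, ok := <-ch
    fmt.Println(v, ok) // 0 false
}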