Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 27 additions & 31 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,22 +490,20 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client

for queue, queueConfig := range config.Queues {
client.producersByQueueName[queue] = newProducer(archetype, driver.GetExecutor(), &producerConfig{
ClientID: config.ID,
Completer: client.completer,
ErrorHandler: config.ErrorHandler,
FetchCooldown: config.FetchCooldown,
FetchPollInterval: config.FetchPollInterval,
JobTimeout: config.JobTimeout,
MaxWorkers: queueConfig.MaxWorkers,
Notifier: client.notifier,
Queue: queue,
QueueEventCallback: func(event *Event) {
client.distributeQueueEvent(event)
},
RetryPolicy: config.RetryPolicy,
SchedulerInterval: config.schedulerInterval,
StatusFunc: client.monitor.SetProducerStatus,
Workers: config.Workers,
ClientID: config.ID,
Completer: client.completer,
ErrorHandler: config.ErrorHandler,
FetchCooldown: config.FetchCooldown,
FetchPollInterval: config.FetchPollInterval,
JobTimeout: config.JobTimeout,
MaxWorkers: queueConfig.MaxWorkers,
Notifier: client.notifier,
Queue: queue,
QueueEventCallback: client.distributeQueueEvent,
RetryPolicy: config.RetryPolicy,
SchedulerInterval: config.schedulerInterval,
StatusFunc: client.monitor.SetProducerStatus,
Workers: config.Workers,
})
client.monitor.InitializeProducerStatus(queue)
}
Expand Down Expand Up @@ -668,23 +666,21 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
return err
}

if c.completer != nil {
// The completer is part of the services list below, but although it can
// stop gracefully along with all the other services, it needs to be
// started with a context that's _not_ fetchCtx. This ensures that even
// when fetch is cancelled on shutdown, the completer is still given a
// separate opportunity to start stopping only after the producers have
// finished up and returned.
if err := c.completer.Start(ctx); err != nil {
stopServicesOnError()
return err
}

// Receives job complete notifications from the completer and
// distributes them to any subscriptions.
c.completer.Subscribe(c.distributeJobCompleterCallback)
// The completer is part of the services list below, but although it can
// stop gracefully along with all the other services, it needs to be
// started with a context that's _not_ fetchCtx. This ensures that even
// when fetch is cancelled on shutdown, the completer is still given a
// separate opportunity to start stopping only after the producers have
// finished up and returned.
if err := c.completer.Start(ctx); err != nil {
stopServicesOnError()
return err
}

// Receives job complete notifications from the completer and
// distributes them to any subscriptions.
c.completer.Subscribe(c.distributeJobCompleterCallback)

// We use separate contexts for fetching and working to allow for a graceful
// stop. Both inherit from the provided context, so if it's cancelled, a
// more aggressive stop will be initiated.
Expand Down