diff --git a/client.go b/client.go index 169eb6c3..13d70866 100644 --- a/client.go +++ b/client.go @@ -490,22 +490,20 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client for queue, queueConfig := range config.Queues { client.producersByQueueName[queue] = newProducer(archetype, driver.GetExecutor(), &producerConfig{ - ClientID: config.ID, - Completer: client.completer, - ErrorHandler: config.ErrorHandler, - FetchCooldown: config.FetchCooldown, - FetchPollInterval: config.FetchPollInterval, - JobTimeout: config.JobTimeout, - MaxWorkers: queueConfig.MaxWorkers, - Notifier: client.notifier, - Queue: queue, - QueueEventCallback: func(event *Event) { - client.distributeQueueEvent(event) - }, - RetryPolicy: config.RetryPolicy, - SchedulerInterval: config.schedulerInterval, - StatusFunc: client.monitor.SetProducerStatus, - Workers: config.Workers, + ClientID: config.ID, + Completer: client.completer, + ErrorHandler: config.ErrorHandler, + FetchCooldown: config.FetchCooldown, + FetchPollInterval: config.FetchPollInterval, + JobTimeout: config.JobTimeout, + MaxWorkers: queueConfig.MaxWorkers, + Notifier: client.notifier, + Queue: queue, + QueueEventCallback: client.distributeQueueEvent, + RetryPolicy: config.RetryPolicy, + SchedulerInterval: config.schedulerInterval, + StatusFunc: client.monitor.SetProducerStatus, + Workers: config.Workers, }) client.monitor.InitializeProducerStatus(queue) } @@ -668,23 +666,21 @@ func (c *Client[TTx]) Start(ctx context.Context) error { return err } - if c.completer != nil { - // The completer is part of the services list below, but although it can - // stop gracefully along with all the other services, it needs to be - // started with a context that's _not_ fetchCtx. This ensures that even - // when fetch is cancelled on shutdown, the completer is still given a - // separate opportunity to start stopping only after the producers have - // finished up and returned. - if err := c.completer.Start(ctx); err != nil { - stopServicesOnError() - return err - } - - // Receives job complete notifications from the completer and - // distributes them to any subscriptions. - c.completer.Subscribe(c.distributeJobCompleterCallback) + // The completer is part of the services list below, but although it can + // stop gracefully along with all the other services, it needs to be + // started with a context that's _not_ fetchCtx. This ensures that even + // when fetch is cancelled on shutdown, the completer is still given a + // separate opportunity to start stopping only after the producers have + // finished up and returned. + if err := c.completer.Start(ctx); err != nil { + stopServicesOnError() + return err } + // Receives job complete notifications from the completer and + // distributes them to any subscriptions. + c.completer.Subscribe(c.distributeJobCompleterCallback) + // We use separate contexts for fetching and working to allow for a graceful // stop. Both inherit from the provided context, so if it's cancelled, a // more aggressive stop will be initiated.