-
Notifications
You must be signed in to change notification settings - Fork 138
Description
We are trialing River in production. We have a consumer process that consumes a message from Kafka and inserts a River job. We've been running with 1 River worker process (MaxWorkers = 4) that runs separately from the consumer process that has an insert-only client. I've observed memory increasing through relative low volume periods, and it increases quickly during high volume of 30+ messages per second. The dip near the end of the chart is a restart. You can see memory increasing during a relatively low volume period during the overnight hours.
I'll paste some of the code below:
Insert-only Client creation:
riverClient, err := river.NewClient(riverpgxv5.New(ds.PgxPool()), &river.Config{})
if err != nil {
log.Errorf("could not initialize new River client: %s", err.Error())
panic(err)
}River job insertion
_, err := riverClient.Insert(ctx, ShipmentMessage{JSONString: string(message.Value)}, nil)
if err != nil {
log.Errorf("failed to insert job: %s", err.Error())
} else {
metrics.Count("shipment_message_worker.message_inserted", 1, []string{}, 1)
}The worker (this process is where the memory may be leaking)
func (ShipmentMessage) Kind() string { return "shipment_message" }
func (rw *ShipmentMessageWorker) Work(ctx context.Context, job *river.Job[ShipmentMessage]) error {
metrics.Count("shipment_message_worker.message_processed", 1, []string{}, 1)
return nil
}Worker initialization:
func InitRiverWorkerAndClient(ctx context.Context, ds *postgres.Datastore, cfg Config) {
// add river worker
riverWorkers := river.NewWorkers()
if err := river.AddWorkerSafely(riverWorkers, &ShipmentMessageWorker{
Logger: log.New(),
}); err != nil {
log.Errorf("could not register new River worker: %s", err.Error())
panic(err)
}
log.Infof("new River worker registered successfully")
// create new river client
client, err := river.NewClient(riverpgxv5.New(ds.PgxPool()), &river.Config{
// cfg.MaxRiverWorkers is 4
Queues: map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: cfg.MaxRiverWorkers}},
Workers: riverWorkers,
})
if err != nil {
log.Errorf("could not initialize new River client: %s", err.Error())
panic(err)
}
// start the client
if err := client.Start(ctx); err != nil {
log.Errorf("failed to start River client: %s", err.Error())
panic(err)
}
// ensures the worker doesn't exit until it's terminated by a signal
<-ctx.Done()
// Note we use context cancelation to stop the worker on shutdown.
}Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels

