Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Added `otelriver` option `MiddlewareConfig.DurationUnit`. Can be used to configure duration metrics to be emitted in milliseconds instead of the default seconds. [PR #XXX](https://github.com/riverqueue/rivercontrib/pull/XXX).

## [0.1.0] - 2025-03-16

### Added

Initial release. Mainly brings in the `otelriver` package for use of River with OpenTelemetry and DataDog. [PR #1](https://github.com/riverqueue/rivercontrib/pull/1).
- Initial release. Mainly brings in the `otelriver` package for use of River with OpenTelemetry and DataDog. [PR #1](https://github.com/riverqueue/rivercontrib/pull/1).
74 changes: 50 additions & 24 deletions otelriver/middleware.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package otelriver

import (
"cmp"
"context"
"time"

Expand All @@ -24,6 +25,12 @@ const (

// MiddlewareConfig is configuration for River's OpenTelemetry middleware.
type MiddlewareConfig struct {
// DurationUnit selects the unit in which duration metrics like
// `river.work_duration` are emitted.
//
// Must be one of "ms" (milliseconds) or "s" (seconds). Defaults to seconds.
DurationUnit string

// MeterProvider is a MeterProvider to base metrics on. May be left as nil
// to use the default global provider.
MeterProvider metric.MeterProvider
Expand All @@ -38,6 +45,7 @@ type MiddlewareConfig struct {
type Middleware struct {
river.MiddlewareDefaults

config *MiddlewareConfig
meter metric.Meter
metrics middlewareMetrics
tracer trace.Tracer
Expand All @@ -58,34 +66,41 @@ type middlewareMetrics struct {
//
// config may be nil.
func NewMiddleware(config *MiddlewareConfig) *Middleware {
var (
meterProvider = otel.GetMeterProvider()
tracerProvider = otel.GetTracerProvider()
)
if config != nil {
if config.MeterProvider != nil {
meterProvider = config.MeterProvider
}
if config.TracerProvider != nil {
tracerProvider = config.TracerProvider
}
if config == nil {
config = &MiddlewareConfig{}
}

durationUnit := cmp.Or(config.DurationUnit, "s")
if durationUnit != "ms" && durationUnit != "s" {
panic("duration unit must be one of ms or s")
}

meterProvider := otel.GetMeterProvider()
if config.MeterProvider != nil {
meterProvider = config.MeterProvider
}

tracerProvider := otel.GetTracerProvider()
if config.TracerProvider != nil {
tracerProvider = config.TracerProvider
}

meter := meterProvider.Meter(name)

return &Middleware{
meter: meter,
config: config,
meter: meter,
metrics: middlewareMetrics{
// See unit guidelines:
//
// https://opentelemetry.io/docs/specs/semconv/general/metrics/#instrument-units
insertCount: mustInt64Counter(meter, prefix+"insert_count", metric.WithDescription("Number of jobs inserted"), metric.WithUnit("{job}")),
insertManyCount: mustInt64Counter(meter, prefix+"insert_many_count", metric.WithDescription("Number of job batches inserted (all jobs are inserted in a batch, but batches may be one job)"), metric.WithUnit("{job_batch}")),
insertManyDuration: mustFloat64Gauge(meter, prefix+"insert_many_duration", metric.WithDescription("Duration of job batch insertion"), metric.WithUnit("s")),
insertManyDurationHistogram: mustFloat64Histogram(meter, prefix+"insert_many_duration_histogram", metric.WithDescription("Duration of job batch insertion (histogram)"), metric.WithUnit("s")),
insertManyDuration: mustFloat64Gauge(meter, prefix+"insert_many_duration", metric.WithDescription("Duration of job batch insertion"), metric.WithUnit(durationUnit)),
insertManyDurationHistogram: mustFloat64Histogram(meter, prefix+"insert_many_duration_histogram", metric.WithDescription("Duration of job batch insertion (histogram)"), metric.WithUnit(durationUnit)),
workCount: mustInt64Counter(meter, prefix+"work_count", metric.WithDescription("Number of jobs worked"), metric.WithUnit("{job}")),
workDuration: mustFloat64Gauge(meter, prefix+"work_duration", metric.WithDescription("Duration of job being worked"), metric.WithUnit("s")),
workDurationHistogram: mustFloat64Histogram(meter, prefix+"work_duration_histogram", metric.WithDescription("Duration of job being worked (histogram)"), metric.WithUnit("s")),
workDuration: mustFloat64Gauge(meter, prefix+"work_duration", metric.WithDescription("Duration of job being worked"), metric.WithUnit(durationUnit)),
workDurationHistogram: mustFloat64Histogram(meter, prefix+"work_duration_histogram", metric.WithDescription("Duration of job being worked (histogram)"), metric.WithUnit(durationUnit)),
},
tracer: tracerProvider.Tracer(name),
}
Expand All @@ -107,17 +122,17 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job
panicked = true // set to false if program leaves normally
)
defer func() {
durationSeconds := time.Since(begin).Seconds()
duration := m.durationInPreferredUnit(time.Since(begin))

setAttributeAndSpanStatus(attrs, statusIndex, span, panicked, err)

// This allocates a new slice, so make sure to do it as few times as possible.
measurementOpt := metric.WithAttributes(attrs...)

m.metrics.insertCount.Add(ctx, int64(len(manyParams)))
m.metrics.insertManyCount.Add(ctx, 1)
m.metrics.insertManyDuration.Record(ctx, durationSeconds, measurementOpt)
m.metrics.insertManyDurationHistogram.Record(ctx, durationSeconds, measurementOpt)
m.metrics.insertCount.Add(ctx, int64(len(manyParams)), measurementOpt)
m.metrics.insertManyCount.Add(ctx, 1, measurementOpt)
m.metrics.insertManyDuration.Record(ctx, duration, measurementOpt)
m.metrics.insertManyDurationHistogram.Record(ctx, duration, measurementOpt)
}()

insertRes, err = doInner(ctx)
Expand All @@ -143,23 +158,34 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
panicked = true // set to false if program leaves normally
)
defer func() {
durationSeconds := time.Since(begin).Seconds()
duration := m.durationInPreferredUnit(time.Since(begin))

setAttributeAndSpanStatus(attrs, statusIndex, span, panicked, err)

// This allocates a new slice, so make sure to do it as few times as possible.
measurementOpt := metric.WithAttributes(attrs...)

m.metrics.workCount.Add(ctx, 1, measurementOpt)
m.metrics.workDuration.Record(ctx, durationSeconds, measurementOpt)
m.metrics.workDurationHistogram.Record(ctx, durationSeconds, measurementOpt)
m.metrics.workDuration.Record(ctx, duration, measurementOpt)
m.metrics.workDurationHistogram.Record(ctx, duration, measurementOpt)
}()

err = doInner(ctx)
panicked = false
return err
}

func (m *Middleware) durationInPreferredUnit(duration time.Duration) float64 {
switch m.config.DurationUnit {
case "ms":
return float64(duration.Milliseconds())
case "s":
fallthrough
default:
return duration.Seconds()
}
}

func mustFloat64Gauge(meter metric.Meter, name string, options ...metric.Float64GaugeOption) metric.Float64Gauge {
metric, err := meter.Float64Gauge(name, options...)
if err != nil {
Expand Down
Loading
Loading