Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Added `otelriver` option `MiddlewareConfig.DurationUnit`. Can be used to configure duration metrics to be emitted in milliseconds instead of the default seconds. [PR #XXX](https://github.com/riverqueue/rivercontrib/pull/XXX).
- Added `otelriver` option `MiddlewareConfig.DurationUnit`. Can be used to configure duration metrics to be emitted in milliseconds instead of the default seconds. [PR #10](https://github.com/riverqueue/rivercontrib/pull/10).
- More attributes like job ID and timestamps on OpenTelemetry spans. [PR #11](https://github.com/riverqueue/rivercontrib/pull/11).
- Added `otelriver` option `EnableSemanticMetrics` which will cause the middleware to emit metrics compliant with OpenTelemetry [semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/). [PR #12](https://github.com/riverqueue/rivercontrib/pull/12).

## [0.1.0] - 2025-03-16

Expand Down
8 changes: 5 additions & 3 deletions otelriver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ The middleware supports these options:

``` go
middleware := otelriver.NewMiddleware(&MiddlewareConfig{
DurationUnit: "ms",
MeterProvider: meterProvider,
TracerProvider: tracerProvider,
DurationUnit: "ms",
EnableSemanticMetrics: true,
MeterProvider: meterProvider,
TracerProvider: tracerProvider,
})
```

* `DurationUnit`: The unit which durations are emitted as, either "ms" (milliseconds) or "s" (seconds). Defaults to seconds.
* `EnableSemanticMetrics`: Causes the middleware to emit metrics compliant with OpenTelemetry's ["semantic conventions"](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/) for message clients. This has the effect of having all messaging systems share the same common metric names, with attributes differentiating them.
* `MeterProvider`: Injected OpenTelemetry meter provider. The global meter provider is used by default.
* `TracerProvider`: Injected OpenTelemetry tracer provider. The global tracer provider is used by default.

Expand Down
90 changes: 68 additions & 22 deletions otelriver/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,20 @@ type MiddlewareConfig struct {
// `river.work_duration` are emitted.
//
// Must be one of "ms" (milliseconds) or "s" (seconds). Defaults to seconds.
//
// Does not modify metrics emitted by EnableSemanticMetrics because those
// are constrained to seconds by specification.
DurationUnit string

// EnableSemanticMetrics emits metrics compliant with OpenTelemetry's
// "semantic conventions" for messaging clients:
//
// https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/
//
// This has the effect of having all messaging systems share the same common
// metric names, with attributes differentiating them.
EnableSemanticMetrics bool

// MeterProvider is a MeterProvider to base metrics on. May be left as nil
// to use the default global provider.
MeterProvider metric.MeterProvider
Expand All @@ -54,13 +66,17 @@ type Middleware struct {

// Bundle of metrics associated with a middleware.
type middlewareMetrics struct {
insertCount metric.Int64Counter
insertManyCount metric.Int64Counter
insertManyDuration metric.Float64Gauge
insertManyDurationHistogram metric.Float64Histogram
workCount metric.Int64Counter
workDuration metric.Float64Gauge
workDurationHistogram metric.Float64Histogram
insertCount metric.Int64Counter
insertManyCount metric.Int64Counter
insertManyDuration metric.Float64Gauge
insertManyDurationHistogram metric.Float64Histogram
messagingClientConsumedMessages metric.Int64Counter
messagingClientOperationDuration metric.Float64Histogram
messagingClientSentMessages metric.Int64Counter
messagingProcessDuration metric.Float64Histogram
workCount metric.Int64Counter
workDuration metric.Float64Gauge
workDurationHistogram metric.Float64Histogram
}

// NewMiddleware initializes a new River OpenTelemetry middleware.
Expand Down Expand Up @@ -88,22 +104,31 @@ func NewMiddleware(config *MiddlewareConfig) *Middleware {

meter := meterProvider.Meter(name)

metrics := middlewareMetrics{
// See unit guidelines:
//
// https://opentelemetry.io/docs/specs/semconv/general/metrics/#instrument-units
insertCount: mustInt64Counter(meter, prefix+"insert_count", metric.WithDescription("Number of jobs inserted"), metric.WithUnit("{job}")),
insertManyCount: mustInt64Counter(meter, prefix+"insert_many_count", metric.WithDescription("Number of job batches inserted (all jobs are inserted in a batch, but batches may be one job)"), metric.WithUnit("{job_batch}")),
insertManyDuration: mustFloat64Gauge(meter, prefix+"insert_many_duration", metric.WithDescription("Duration of job batch insertion"), metric.WithUnit(durationUnit)),
insertManyDurationHistogram: mustFloat64Histogram(meter, prefix+"insert_many_duration_histogram", metric.WithDescription("Duration of job batch insertion (histogram)"), metric.WithUnit(durationUnit)),
workCount: mustInt64Counter(meter, prefix+"work_count", metric.WithDescription("Number of jobs worked"), metric.WithUnit("{job}")),
workDuration: mustFloat64Gauge(meter, prefix+"work_duration", metric.WithDescription("Duration of job being worked"), metric.WithUnit(durationUnit)),
workDurationHistogram: mustFloat64Histogram(meter, prefix+"work_duration_histogram", metric.WithDescription("Duration of job being worked (histogram)"), metric.WithUnit(durationUnit)),
}

if config.EnableSemanticMetrics {
metrics.messagingClientConsumedMessages = mustInt64Counter(meter, "messaging.client.consumed.messages", metric.WithDescription("Number of messages that were delivered to the application."))
metrics.messagingClientOperationDuration = mustFloat64Histogram(meter, "messaging.client.operation.duration", metric.WithDescription("Duration of messaging operation initiated by a producer or consumer client."), metric.WithUnit(durationUnit))
metrics.messagingClientSentMessages = mustInt64Counter(meter, "messaging.client.sent.messages", metric.WithDescription("Number of messages producer attempted to send to the broker."))
metrics.messagingProcessDuration = mustFloat64Histogram(meter, "messaging.process.duration", metric.WithDescription("Duration of processing operation."), metric.WithUnit(durationUnit))
}

return &Middleware{
config: config,
meter: meter,
metrics: middlewareMetrics{
// See unit guidelines:
//
// https://opentelemetry.io/docs/specs/semconv/general/metrics/#instrument-units
insertCount: mustInt64Counter(meter, prefix+"insert_count", metric.WithDescription("Number of jobs inserted"), metric.WithUnit("{job}")),
insertManyCount: mustInt64Counter(meter, prefix+"insert_many_count", metric.WithDescription("Number of job batches inserted (all jobs are inserted in a batch, but batches may be one job)"), metric.WithUnit("{job_batch}")),
insertManyDuration: mustFloat64Gauge(meter, prefix+"insert_many_duration", metric.WithDescription("Duration of job batch insertion"), metric.WithUnit(durationUnit)),
insertManyDurationHistogram: mustFloat64Histogram(meter, prefix+"insert_many_duration_histogram", metric.WithDescription("Duration of job batch insertion (histogram)"), metric.WithUnit(durationUnit)),
workCount: mustInt64Counter(meter, prefix+"work_count", metric.WithDescription("Number of jobs worked"), metric.WithUnit("{job}")),
workDuration: mustFloat64Gauge(meter, prefix+"work_duration", metric.WithDescription("Duration of job being worked"), metric.WithUnit(durationUnit)),
workDurationHistogram: mustFloat64Histogram(meter, prefix+"work_duration_histogram", metric.WithDescription("Duration of job being worked (histogram)"), metric.WithUnit(durationUnit)),
},
tracer: tracerProvider.Tracer(name),
config: config,
meter: meter,
metrics: metrics,
tracer: tracerProvider.Tracer(name),
}
}

Expand Down Expand Up @@ -135,6 +160,16 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job
m.metrics.insertManyCount.Add(ctx, 1, measurementOpt)
m.metrics.insertManyDuration.Record(ctx, duration, measurementOpt)
m.metrics.insertManyDurationHistogram.Record(ctx, duration, measurementOpt)

if m.config.EnableSemanticMetrics {
measurementOpt := metric.WithAttributes(
attribute.String("messaging.operation.name", "insert_many"),
attribute.String("messaging.system", "river"),
)

m.metrics.messagingClientOperationDuration.Record(ctx, duration, measurementOpt)
m.metrics.messagingClientSentMessages.Add(ctx, int64(len(manyParams)), measurementOpt)
}
}()

insertRes, err = doInner(ctx)
Expand Down Expand Up @@ -189,6 +224,17 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
m.metrics.workCount.Add(ctx, 1, measurementOpt)
m.metrics.workDuration.Record(ctx, duration, measurementOpt)
m.metrics.workDurationHistogram.Record(ctx, duration, measurementOpt)

if m.config.EnableSemanticMetrics {
measurementOpt := metric.WithAttributes(
attribute.String("messaging.operation.name", "work"),
attribute.String("messaging.system", "river"),
)

m.metrics.messagingClientConsumedMessages.Add(ctx, 1, measurementOpt)
m.metrics.messagingClientOperationDuration.Record(ctx, duration, measurementOpt)
m.metrics.messagingProcessDuration.Record(ctx, duration, measurementOpt)
}
}()

err = doInner(ctx)
Expand Down
106 changes: 99 additions & 7 deletions otelriver/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ func TestMiddleware(t *testing.T) {
requireSum(t, metrics, "river.insert_many_count", 1, expectedAttrs...)
requireGaugeNotEmpty(t, metrics, "river.insert_many_duration", expectedAttrs...)
requireHistogramCount(t, metrics, "river.insert_many_duration_histogram", 1, expectedAttrs...)

// Requires EnableSemanticMetrics.
requireNoMetric(t, metrics, "messaging.client.sent.messages")
requireNoMetric(t, metrics, "messaging.client.operation.duration")
})

// Make sure the middleware can fall back to a global provider.
Expand Down Expand Up @@ -220,6 +224,40 @@ func TestMiddleware(t *testing.T) {
}
})

t.Run("InsertManyEnableSemanticMetrics", func(t *testing.T) {
t.Parallel()

middleware, bundle := setupConfig(t, &MiddlewareConfig{
EnableSemanticMetrics: true,
})

doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) {
return []*rivertype.JobInsertResult{
{Job: &rivertype.JobRow{ID: 123}},
}, nil
}

insertRes, err := middleware.InsertMany(ctx, []*rivertype.JobInsertParams{{Kind: "no_op"}}, doInner)
require.NoError(t, err)
require.Equal(t, []*rivertype.JobInsertResult{
{Job: &rivertype.JobRow{ID: 123}},
}, insertRes)

var (
expectedAttrs = []attribute.KeyValue{
attribute.String("messaging.operation.name", "insert_many"),
attribute.String("messaging.system", "river"),
}
metrics metricdata.ResourceMetrics
)
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
requireSum(t, metrics, "messaging.client.sent.messages", 1, expectedAttrs...)
{
metric, _ := requireHistogramCount(t, metrics, "messaging.client.operation.duration", 1, expectedAttrs...)
require.Equal(t, "s", metric.Unit)
}
})

t.Run("WorkSuccess", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -276,6 +314,11 @@ func TestMiddleware(t *testing.T) {
metric, _ := requireHistogramCount(t, metrics, "river.work_duration_histogram", 1, expectedAttrs...)
require.Equal(t, "s", metric.Unit)
}

// Requires EnableSemanticMetrics.
requireNoMetric(t, metrics, "messaging.client.consumed.messages")
requireNoMetric(t, metrics, "messaging.client.operation.duration")
requireNoMetric(t, metrics, "messaging.process.duration")
})

t.Run("WorkError", func(t *testing.T) {
Expand Down Expand Up @@ -439,6 +482,41 @@ func TestMiddleware(t *testing.T) {
require.Equal(t, "ms", metric.Unit)
}
})

t.Run("WorkEnableSemanticMetrics ", func(t *testing.T) {
t.Parallel()

middleware, bundle := setupConfig(t, &MiddlewareConfig{
EnableSemanticMetrics: true,
})

doInner := func(ctx context.Context) error {
return nil
}

err := middleware.Work(ctx, &rivertype.JobRow{
ID: 123,
}, doInner)
require.NoError(t, err)

var (
expectedAttrs = []attribute.KeyValue{
attribute.String("messaging.operation.name", "work"),
attribute.String("messaging.system", "river"),
}
metrics metricdata.ResourceMetrics
)
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
requireSum(t, metrics, "messaging.client.consumed.messages", 1, expectedAttrs...)
{
metric, _ := requireHistogramCount(t, metrics, "messaging.client.operation.duration", 1, expectedAttrs...)
require.Equal(t, "s", metric.Unit)
}
{
metric, _ := requireHistogramCount(t, metrics, "messaging.process.duration", 1, expectedAttrs...)
require.Equal(t, "s", metric.Unit)
}
})
}

func getAttribute(t *testing.T, attrs []attribute.KeyValue, key string) attribute.Value {
Expand All @@ -453,25 +531,24 @@ func getAttribute(t *testing.T, attrs []attribute.KeyValue, key string) attribut
return attribute.Value{}
}

func getMetric[T metricdatatest.Datatypes](t *testing.T, metrics metricdata.ResourceMetrics, name string) (metricdata.Metrics, T) {
func getMetric[T metricdatatest.Datatypes](t *testing.T, metrics metricdata.ResourceMetrics, name string) (metricdata.Metrics, T, bool) {
t.Helper()

for _, scopeMetrics := range metrics.ScopeMetrics {
for _, metric := range scopeMetrics.Metrics {
if metric.Name == name {
return metric, metric.Data.(T) //nolint:forcetypeassert
return metric, metric.Data.(T), true //nolint:forcetypeassert
}
}
}
t.Fatalf("Metrics not found: %s", name)
var defaultVal T
return metricdata.Metrics{}, defaultVal
return metricdata.Metrics{}, defaultVal, false
}

func requireGaugeNotEmpty(t *testing.T, metrics metricdata.ResourceMetrics, name string, attrs ...attribute.KeyValue) (metricdata.Metrics, metricdata.Gauge[float64]) { //nolint:unparam
t.Helper()

metric, metricData := getMetric[metricdata.Gauge[float64]](t, metrics, name)
metric, metricData := requireMetric[metricdata.Gauge[float64]](t, metrics, name)
require.NotEmpty(t, metricData.DataPoints)
metricdatatest.AssertHasAttributes(t, metric, attrs...)
return metric, metricData
Expand All @@ -480,16 +557,31 @@ func requireGaugeNotEmpty(t *testing.T, metrics metricdata.ResourceMetrics, name
func requireHistogramCount(t *testing.T, metrics metricdata.ResourceMetrics, name string, count uint64, attrs ...attribute.KeyValue) (metricdata.Metrics, metricdata.Histogram[float64]) { //nolint:unparam
t.Helper()

metric, metricData := getMetric[metricdata.Histogram[float64]](t, metrics, name)
metric, metricData := requireMetric[metricdata.Histogram[float64]](t, metrics, name)
require.Equal(t, count, metricData.DataPoints[0].Count)
metricdatatest.AssertHasAttributes(t, metric, attrs...)
return metric, metricData
}

func requireNoMetric(t *testing.T, metrics metricdata.ResourceMetrics, name string) {
t.Helper()

_, _, ok := getMetric[metricdata.ResourceMetrics](t, metrics, name)
require.False(t, ok, "Metric should not have been emitted, but was found: "+name)
}

func requireMetric[T metricdatatest.Datatypes](t *testing.T, metrics metricdata.ResourceMetrics, name string) (metricdata.Metrics, T) {
t.Helper()

metric, metricData, ok := getMetric[T](t, metrics, name)
require.True(t, ok, "Metric not found: "+name)
return metric, metricData
}

func requireSum(t *testing.T, metrics metricdata.ResourceMetrics, name string, val int64, attrs ...attribute.KeyValue) (metricdata.Metrics, metricdata.Sum[int64]) { //nolint:unparam
t.Helper()

metric, metricData := getMetric[metricdata.Sum[int64]](t, metrics, name)
metric, metricData := requireMetric[metricdata.Sum[int64]](t, metrics, name)
require.Equal(t, val, metricData.DataPoints[0].Value)
metricdatatest.AssertHasAttributes(t, metric, attrs...)
return metric, metricData
Expand Down
Loading