From eb25b1b725b09d1382b2adf2c5adc2ebbe7ecaf8 Mon Sep 17 00:00:00 2001 From: Brandur Date: Sat, 5 Apr 2025 23:05:28 -0700 Subject: [PATCH] Add `EnableSemanticMetrics` setting to emit "semantic" metrics Here, address #7 to bring in a new option `EnableSemanticMetrics` that causes the middleware to emit metrics compliant with OpenTelemetry's "semantic conventions" [1]. These are a way to have all pub/sub systems emit metrics with the same name and differentiated by attribute. I've put this behind a configuration option because I'm not super convinced that this is going to be an important feature for most people, and under the assumption that metrics are cheap but not free, it probably makes sense to emit only as many as necessary. Fixes #7. [1] https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/ --- CHANGELOG.md | 4 +- otelriver/README.md | 8 ++- otelriver/middleware.go | 90 +++++++++++++++++++++-------- otelriver/middleware_test.go | 106 ++++++++++++++++++++++++++++++++--- 4 files changed, 175 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d33b482..6cb0496 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Added `otelriver` option `MiddlewareConfig.DurationUnit`. Can be used to configure duration metrics to be emitted in milliseconds instead of the default seconds. [PR #XXX](https://github.com/riverqueue/rivercontrib/pull/XXX). +- Added `otelriver` option `MiddlewareConfig.DurationUnit`. Can be used to configure duration metrics to be emitted in milliseconds instead of the default seconds. [PR #10](https://github.com/riverqueue/rivercontrib/pull/10). +- More attributes like job ID and timestamps on OpenTelemetry spans. [PR #11](https://github.com/riverqueue/rivercontrib/pull/11). +- Added `otelriver` option `EnableSemanticMetrics` which will cause the middleware to emit metrics compliant with OpenTelemetry [semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/). [PR #12](https://github.com/riverqueue/rivercontrib/pull/12). ## [0.1.0] - 2025-03-16 diff --git a/otelriver/README.md b/otelriver/README.md index fccd933..74595a8 100644 --- a/otelriver/README.md +++ b/otelriver/README.md @@ -10,13 +10,15 @@ The middleware supports these options: ``` go middleware := otelriver.NewMiddleware(&MiddlewareConfig{ - DurationUnit: "ms", - MeterProvider: meterProvider, - TracerProvider: tracerProvider, + DurationUnit: "ms", + EnableSemanticMetrics: true, + MeterProvider: meterProvider, + TracerProvider: tracerProvider, }) ``` * `DurationUnit`: The unit which durations are emitted as, either "ms" (milliseconds) or "s" (seconds). Defaults to seconds. +* `EnableSemanticMetrics`: Causes the middleware to emit metrics compliant with OpenTelemetry's ["semantic conventions"](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/) for message clients. This has the effect of having all messaging systems share the same common metric names, with attributes differentiating them. * `MeterProvider`: Injected OpenTelemetry meter provider. The global meter provider is used by default. * `TracerProvider`: Injected OpenTelemetry tracer provider. The global tracer provider is used by default. diff --git a/otelriver/middleware.go b/otelriver/middleware.go index bc5c3d3..7a565c4 100644 --- a/otelriver/middleware.go +++ b/otelriver/middleware.go @@ -30,8 +30,20 @@ type MiddlewareConfig struct { // `river.work_duration` are emitted. // // Must be one of "ms" (milliseconds) or "s" (seconds). Defaults to seconds. + // + // Does not modify metrics emitted by EnableSemanticMetrics because those + // are constrained to seconds by specification. DurationUnit string + // EnableSemanticMetrics emits metrics compliant with OpenTelemetry's + // "semantic conventions" for messaging clients: + // + // https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/ + // + // This has the effect of having all messaging systems share the same common + // metric names, with attributes differentiating them. + EnableSemanticMetrics bool + // MeterProvider is a MeterProvider to base metrics on. May be left as nil // to use the default global provider. MeterProvider metric.MeterProvider @@ -54,13 +66,17 @@ type Middleware struct { // Bundle of metrics associated with a middleware. type middlewareMetrics struct { - insertCount metric.Int64Counter - insertManyCount metric.Int64Counter - insertManyDuration metric.Float64Gauge - insertManyDurationHistogram metric.Float64Histogram - workCount metric.Int64Counter - workDuration metric.Float64Gauge - workDurationHistogram metric.Float64Histogram + insertCount metric.Int64Counter + insertManyCount metric.Int64Counter + insertManyDuration metric.Float64Gauge + insertManyDurationHistogram metric.Float64Histogram + messagingClientConsumedMessages metric.Int64Counter + messagingClientOperationDuration metric.Float64Histogram + messagingClientSentMessages metric.Int64Counter + messagingProcessDuration metric.Float64Histogram + workCount metric.Int64Counter + workDuration metric.Float64Gauge + workDurationHistogram metric.Float64Histogram } // NewMiddleware initializes a new River OpenTelemetry middleware. @@ -88,22 +104,31 @@ func NewMiddleware(config *MiddlewareConfig) *Middleware { meter := meterProvider.Meter(name) + metrics := middlewareMetrics{ + // See unit guidelines: + // + // https://opentelemetry.io/docs/specs/semconv/general/metrics/#instrument-units + insertCount: mustInt64Counter(meter, prefix+"insert_count", metric.WithDescription("Number of jobs inserted"), metric.WithUnit("{job}")), + insertManyCount: mustInt64Counter(meter, prefix+"insert_many_count", metric.WithDescription("Number of job batches inserted (all jobs are inserted in a batch, but batches may be one job)"), metric.WithUnit("{job_batch}")), + insertManyDuration: mustFloat64Gauge(meter, prefix+"insert_many_duration", metric.WithDescription("Duration of job batch insertion"), metric.WithUnit(durationUnit)), + insertManyDurationHistogram: mustFloat64Histogram(meter, prefix+"insert_many_duration_histogram", metric.WithDescription("Duration of job batch insertion (histogram)"), metric.WithUnit(durationUnit)), + workCount: mustInt64Counter(meter, prefix+"work_count", metric.WithDescription("Number of jobs worked"), metric.WithUnit("{job}")), + workDuration: mustFloat64Gauge(meter, prefix+"work_duration", metric.WithDescription("Duration of job being worked"), metric.WithUnit(durationUnit)), + workDurationHistogram: mustFloat64Histogram(meter, prefix+"work_duration_histogram", metric.WithDescription("Duration of job being worked (histogram)"), metric.WithUnit(durationUnit)), + } + + if config.EnableSemanticMetrics { + metrics.messagingClientConsumedMessages = mustInt64Counter(meter, "messaging.client.consumed.messages", metric.WithDescription("Number of messages that were delivered to the application.")) + metrics.messagingClientOperationDuration = mustFloat64Histogram(meter, "messaging.client.operation.duration", metric.WithDescription("Duration of messaging operation initiated by a producer or consumer client."), metric.WithUnit(durationUnit)) + metrics.messagingClientSentMessages = mustInt64Counter(meter, "messaging.client.sent.messages", metric.WithDescription("Number of messages producer attempted to send to the broker.")) + metrics.messagingProcessDuration = mustFloat64Histogram(meter, "messaging.process.duration", metric.WithDescription("Duration of processing operation."), metric.WithUnit(durationUnit)) + } + return &Middleware{ - config: config, - meter: meter, - metrics: middlewareMetrics{ - // See unit guidelines: - // - // https://opentelemetry.io/docs/specs/semconv/general/metrics/#instrument-units - insertCount: mustInt64Counter(meter, prefix+"insert_count", metric.WithDescription("Number of jobs inserted"), metric.WithUnit("{job}")), - insertManyCount: mustInt64Counter(meter, prefix+"insert_many_count", metric.WithDescription("Number of job batches inserted (all jobs are inserted in a batch, but batches may be one job)"), metric.WithUnit("{job_batch}")), - insertManyDuration: mustFloat64Gauge(meter, prefix+"insert_many_duration", metric.WithDescription("Duration of job batch insertion"), metric.WithUnit(durationUnit)), - insertManyDurationHistogram: mustFloat64Histogram(meter, prefix+"insert_many_duration_histogram", metric.WithDescription("Duration of job batch insertion (histogram)"), metric.WithUnit(durationUnit)), - workCount: mustInt64Counter(meter, prefix+"work_count", metric.WithDescription("Number of jobs worked"), metric.WithUnit("{job}")), - workDuration: mustFloat64Gauge(meter, prefix+"work_duration", metric.WithDescription("Duration of job being worked"), metric.WithUnit(durationUnit)), - workDurationHistogram: mustFloat64Histogram(meter, prefix+"work_duration_histogram", metric.WithDescription("Duration of job being worked (histogram)"), metric.WithUnit(durationUnit)), - }, - tracer: tracerProvider.Tracer(name), + config: config, + meter: meter, + metrics: metrics, + tracer: tracerProvider.Tracer(name), } } @@ -135,6 +160,16 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job m.metrics.insertManyCount.Add(ctx, 1, measurementOpt) m.metrics.insertManyDuration.Record(ctx, duration, measurementOpt) m.metrics.insertManyDurationHistogram.Record(ctx, duration, measurementOpt) + + if m.config.EnableSemanticMetrics { + measurementOpt := metric.WithAttributes( + attribute.String("messaging.operation.name", "insert_many"), + attribute.String("messaging.system", "river"), + ) + + m.metrics.messagingClientOperationDuration.Record(ctx, duration, measurementOpt) + m.metrics.messagingClientSentMessages.Add(ctx, int64(len(manyParams)), measurementOpt) + } }() insertRes, err = doInner(ctx) @@ -189,6 +224,17 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu m.metrics.workCount.Add(ctx, 1, measurementOpt) m.metrics.workDuration.Record(ctx, duration, measurementOpt) m.metrics.workDurationHistogram.Record(ctx, duration, measurementOpt) + + if m.config.EnableSemanticMetrics { + measurementOpt := metric.WithAttributes( + attribute.String("messaging.operation.name", "work"), + attribute.String("messaging.system", "river"), + ) + + m.metrics.messagingClientConsumedMessages.Add(ctx, 1, measurementOpt) + m.metrics.messagingClientOperationDuration.Record(ctx, duration, measurementOpt) + m.metrics.messagingProcessDuration.Record(ctx, duration, measurementOpt) + } }() err = doInner(ctx) diff --git a/otelriver/middleware_test.go b/otelriver/middleware_test.go index fc12f38..4fbfcdf 100644 --- a/otelriver/middleware_test.go +++ b/otelriver/middleware_test.go @@ -169,6 +169,10 @@ func TestMiddleware(t *testing.T) { requireSum(t, metrics, "river.insert_many_count", 1, expectedAttrs...) requireGaugeNotEmpty(t, metrics, "river.insert_many_duration", expectedAttrs...) requireHistogramCount(t, metrics, "river.insert_many_duration_histogram", 1, expectedAttrs...) + + // Requires EnableSemanticMetrics. + requireNoMetric(t, metrics, "messaging.client.sent.messages") + requireNoMetric(t, metrics, "messaging.client.operation.duration") }) // Make sure the middleware can fall back to a global provider. @@ -220,6 +224,40 @@ func TestMiddleware(t *testing.T) { } }) + t.Run("InsertManyEnableSemanticMetrics", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setupConfig(t, &MiddlewareConfig{ + EnableSemanticMetrics: true, + }) + + doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) { + return []*rivertype.JobInsertResult{ + {Job: &rivertype.JobRow{ID: 123}}, + }, nil + } + + insertRes, err := middleware.InsertMany(ctx, []*rivertype.JobInsertParams{{Kind: "no_op"}}, doInner) + require.NoError(t, err) + require.Equal(t, []*rivertype.JobInsertResult{ + {Job: &rivertype.JobRow{ID: 123}}, + }, insertRes) + + var ( + expectedAttrs = []attribute.KeyValue{ + attribute.String("messaging.operation.name", "insert_many"), + attribute.String("messaging.system", "river"), + } + metrics metricdata.ResourceMetrics + ) + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + requireSum(t, metrics, "messaging.client.sent.messages", 1, expectedAttrs...) + { + metric, _ := requireHistogramCount(t, metrics, "messaging.client.operation.duration", 1, expectedAttrs...) + require.Equal(t, "s", metric.Unit) + } + }) + t.Run("WorkSuccess", func(t *testing.T) { t.Parallel() @@ -276,6 +314,11 @@ func TestMiddleware(t *testing.T) { metric, _ := requireHistogramCount(t, metrics, "river.work_duration_histogram", 1, expectedAttrs...) require.Equal(t, "s", metric.Unit) } + + // Requires EnableSemanticMetrics. + requireNoMetric(t, metrics, "messaging.client.consumed.messages") + requireNoMetric(t, metrics, "messaging.client.operation.duration") + requireNoMetric(t, metrics, "messaging.process.duration") }) t.Run("WorkError", func(t *testing.T) { @@ -439,6 +482,41 @@ func TestMiddleware(t *testing.T) { require.Equal(t, "ms", metric.Unit) } }) + + t.Run("WorkEnableSemanticMetrics ", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setupConfig(t, &MiddlewareConfig{ + EnableSemanticMetrics: true, + }) + + doInner := func(ctx context.Context) error { + return nil + } + + err := middleware.Work(ctx, &rivertype.JobRow{ + ID: 123, + }, doInner) + require.NoError(t, err) + + var ( + expectedAttrs = []attribute.KeyValue{ + attribute.String("messaging.operation.name", "work"), + attribute.String("messaging.system", "river"), + } + metrics metricdata.ResourceMetrics + ) + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + requireSum(t, metrics, "messaging.client.consumed.messages", 1, expectedAttrs...) + { + metric, _ := requireHistogramCount(t, metrics, "messaging.client.operation.duration", 1, expectedAttrs...) + require.Equal(t, "s", metric.Unit) + } + { + metric, _ := requireHistogramCount(t, metrics, "messaging.process.duration", 1, expectedAttrs...) + require.Equal(t, "s", metric.Unit) + } + }) } func getAttribute(t *testing.T, attrs []attribute.KeyValue, key string) attribute.Value { @@ -453,25 +531,24 @@ func getAttribute(t *testing.T, attrs []attribute.KeyValue, key string) attribut return attribute.Value{} } -func getMetric[T metricdatatest.Datatypes](t *testing.T, metrics metricdata.ResourceMetrics, name string) (metricdata.Metrics, T) { +func getMetric[T metricdatatest.Datatypes](t *testing.T, metrics metricdata.ResourceMetrics, name string) (metricdata.Metrics, T, bool) { t.Helper() for _, scopeMetrics := range metrics.ScopeMetrics { for _, metric := range scopeMetrics.Metrics { if metric.Name == name { - return metric, metric.Data.(T) //nolint:forcetypeassert + return metric, metric.Data.(T), true //nolint:forcetypeassert } } } - t.Fatalf("Metrics not found: %s", name) var defaultVal T - return metricdata.Metrics{}, defaultVal + return metricdata.Metrics{}, defaultVal, false } func requireGaugeNotEmpty(t *testing.T, metrics metricdata.ResourceMetrics, name string, attrs ...attribute.KeyValue) (metricdata.Metrics, metricdata.Gauge[float64]) { //nolint:unparam t.Helper() - metric, metricData := getMetric[metricdata.Gauge[float64]](t, metrics, name) + metric, metricData := requireMetric[metricdata.Gauge[float64]](t, metrics, name) require.NotEmpty(t, metricData.DataPoints) metricdatatest.AssertHasAttributes(t, metric, attrs...) return metric, metricData @@ -480,16 +557,31 @@ func requireGaugeNotEmpty(t *testing.T, metrics metricdata.ResourceMetrics, name func requireHistogramCount(t *testing.T, metrics metricdata.ResourceMetrics, name string, count uint64, attrs ...attribute.KeyValue) (metricdata.Metrics, metricdata.Histogram[float64]) { //nolint:unparam t.Helper() - metric, metricData := getMetric[metricdata.Histogram[float64]](t, metrics, name) + metric, metricData := requireMetric[metricdata.Histogram[float64]](t, metrics, name) require.Equal(t, count, metricData.DataPoints[0].Count) metricdatatest.AssertHasAttributes(t, metric, attrs...) return metric, metricData } +func requireNoMetric(t *testing.T, metrics metricdata.ResourceMetrics, name string) { + t.Helper() + + _, _, ok := getMetric[metricdata.ResourceMetrics](t, metrics, name) + require.False(t, ok, "Metric should not have been emitted, but was found: "+name) +} + +func requireMetric[T metricdatatest.Datatypes](t *testing.T, metrics metricdata.ResourceMetrics, name string) (metricdata.Metrics, T) { + t.Helper() + + metric, metricData, ok := getMetric[T](t, metrics, name) + require.True(t, ok, "Metric not found: "+name) + return metric, metricData +} + func requireSum(t *testing.T, metrics metricdata.ResourceMetrics, name string, val int64, attrs ...attribute.KeyValue) (metricdata.Metrics, metricdata.Sum[int64]) { //nolint:unparam t.Helper() - metric, metricData := getMetric[metricdata.Sum[int64]](t, metrics, name) + metric, metricData := requireMetric[metricdata.Sum[int64]](t, metrics, name) require.Equal(t, val, metricData.DataPoints[0].Value) metricdatatest.AssertHasAttributes(t, metric, attrs...) return metric, metricData