diff --git a/CHANGELOG.md b/CHANGELOG.md index d33b482..6cb0496 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Added `otelriver` option `MiddlewareConfig.DurationUnit`. Can be used to configure duration metrics to be emitted in milliseconds instead of the default seconds. [PR #XXX](https://github.com/riverqueue/rivercontrib/pull/XXX). +- Added `otelriver` option `MiddlewareConfig.DurationUnit`. Can be used to configure duration metrics to be emitted in milliseconds instead of the default seconds. [PR #10](https://github.com/riverqueue/rivercontrib/pull/10). +- More attributes like job ID and timestamps on OpenTelemetry spans. [PR #11](https://github.com/riverqueue/rivercontrib/pull/11). +- Added `otelriver` option `MiddlewareConfig.EnableSemanticMetrics` which will cause the middleware to emit metrics compliant with OpenTelemetry [semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/). [PR #12](https://github.com/riverqueue/rivercontrib/pull/12). ## [0.1.0] - 2025-03-16 diff --git a/otelriver/README.md b/otelriver/README.md index fccd933..74595a8 100644 --- a/otelriver/README.md +++ b/otelriver/README.md @@ -10,13 +10,15 @@ The middleware supports these options: ``` go middleware := otelriver.NewMiddleware(&MiddlewareConfig{ - DurationUnit: "ms", - MeterProvider: meterProvider, - TracerProvider: tracerProvider, + DurationUnit: "ms", + EnableSemanticMetrics: true, + MeterProvider: meterProvider, + TracerProvider: tracerProvider, }) ``` * `DurationUnit`: The unit which durations are emitted as, either "ms" (milliseconds) or "s" (seconds). Defaults to seconds. +* `EnableSemanticMetrics`: Causes the middleware to emit metrics compliant with OpenTelemetry's ["semantic conventions"](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/) for messaging clients. 
This has the effect of having all messaging systems share the same common metric names, with attributes differentiating them. * `MeterProvider`: Injected OpenTelemetry meter provider. The global meter provider is used by default. * `TracerProvider`: Injected OpenTelemetry tracer provider. The global tracer provider is used by default. diff --git a/otelriver/middleware.go b/otelriver/middleware.go index bc5c3d3..7a565c4 100644 --- a/otelriver/middleware.go +++ b/otelriver/middleware.go @@ -30,8 +30,20 @@ type MiddlewareConfig struct { // `river.work_duration` are emitted. // // Must be one of "ms" (milliseconds) or "s" (seconds). Defaults to seconds. + // + // Does not modify metrics emitted by EnableSemanticMetrics because those + // are constrained to seconds by specification. DurationUnit string + // EnableSemanticMetrics emits metrics compliant with OpenTelemetry's + // "semantic conventions" for messaging clients: + // + // https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/ + // + // This has the effect of having all messaging systems share the same common + // metric names, with attributes differentiating them. + EnableSemanticMetrics bool + // MeterProvider is a MeterProvider to base metrics on. May be left as nil // to use the default global provider. MeterProvider metric.MeterProvider @@ -54,13 +66,17 @@ type Middleware struct { // Bundle of metrics associated with a middleware. 
type middlewareMetrics struct { - insertCount metric.Int64Counter - insertManyCount metric.Int64Counter - insertManyDuration metric.Float64Gauge - insertManyDurationHistogram metric.Float64Histogram - workCount metric.Int64Counter - workDuration metric.Float64Gauge - workDurationHistogram metric.Float64Histogram + insertCount metric.Int64Counter + insertManyCount metric.Int64Counter + insertManyDuration metric.Float64Gauge + insertManyDurationHistogram metric.Float64Histogram + messagingClientConsumedMessages metric.Int64Counter + messagingClientOperationDuration metric.Float64Histogram + messagingClientSentMessages metric.Int64Counter + messagingProcessDuration metric.Float64Histogram + workCount metric.Int64Counter + workDuration metric.Float64Gauge + workDurationHistogram metric.Float64Histogram } // NewMiddleware initializes a new River OpenTelemetry middleware. @@ -88,22 +104,31 @@ func NewMiddleware(config *MiddlewareConfig) *Middleware { meter := meterProvider.Meter(name) + metrics := middlewareMetrics{ + // See unit guidelines: + // + // https://opentelemetry.io/docs/specs/semconv/general/metrics/#instrument-units + insertCount: mustInt64Counter(meter, prefix+"insert_count", metric.WithDescription("Number of jobs inserted"), metric.WithUnit("{job}")), + insertManyCount: mustInt64Counter(meter, prefix+"insert_many_count", metric.WithDescription("Number of job batches inserted (all jobs are inserted in a batch, but batches may be one job)"), metric.WithUnit("{job_batch}")), + insertManyDuration: mustFloat64Gauge(meter, prefix+"insert_many_duration", metric.WithDescription("Duration of job batch insertion"), metric.WithUnit(durationUnit)), + insertManyDurationHistogram: mustFloat64Histogram(meter, prefix+"insert_many_duration_histogram", metric.WithDescription("Duration of job batch insertion (histogram)"), metric.WithUnit(durationUnit)), + workCount: mustInt64Counter(meter, prefix+"work_count", metric.WithDescription("Number of jobs worked"), 
metric.WithUnit("{job}")), + workDuration: mustFloat64Gauge(meter, prefix+"work_duration", metric.WithDescription("Duration of job being worked"), metric.WithUnit(durationUnit)), + workDurationHistogram: mustFloat64Histogram(meter, prefix+"work_duration_histogram", metric.WithDescription("Duration of job being worked (histogram)"), metric.WithUnit(durationUnit)), + } + + if config.EnableSemanticMetrics { + metrics.messagingClientConsumedMessages = mustInt64Counter(meter, "messaging.client.consumed.messages", metric.WithDescription("Number of messages that were delivered to the application.")) + metrics.messagingClientOperationDuration = mustFloat64Histogram(meter, "messaging.client.operation.duration", metric.WithDescription("Duration of messaging operation initiated by a producer or consumer client."), metric.WithUnit(durationUnit)) + metrics.messagingClientSentMessages = mustInt64Counter(meter, "messaging.client.sent.messages", metric.WithDescription("Number of messages producer attempted to send to the broker.")) + metrics.messagingProcessDuration = mustFloat64Histogram(meter, "messaging.process.duration", metric.WithDescription("Duration of processing operation."), metric.WithUnit(durationUnit)) + } + return &Middleware{ - config: config, - meter: meter, - metrics: middlewareMetrics{ - // See unit guidelines: - // - // https://opentelemetry.io/docs/specs/semconv/general/metrics/#instrument-units - insertCount: mustInt64Counter(meter, prefix+"insert_count", metric.WithDescription("Number of jobs inserted"), metric.WithUnit("{job}")), - insertManyCount: mustInt64Counter(meter, prefix+"insert_many_count", metric.WithDescription("Number of job batches inserted (all jobs are inserted in a batch, but batches may be one job)"), metric.WithUnit("{job_batch}")), - insertManyDuration: mustFloat64Gauge(meter, prefix+"insert_many_duration", metric.WithDescription("Duration of job batch insertion"), metric.WithUnit(durationUnit)), - insertManyDurationHistogram: 
mustFloat64Histogram(meter, prefix+"insert_many_duration_histogram", metric.WithDescription("Duration of job batch insertion (histogram)"), metric.WithUnit(durationUnit)), - workCount: mustInt64Counter(meter, prefix+"work_count", metric.WithDescription("Number of jobs worked"), metric.WithUnit("{job}")), - workDuration: mustFloat64Gauge(meter, prefix+"work_duration", metric.WithDescription("Duration of job being worked"), metric.WithUnit(durationUnit)), - workDurationHistogram: mustFloat64Histogram(meter, prefix+"work_duration_histogram", metric.WithDescription("Duration of job being worked (histogram)"), metric.WithUnit(durationUnit)), - }, - tracer: tracerProvider.Tracer(name), + config: config, + meter: meter, + metrics: metrics, + tracer: tracerProvider.Tracer(name), } } @@ -135,6 +160,16 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job m.metrics.insertManyCount.Add(ctx, 1, measurementOpt) m.metrics.insertManyDuration.Record(ctx, duration, measurementOpt) m.metrics.insertManyDurationHistogram.Record(ctx, duration, measurementOpt) + + if m.config.EnableSemanticMetrics { + measurementOpt := metric.WithAttributes( + attribute.String("messaging.operation.name", "insert_many"), + attribute.String("messaging.system", "river"), + ) + + m.metrics.messagingClientOperationDuration.Record(ctx, duration, measurementOpt) + m.metrics.messagingClientSentMessages.Add(ctx, int64(len(manyParams)), measurementOpt) + } }() insertRes, err = doInner(ctx) @@ -189,6 +224,17 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu m.metrics.workCount.Add(ctx, 1, measurementOpt) m.metrics.workDuration.Record(ctx, duration, measurementOpt) m.metrics.workDurationHistogram.Record(ctx, duration, measurementOpt) + + if m.config.EnableSemanticMetrics { + measurementOpt := metric.WithAttributes( + attribute.String("messaging.operation.name", "work"), + attribute.String("messaging.system", "river"), + ) + + 
m.metrics.messagingClientConsumedMessages.Add(ctx, 1, measurementOpt) + m.metrics.messagingClientOperationDuration.Record(ctx, duration, measurementOpt) + m.metrics.messagingProcessDuration.Record(ctx, duration, measurementOpt) + } }() err = doInner(ctx) diff --git a/otelriver/middleware_test.go b/otelriver/middleware_test.go index fc12f38..4fbfcdf 100644 --- a/otelriver/middleware_test.go +++ b/otelriver/middleware_test.go @@ -169,6 +169,10 @@ func TestMiddleware(t *testing.T) { requireSum(t, metrics, "river.insert_many_count", 1, expectedAttrs...) requireGaugeNotEmpty(t, metrics, "river.insert_many_duration", expectedAttrs...) requireHistogramCount(t, metrics, "river.insert_many_duration_histogram", 1, expectedAttrs...) + + // Requires EnableSemanticMetrics. + requireNoMetric(t, metrics, "messaging.client.sent.messages") + requireNoMetric(t, metrics, "messaging.client.operation.duration") }) // Make sure the middleware can fall back to a global provider. @@ -220,6 +224,40 @@ func TestMiddleware(t *testing.T) { } }) + t.Run("InsertManyEnableSemanticMetrics", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setupConfig(t, &MiddlewareConfig{ + EnableSemanticMetrics: true, + }) + + doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) { + return []*rivertype.JobInsertResult{ + {Job: &rivertype.JobRow{ID: 123}}, + }, nil + } + + insertRes, err := middleware.InsertMany(ctx, []*rivertype.JobInsertParams{{Kind: "no_op"}}, doInner) + require.NoError(t, err) + require.Equal(t, []*rivertype.JobInsertResult{ + {Job: &rivertype.JobRow{ID: 123}}, + }, insertRes) + + var ( + expectedAttrs = []attribute.KeyValue{ + attribute.String("messaging.operation.name", "insert_many"), + attribute.String("messaging.system", "river"), + } + metrics metricdata.ResourceMetrics + ) + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + requireSum(t, metrics, "messaging.client.sent.messages", 1, expectedAttrs...) 
+ { + metric, _ := requireHistogramCount(t, metrics, "messaging.client.operation.duration", 1, expectedAttrs...) + require.Equal(t, "s", metric.Unit) + } + }) + t.Run("WorkSuccess", func(t *testing.T) { t.Parallel() @@ -276,6 +314,11 @@ func TestMiddleware(t *testing.T) { metric, _ := requireHistogramCount(t, metrics, "river.work_duration_histogram", 1, expectedAttrs...) require.Equal(t, "s", metric.Unit) } + + // Requires EnableSemanticMetrics. + requireNoMetric(t, metrics, "messaging.client.consumed.messages") + requireNoMetric(t, metrics, "messaging.client.operation.duration") + requireNoMetric(t, metrics, "messaging.process.duration") }) t.Run("WorkError", func(t *testing.T) { @@ -439,6 +482,41 @@ func TestMiddleware(t *testing.T) { require.Equal(t, "ms", metric.Unit) } }) + + t.Run("WorkEnableSemanticMetrics ", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setupConfig(t, &MiddlewareConfig{ + EnableSemanticMetrics: true, + }) + + doInner := func(ctx context.Context) error { + return nil + } + + err := middleware.Work(ctx, &rivertype.JobRow{ + ID: 123, + }, doInner) + require.NoError(t, err) + + var ( + expectedAttrs = []attribute.KeyValue{ + attribute.String("messaging.operation.name", "work"), + attribute.String("messaging.system", "river"), + } + metrics metricdata.ResourceMetrics + ) + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + requireSum(t, metrics, "messaging.client.consumed.messages", 1, expectedAttrs...) + { + metric, _ := requireHistogramCount(t, metrics, "messaging.client.operation.duration", 1, expectedAttrs...) + require.Equal(t, "s", metric.Unit) + } + { + metric, _ := requireHistogramCount(t, metrics, "messaging.process.duration", 1, expectedAttrs...) 
+ require.Equal(t, "s", metric.Unit) + } + }) } func getAttribute(t *testing.T, attrs []attribute.KeyValue, key string) attribute.Value { @@ -453,25 +531,24 @@ func getAttribute(t *testing.T, attrs []attribute.KeyValue, key string) attribut return attribute.Value{} } -func getMetric[T metricdatatest.Datatypes](t *testing.T, metrics metricdata.ResourceMetrics, name string) (metricdata.Metrics, T) { +func getMetric[T metricdatatest.Datatypes](t *testing.T, metrics metricdata.ResourceMetrics, name string) (metricdata.Metrics, T, bool) { t.Helper() for _, scopeMetrics := range metrics.ScopeMetrics { for _, metric := range scopeMetrics.Metrics { if metric.Name == name { - return metric, metric.Data.(T) //nolint:forcetypeassert + return metric, metric.Data.(T), true //nolint:forcetypeassert } } } - t.Fatalf("Metrics not found: %s", name) var defaultVal T - return metricdata.Metrics{}, defaultVal + return metricdata.Metrics{}, defaultVal, false } func requireGaugeNotEmpty(t *testing.T, metrics metricdata.ResourceMetrics, name string, attrs ...attribute.KeyValue) (metricdata.Metrics, metricdata.Gauge[float64]) { //nolint:unparam t.Helper() - metric, metricData := getMetric[metricdata.Gauge[float64]](t, metrics, name) + metric, metricData := requireMetric[metricdata.Gauge[float64]](t, metrics, name) require.NotEmpty(t, metricData.DataPoints) metricdatatest.AssertHasAttributes(t, metric, attrs...) 
return metric, metricData @@ -480,16 +557,31 @@ func requireGaugeNotEmpty(t *testing.T, metrics metricdata.ResourceMetrics, name func requireHistogramCount(t *testing.T, metrics metricdata.ResourceMetrics, name string, count uint64, attrs ...attribute.KeyValue) (metricdata.Metrics, metricdata.Histogram[float64]) { //nolint:unparam t.Helper() - metric, metricData := getMetric[metricdata.Histogram[float64]](t, metrics, name) + metric, metricData := requireMetric[metricdata.Histogram[float64]](t, metrics, name) require.Equal(t, count, metricData.DataPoints[0].Count) metricdatatest.AssertHasAttributes(t, metric, attrs...) return metric, metricData } +func requireNoMetric(t *testing.T, metrics metricdata.ResourceMetrics, name string) { + t.Helper() + + _, _, ok := getMetric[metricdata.ResourceMetrics](t, metrics, name) + require.False(t, ok, "Metric should not have been emitted, but was found: "+name) +} + +func requireMetric[T metricdatatest.Datatypes](t *testing.T, metrics metricdata.ResourceMetrics, name string) (metricdata.Metrics, T) { + t.Helper() + + metric, metricData, ok := getMetric[T](t, metrics, name) + require.True(t, ok, "Metric not found: "+name) + return metric, metricData +} + func requireSum(t *testing.T, metrics metricdata.ResourceMetrics, name string, val int64, attrs ...attribute.KeyValue) (metricdata.Metrics, metricdata.Sum[int64]) { //nolint:unparam t.Helper() - metric, metricData := getMetric[metricdata.Sum[int64]](t, metrics, name) + metric, metricData := requireMetric[metricdata.Sum[int64]](t, metrics, name) require.Equal(t, val, metricData.DataPoints[0].Value) metricdatatest.AssertHasAttributes(t, metric, attrs...) return metric, metricData