diff --git a/CHANGELOG.md b/CHANGELOG.md index c65e008..d33b482 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,8 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Added `otelriver` option `MiddlewareConfig.DurationUnit`. Can be used to configure duration metrics to be emitted in milliseconds instead of the default seconds. [PR #XXX](https://github.com/riverqueue/rivercontrib/pull/XXX). + ## [0.1.0] - 2025-03-16 ### Added -Initial release. Mainly brings in the `otelriver` package for use of River with OpenTelemetry and DataDog. [PR #1](https://github.com/riverqueue/rivercontrib/pull/1). +- Initial release. Mainly brings in the `otelriver` package for use of River with OpenTelemetry and DataDog. [PR #1](https://github.com/riverqueue/rivercontrib/pull/1). diff --git a/otelriver/middleware.go b/otelriver/middleware.go index d847ac2..caadd1f 100644 --- a/otelriver/middleware.go +++ b/otelriver/middleware.go @@ -1,6 +1,7 @@ package otelriver import ( + "cmp" "context" "time" @@ -24,6 +25,12 @@ const ( // MiddlewareConfig is configuration for River's OpenTelemetry middleware. type MiddlewareConfig struct { + // DurationUnit selects the unit in which duration metrics like + // `river.work_duration` are emitted. + // + // Must be one of "ms" (milliseconds) or "s" (seconds). Defaults to seconds. + DurationUnit string + // MeterProvider is a MeterProvider to base metrics on. May be left as nil // to use the default global provider. MeterProvider metric.MeterProvider @@ -38,6 +45,7 @@ type MiddlewareConfig struct { type Middleware struct { river.MiddlewareDefaults + config *MiddlewareConfig meter metric.Meter metrics middlewareMetrics tracer trace.Tracer @@ -58,34 +66,41 @@ type middlewareMetrics struct { // // config may be nil. 
func NewMiddleware(config *MiddlewareConfig) *Middleware { - var ( - meterProvider = otel.GetMeterProvider() - tracerProvider = otel.GetTracerProvider() - ) - if config != nil { - if config.MeterProvider != nil { - meterProvider = config.MeterProvider - } - if config.TracerProvider != nil { - tracerProvider = config.TracerProvider - } + if config == nil { + config = &MiddlewareConfig{} + } + + durationUnit := cmp.Or(config.DurationUnit, "s") + if durationUnit != "ms" && durationUnit != "s" { + panic("duration unit must be one of ms or s") + } + + meterProvider := otel.GetMeterProvider() + if config.MeterProvider != nil { + meterProvider = config.MeterProvider + } + + tracerProvider := otel.GetTracerProvider() + if config.TracerProvider != nil { + tracerProvider = config.TracerProvider } meter := meterProvider.Meter(name) return &Middleware{ - meter: meter, + config: config, + meter: meter, metrics: middlewareMetrics{ // See unit guidelines: // // https://opentelemetry.io/docs/specs/semconv/general/metrics/#instrument-units insertCount: mustInt64Counter(meter, prefix+"insert_count", metric.WithDescription("Number of jobs inserted"), metric.WithUnit("{job}")), insertManyCount: mustInt64Counter(meter, prefix+"insert_many_count", metric.WithDescription("Number of job batches inserted (all jobs are inserted in a batch, but batches may be one job)"), metric.WithUnit("{job_batch}")), - insertManyDuration: mustFloat64Gauge(meter, prefix+"insert_many_duration", metric.WithDescription("Duration of job batch insertion"), metric.WithUnit("s")), - insertManyDurationHistogram: mustFloat64Histogram(meter, prefix+"insert_many_duration_histogram", metric.WithDescription("Duration of job batch insertion (histogram)"), metric.WithUnit("s")), + insertManyDuration: mustFloat64Gauge(meter, prefix+"insert_many_duration", metric.WithDescription("Duration of job batch insertion"), metric.WithUnit(durationUnit)), + insertManyDurationHistogram: mustFloat64Histogram(meter, 
prefix+"insert_many_duration_histogram", metric.WithDescription("Duration of job batch insertion (histogram)"), metric.WithUnit(durationUnit)), workCount: mustInt64Counter(meter, prefix+"work_count", metric.WithDescription("Number of jobs worked"), metric.WithUnit("{job}")), - workDuration: mustFloat64Gauge(meter, prefix+"work_duration", metric.WithDescription("Duration of job being worked"), metric.WithUnit("s")), - workDurationHistogram: mustFloat64Histogram(meter, prefix+"work_duration_histogram", metric.WithDescription("Duration of job being worked (histogram)"), metric.WithUnit("s")), + workDuration: mustFloat64Gauge(meter, prefix+"work_duration", metric.WithDescription("Duration of job being worked"), metric.WithUnit(durationUnit)), + workDurationHistogram: mustFloat64Histogram(meter, prefix+"work_duration_histogram", metric.WithDescription("Duration of job being worked (histogram)"), metric.WithUnit(durationUnit)), }, tracer: tracerProvider.Tracer(name), } @@ -107,17 +122,17 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job panicked = true // set to false if program leaves normally ) defer func() { - durationSeconds := time.Since(begin).Seconds() + duration := m.durationInPreferredUnit(time.Since(begin)) setAttributeAndSpanStatus(attrs, statusIndex, span, panicked, err) // This allocates a new slice, so make sure to do it as few times as possible. measurementOpt := metric.WithAttributes(attrs...) 
- m.metrics.insertCount.Add(ctx, int64(len(manyParams))) - m.metrics.insertManyCount.Add(ctx, 1) - m.metrics.insertManyDuration.Record(ctx, durationSeconds, measurementOpt) - m.metrics.insertManyDurationHistogram.Record(ctx, durationSeconds, measurementOpt) + m.metrics.insertCount.Add(ctx, int64(len(manyParams)), measurementOpt) + m.metrics.insertManyCount.Add(ctx, 1, measurementOpt) + m.metrics.insertManyDuration.Record(ctx, duration, measurementOpt) + m.metrics.insertManyDurationHistogram.Record(ctx, duration, measurementOpt) }() insertRes, err = doInner(ctx) @@ -143,7 +158,7 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu panicked = true // set to false if program leaves normally ) defer func() { - durationSeconds := time.Since(begin).Seconds() + duration := m.durationInPreferredUnit(time.Since(begin)) setAttributeAndSpanStatus(attrs, statusIndex, span, panicked, err) @@ -151,8 +166,8 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu measurementOpt := metric.WithAttributes(attrs...) m.metrics.workCount.Add(ctx, 1, measurementOpt) - m.metrics.workDuration.Record(ctx, durationSeconds, measurementOpt) - m.metrics.workDurationHistogram.Record(ctx, durationSeconds, measurementOpt) + m.metrics.workDuration.Record(ctx, duration, measurementOpt) + m.metrics.workDurationHistogram.Record(ctx, duration, measurementOpt) }() err = doInner(ctx) @@ -160,6 +175,22 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu return err } +// durationInPreferredUnit converts duration to a float64 count of the unit +// selected by m.config.DurationUnit: "ms" yields milliseconds and any other +// value, including the empty default, yields seconds. +func (m *Middleware) durationInPreferredUnit(duration time.Duration) float64 { + switch m.config.DurationUnit { + case "ms": + // Divide as floats: Duration.Milliseconds truncates to whole + // milliseconds, which would record sub-millisecond work as 0. + return float64(duration) / float64(time.Millisecond) + case "s": + fallthrough + default: + return duration.Seconds() + } +} + func mustFloat64Gauge(meter metric.Meter, name string, options ...metric.Float64GaugeOption) metric.Float64Gauge { metric, err := meter.Float64Gauge(name, options...) 
if err != nil { diff --git a/otelriver/middleware_test.go b/otelriver/middleware_test.go index be4aefb..ead2959 100644 --- a/otelriver/middleware_test.go +++ b/otelriver/middleware_test.go @@ -8,6 +8,9 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" @@ -26,19 +29,31 @@ func TestMiddleware(t *testing.T) { ctx := context.Background() type testBundle struct { - exporter *tracetest.InMemoryExporter + metricReader *metric.ManualReader + traceExporter *tracetest.InMemoryExporter } - setup := func(t *testing.T) (*Middleware, *testBundle) { + setupConfig := func(t *testing.T, config *MiddlewareConfig) (*Middleware, *testBundle) { t.Helper() - exporter := tracetest.NewInMemoryExporter() + var ( + metricReader = metric.NewManualReader() + traceExporter = tracetest.NewInMemoryExporter() + ) - return NewMiddleware(&MiddlewareConfig{ - TracerProvider: sdktrace.NewTracerProvider(sdktrace.WithSyncer(exporter)), - }), &testBundle{ - exporter: exporter, - } + config.MeterProvider = metric.NewMeterProvider(metric.WithReader(metricReader)) + config.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSyncer(traceExporter)) + + return NewMiddleware(config), &testBundle{ + metricReader: metricReader, + traceExporter: traceExporter, + } + } + + setup := func(t *testing.T) (*Middleware, *testBundle) { + t.Helper() + + return setupConfig(t, &MiddlewareConfig{}) } t.Run("InsertManySuccess", func(t *testing.T) { @@ -58,13 +73,31 @@ func TestMiddleware(t *testing.T) { {Job: &rivertype.JobRow{ID: 123}}, }, insertRes) - spans := bundle.exporter.GetSpans() + spans := bundle.traceExporter.GetSpans() require.Len(t, spans, 1) span := spans[0] require.Equal(t, "ok", 
getAttribute(t, span.Attributes, "status").AsString()) require.Equal(t, "river.insert_many", span.Name) require.Equal(t, codes.Ok, span.Status.Code) + + var ( + expectedAttrs = []attribute.KeyValue{ + attribute.String("status", "ok"), + } + metrics metricdata.ResourceMetrics + ) + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + requireSum(t, metrics, "river.insert_count", 1, expectedAttrs...) + requireSum(t, metrics, "river.insert_many_count", 1, expectedAttrs...) + { + metric, _ := requireGaugeNotEmpty(t, metrics, "river.insert_many_duration", expectedAttrs...) + require.Equal(t, "s", metric.Unit) + } + { + metric, _ := requireHistogramCount(t, metrics, "river.insert_many_duration_histogram", 1, expectedAttrs...) + require.Equal(t, "s", metric.Unit) + } }) t.Run("InsertManyError", func(t *testing.T) { @@ -79,7 +112,7 @@ func TestMiddleware(t *testing.T) { _, err := middleware.InsertMany(ctx, []*rivertype.JobInsertParams{{Kind: "no_op"}}, doInner) require.EqualError(t, err, "error from doInner") - spans := bundle.exporter.GetSpans() + spans := bundle.traceExporter.GetSpans() require.Len(t, spans, 1) span := spans[0] @@ -87,6 +120,18 @@ func TestMiddleware(t *testing.T) { require.Equal(t, "river.insert_many", span.Name) require.Equal(t, codes.Error, span.Status.Code) require.Equal(t, "error from doInner", span.Status.Description) + + var ( + expectedAttrs = []attribute.KeyValue{ + attribute.String("status", "error"), + } + metrics metricdata.ResourceMetrics + ) + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + requireSum(t, metrics, "river.insert_count", 1, expectedAttrs...) + requireSum(t, metrics, "river.insert_many_count", 1, expectedAttrs...) + requireGaugeNotEmpty(t, metrics, "river.insert_many_duration", expectedAttrs...) + requireHistogramCount(t, metrics, "river.insert_many_duration_histogram", 1, expectedAttrs...) 
}) t.Run("InsertManyPanic", func(t *testing.T) { @@ -102,7 +147,7 @@ func TestMiddleware(t *testing.T) { _, _ = middleware.InsertMany(ctx, []*rivertype.JobInsertParams{{Kind: "no_op"}}, doInner) }) - spans := bundle.exporter.GetSpans() + spans := bundle.traceExporter.GetSpans() require.Len(t, spans, 1) span := spans[0] @@ -110,6 +155,18 @@ func TestMiddleware(t *testing.T) { require.Equal(t, "river.insert_many", span.Name) require.Equal(t, codes.Error, span.Status.Code) require.Equal(t, "panic", span.Status.Description) + + var ( + expectedAttrs = []attribute.KeyValue{ + attribute.String("status", "panic"), + } + metrics metricdata.ResourceMetrics + ) + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + requireSum(t, metrics, "river.insert_count", 1, expectedAttrs...) + requireSum(t, metrics, "river.insert_many_count", 1, expectedAttrs...) + requireGaugeNotEmpty(t, metrics, "river.insert_many_duration", expectedAttrs...) + requireHistogramCount(t, metrics, "river.insert_many_duration_histogram", 1, expectedAttrs...) }) // Make sure the middleware can fall back to a global provider. 
@@ -128,6 +185,39 @@ func TestMiddleware(t *testing.T) { require.NoError(t, err) }) + t.Run("InsertManyDurationUnitMS", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setupConfig(t, &MiddlewareConfig{ + DurationUnit: "ms", + }) + + doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) { + return []*rivertype.JobInsertResult{ + {Job: &rivertype.JobRow{ID: 123}}, + }, nil + } + + insertRes, err := middleware.InsertMany(ctx, []*rivertype.JobInsertParams{{Kind: "no_op"}}, doInner) + require.NoError(t, err) + require.Equal(t, []*rivertype.JobInsertResult{ + {Job: &rivertype.JobRow{ID: 123}}, + }, insertRes) + + var metrics metricdata.ResourceMetrics + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + requireSum(t, metrics, "river.insert_count", 1) + requireSum(t, metrics, "river.insert_many_count", 1) + { + metric, _ := requireGaugeNotEmpty(t, metrics, "river.insert_many_duration") + require.Equal(t, "ms", metric.Unit) + } + { + metric, _ := requireHistogramCount(t, metrics, "river.insert_many_duration_histogram", 1) + require.Equal(t, "ms", metric.Unit) + } + }) + t.Run("WorkSuccess", func(t *testing.T) { t.Parallel() @@ -144,7 +234,7 @@ func TestMiddleware(t *testing.T) { }, doInner) require.NoError(t, err) - spans := bundle.exporter.GetSpans() + spans := bundle.traceExporter.GetSpans() require.Len(t, spans, 1) span := spans[0] @@ -153,6 +243,23 @@ func TestMiddleware(t *testing.T) { require.Equal(t, "ok", getAttribute(t, span.Attributes, "status").AsString()) require.Equal(t, "river.work", span.Name) require.Equal(t, codes.Ok, span.Status.Code) + + var ( + expectedAttrs = []attribute.KeyValue{ + attribute.String("status", "ok"), + } + metrics metricdata.ResourceMetrics + ) + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + requireSum(t, metrics, "river.work_count", 1, expectedAttrs...) + { + metric, _ := requireGaugeNotEmpty(t, metrics, "river.work_duration", expectedAttrs...) 
+ require.Equal(t, "s", metric.Unit) + } + { + metric, _ := requireHistogramCount(t, metrics, "river.work_duration_histogram", 1, expectedAttrs...) + require.Equal(t, "s", metric.Unit) + } }) t.Run("WorkError", func(t *testing.T) { @@ -171,7 +278,7 @@ func TestMiddleware(t *testing.T) { }, doInner) require.EqualError(t, err, "error from doInner") - spans := bundle.exporter.GetSpans() + spans := bundle.traceExporter.GetSpans() require.Len(t, spans, 1) span := spans[0] @@ -181,6 +288,17 @@ func TestMiddleware(t *testing.T) { require.Equal(t, "river.work", span.Name) require.Equal(t, codes.Error, span.Status.Code) require.Equal(t, "error from doInner", span.Status.Description) + + var ( + expectedAttrs = []attribute.KeyValue{ + attribute.String("status", "error"), + } + metrics metricdata.ResourceMetrics + ) + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + requireSum(t, metrics, "river.work_count", 1, expectedAttrs...) + requireGaugeNotEmpty(t, metrics, "river.work_duration", expectedAttrs...) + requireHistogramCount(t, metrics, "river.work_duration_histogram", 1, expectedAttrs...) }) t.Run("WorkPanic", func(t *testing.T) { @@ -200,7 +318,7 @@ func TestMiddleware(t *testing.T) { }, doInner) }) - spans := bundle.exporter.GetSpans() + spans := bundle.traceExporter.GetSpans() require.Len(t, spans, 1) span := spans[0] @@ -210,6 +328,17 @@ func TestMiddleware(t *testing.T) { require.Equal(t, "river.work", span.Name) require.Equal(t, codes.Error, span.Status.Code) require.Equal(t, "panic", span.Status.Description) + + var ( + expectedAttrs = []attribute.KeyValue{ + attribute.String("status", "panic"), + } + metrics metricdata.ResourceMetrics + ) + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + requireSum(t, metrics, "river.work_count", 1, expectedAttrs...) + requireGaugeNotEmpty(t, metrics, "river.work_duration", expectedAttrs...) + requireHistogramCount(t, metrics, "river.work_duration_histogram", 1, expectedAttrs...) 
}) // Make sure the middleware can fall back to a global provider. @@ -225,6 +354,33 @@ func TestMiddleware(t *testing.T) { err := middleware.Work(ctx, &rivertype.JobRow{Kind: "no_op"}, doInner) require.NoError(t, err) }) + + t.Run("WorkDurationUnitMS", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setupConfig(t, &MiddlewareConfig{ + DurationUnit: "ms", + }) + + doInner := func(ctx context.Context) error { + return nil + } + + err := middleware.Work(ctx, &rivertype.JobRow{Kind: "no_op"}, doInner) + require.NoError(t, err) + + var metrics metricdata.ResourceMetrics + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + requireSum(t, metrics, "river.work_count", 1) + { + metric, _ := requireGaugeNotEmpty(t, metrics, "river.work_duration") + require.Equal(t, "ms", metric.Unit) + } + { + metric, _ := requireHistogramCount(t, metrics, "river.work_duration_histogram", 1) + require.Equal(t, "ms", metric.Unit) + } + }) } func getAttribute(t *testing.T, attrs []attribute.KeyValue, key string) attribute.Value { @@ -238,3 +394,45 @@ func getAttribute(t *testing.T, attrs []attribute.KeyValue, key string) attribut require.FailNow(t, "key not found in attributes: "+key) return attribute.Value{} } + +func getMetric[T metricdatatest.Datatypes](t *testing.T, metrics metricdata.ResourceMetrics, name string) (metricdata.Metrics, T) { + t.Helper() + + for _, scopeMetrics := range metrics.ScopeMetrics { + for _, metric := range scopeMetrics.Metrics { + if metric.Name == name { + return metric, metric.Data.(T) //nolint:forcetypeassert + } + } + } + t.Fatalf("Metrics not found: %s", name) + var defaultVal T + return metricdata.Metrics{}, defaultVal +} + +func requireGaugeNotEmpty(t *testing.T, metrics metricdata.ResourceMetrics, name string, attrs ...attribute.KeyValue) (metricdata.Metrics, metricdata.Gauge[float64]) { //nolint:unparam + t.Helper() + + metric, metricData := getMetric[metricdata.Gauge[float64]](t, metrics, name) + require.NotEmpty(t, 
metricData.DataPoints) + metricdatatest.AssertHasAttributes(t, metric, attrs...) + return metric, metricData +} + +func requireHistogramCount(t *testing.T, metrics metricdata.ResourceMetrics, name string, count uint64, attrs ...attribute.KeyValue) (metricdata.Metrics, metricdata.Histogram[float64]) { //nolint:unparam + t.Helper() + + metric, metricData := getMetric[metricdata.Histogram[float64]](t, metrics, name) + require.Equal(t, count, metricData.DataPoints[0].Count) + metricdatatest.AssertHasAttributes(t, metric, attrs...) + return metric, metricData +} + +func requireSum(t *testing.T, metrics metricdata.ResourceMetrics, name string, val int64, attrs ...attribute.KeyValue) (metricdata.Metrics, metricdata.Sum[int64]) { //nolint:unparam + t.Helper() + + metric, metricData := getMetric[metricdata.Sum[int64]](t, metrics, name) + require.Equal(t, val, metricData.DataPoints[0].Value) + metricdatatest.AssertHasAttributes(t, metric, attrs...) + return metric, metricData +}