Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions otelriver/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package otelriver
import (
"cmp"
"context"
"errors"
"time"

"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -107,7 +108,8 @@ func NewMiddleware(config *MiddlewareConfig) *Middleware {
}

func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(ctx context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error) {
ctx, span := m.tracer.Start(ctx, prefix+"insert_many")
ctx, span := m.tracer.Start(ctx, prefix+"insert_many",
trace.WithSpanKind(trace.SpanKindProducer))
defer span.End()

attrs := []attribute.KeyValue{
Expand Down Expand Up @@ -141,16 +143,22 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job
}

func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error {
ctx, span := m.tracer.Start(ctx, prefix+"work")
ctx, span := m.tracer.Start(ctx, prefix+"work",
trace.WithSpanKind(trace.SpanKindConsumer))
defer span.End()

attrs := []attribute.KeyValue{
attribute.Int64("id", job.ID),
attribute.Int("attempt", job.Attempt),
attribute.String("created_at", job.CreatedAt.Format(time.RFC3339)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, aren't these attrs like id and timestamps going to have ~infinite cardinality? My understanding is that's a problem for large-scale OTEL implementations. Maybe I'm not correctly understanding where these limits do/don't apply, though.

Copy link
Contributor Author

@brandur brandur Apr 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I wasn't too sure about the cardinality thing either, but I tried to look into it, and I can't find any good sources that indicate that this kind of thing isn't recommended.

I found this one with respect to metrics (as opposed to spans), but it wouldn't really apply here because we do keep cardinality on metrics lower:

https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#cardinality-limits

I also found this issue on the OpenTelemetry repo, but it went unanswered:

open-telemetry/opentelemetry-specification#3554

I'm having trouble finding other citations warning against this though. Do you know of any?

I figure that if you're mapping spans to something like Sentry, it would be pretty useful to have the job ID in there (and I can say fairly confidently that cardinality on Sentry spans doesn't matter since we regularly store things like request ID there).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You know, I'm really only finding warnings about cardinality in metrics and not traces. Seems fine to ship it for now and see if we hear back about issues.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

K cool.

I also think that if it becomes a problem later, we could probably change this — removing an attribute wouldn't be anywhere near the same level of breaking change that an API change would be.

attribute.String("kind", job.Kind),
attribute.Int("priority", job.Priority),
attribute.String("queue", job.Queue),
attribute.String("scheduled_at", job.ScheduledAt.Format(time.RFC3339)),
attribute.String("status", ""), // replaced below
attribute.StringSlice("tag", job.Tags),
}
const statusIndex = 3
const statusIndex = 7

var (
begin = time.Now()
Expand All @@ -160,6 +168,19 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
defer func() {
duration := m.durationInPreferredUnit(time.Since(begin))

if err != nil {
var (
cancelErr *river.JobCancelError
snoozeErr *river.JobSnoozeError
)
switch {
case errors.As(err, &cancelErr):
attrs = append(attrs, attribute.Bool("cancel", true))
case errors.As(err, &snoozeErr):
attrs = append(attrs, attribute.Bool("snooze", true))
}
}

setAttributeAndSpanStatus(attrs, statusIndex, span, panicked, err)

// This allocates a new slice, so make sure to do it as few times as possible.
Expand Down
64 changes: 61 additions & 3 deletions otelriver/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package otelriver
import (
"context"
"errors"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -227,20 +229,34 @@ func TestMiddleware(t *testing.T) {
return nil
}

var (
createdAt = time.Now()
scheduledAt = time.Now().Add(1 * time.Second)
)
err := middleware.Work(ctx, &rivertype.JobRow{
Attempt: 6,
Kind: "no_op",
Queue: "my_queue",
ID: 123,
Attempt: 6,
CreatedAt: createdAt,
Kind: "no_op",
Priority: 1,
Queue: "my_queue",
ScheduledAt: scheduledAt,
Tags: []string{"a", "b"},
}, doInner)
require.NoError(t, err)

spans := bundle.traceExporter.GetSpans()
require.Len(t, spans, 1)

span := spans[0]
require.Equal(t, int64(123), getAttribute(t, span.Attributes, "id").AsInt64())
require.Equal(t, int64(6), getAttribute(t, span.Attributes, "attempt").AsInt64())
require.Equal(t, createdAt.Format(time.RFC3339), getAttribute(t, span.Attributes, "created_at").AsString())
require.Equal(t, "my_queue", getAttribute(t, span.Attributes, "queue").AsString())
require.Equal(t, int64(1), getAttribute(t, span.Attributes, "priority").AsInt64())
require.Equal(t, "ok", getAttribute(t, span.Attributes, "status").AsString())
require.Equal(t, scheduledAt.Format(time.RFC3339), getAttribute(t, span.Attributes, "scheduled_at").AsString())
require.Equal(t, []string{"a", "b"}, getAttribute(t, span.Attributes, "tag").AsStringSlice())
require.Equal(t, "river.work", span.Name)
require.Equal(t, codes.Ok, span.Status.Code)

Expand Down Expand Up @@ -301,6 +317,48 @@ func TestMiddleware(t *testing.T) {
requireHistogramCount(t, metrics, "river.work_duration_histogram", 1, expectedAttrs...)
})

t.Run("JobCancelError", func(t *testing.T) {
t.Parallel()

middleware, bundle := setup(t)

doInner := func(ctx context.Context) error {
return fmt.Errorf("wrapped job cancel: %w", rivertype.JobCancel(errors.New("inner error")))
}

err := middleware.Work(ctx, &rivertype.JobRow{
Kind: "no_op",
}, doInner)
require.EqualError(t, err, "wrapped job cancel: JobCancelError: inner error")

spans := bundle.traceExporter.GetSpans()
require.Len(t, spans, 1)

span := spans[0]
require.True(t, getAttribute(t, span.Attributes, "cancel").AsBool())
})

t.Run("JobSnoozeError", func(t *testing.T) {
t.Parallel()

middleware, bundle := setup(t)

doInner := func(ctx context.Context) error {
return fmt.Errorf("wrapped job snooze: %w", &rivertype.JobSnoozeError{})
}

err := middleware.Work(ctx, &rivertype.JobRow{
Kind: "no_op",
}, doInner)
require.EqualError(t, err, "wrapped job snooze: JobSnoozeError: 0s")

spans := bundle.traceExporter.GetSpans()
require.Len(t, spans, 1)

span := spans[0]
require.True(t, getAttribute(t, span.Attributes, "snooze").AsBool())
})

t.Run("WorkPanic", func(t *testing.T) {
t.Parallel()

Expand Down
Loading