diff --git a/otelriver/middleware.go b/otelriver/middleware.go index caadd1f..bc5c3d3 100644 --- a/otelriver/middleware.go +++ b/otelriver/middleware.go @@ -3,6 +3,7 @@ package otelriver import ( "cmp" "context" + "errors" "time" "go.opentelemetry.io/otel" @@ -107,7 +108,8 @@ func NewMiddleware(config *MiddlewareConfig) *Middleware { } func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(ctx context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error) { - ctx, span := m.tracer.Start(ctx, prefix+"insert_many") + ctx, span := m.tracer.Start(ctx, prefix+"insert_many", + trace.WithSpanKind(trace.SpanKindProducer)) defer span.End() attrs := []attribute.KeyValue{ @@ -141,16 +143,22 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job } func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error { - ctx, span := m.tracer.Start(ctx, prefix+"work") + ctx, span := m.tracer.Start(ctx, prefix+"work", + trace.WithSpanKind(trace.SpanKindConsumer)) defer span.End() attrs := []attribute.KeyValue{ + attribute.Int64("id", job.ID), attribute.Int("attempt", job.Attempt), + attribute.String("created_at", job.CreatedAt.Format(time.RFC3339)), attribute.String("kind", job.Kind), + attribute.Int("priority", job.Priority), attribute.String("queue", job.Queue), + attribute.String("scheduled_at", job.ScheduledAt.Format(time.RFC3339)), attribute.String("status", ""), // replaced below + attribute.StringSlice("tag", job.Tags), } - const statusIndex = 3 + const statusIndex = 7 var ( begin = time.Now() @@ -160,6 +168,19 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu defer func() { duration := m.durationInPreferredUnit(time.Since(begin)) + if err != nil { + var ( + cancelErr *river.JobCancelError + snoozeErr *river.JobSnoozeError + ) + switch { + case errors.As(err, &cancelErr): + attrs = append(attrs, attribute.Bool("cancel", true)) + case errors.As(err, &snoozeErr): + attrs = append(attrs, attribute.Bool("snooze", true)) + } + } + setAttributeAndSpanStatus(attrs, statusIndex, span, panicked, err) // This allocates a new slice, so make sure to do it as few times as possible. diff --git a/otelriver/middleware_test.go b/otelriver/middleware_test.go index ead2959..fc12f38 100644 --- a/otelriver/middleware_test.go +++ b/otelriver/middleware_test.go @@ -3,7 +3,9 @@ package otelriver import ( "context" "errors" + "fmt" "testing" + "time" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" @@ -227,10 +229,19 @@ func TestMiddleware(t *testing.T) { return nil } + var ( + createdAt = time.Now() + scheduledAt = time.Now().Add(1 * time.Second) + ) err := middleware.Work(ctx, &rivertype.JobRow{ - Attempt: 6, - Kind: "no_op", - Queue: "my_queue", + ID: 123, + Attempt: 6, + CreatedAt: createdAt, + Kind: "no_op", + Priority: 1, + Queue: "my_queue", + ScheduledAt: scheduledAt, + Tags: []string{"a", "b"}, }, doInner) require.NoError(t, err) @@ -238,9 +249,14 @@ func TestMiddleware(t *testing.T) { require.Len(t, spans, 1) span := spans[0] + require.Equal(t, int64(123), getAttribute(t, span.Attributes, "id").AsInt64()) require.Equal(t, int64(6), getAttribute(t, span.Attributes, "attempt").AsInt64()) + require.Equal(t, createdAt.Format(time.RFC3339), getAttribute(t, span.Attributes, "created_at").AsString()) require.Equal(t, "my_queue", getAttribute(t, span.Attributes, "queue").AsString()) + require.Equal(t, int64(1), getAttribute(t, span.Attributes, "priority").AsInt64()) require.Equal(t, "ok", getAttribute(t, span.Attributes, "status").AsString()) + require.Equal(t, scheduledAt.Format(time.RFC3339), getAttribute(t, span.Attributes, "scheduled_at").AsString()) + require.Equal(t, []string{"a", "b"}, getAttribute(t, span.Attributes, "tag").AsStringSlice()) require.Equal(t, "river.work", span.Name) require.Equal(t, codes.Ok, span.Status.Code) @@ -301,6 +317,48 @@ func TestMiddleware(t *testing.T) { requireHistogramCount(t, metrics, "river.work_duration_histogram", 1, expectedAttrs...) }) + t.Run("JobCancelError", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setup(t) + + doInner := func(ctx context.Context) error { + return fmt.Errorf("wrapped job cancel: %w", rivertype.JobCancel(errors.New("inner error"))) + } + + err := middleware.Work(ctx, &rivertype.JobRow{ + Kind: "no_op", + }, doInner) + require.EqualError(t, err, "wrapped job cancel: JobCancelError: inner error") + + spans := bundle.traceExporter.GetSpans() + require.Len(t, spans, 1) + + span := spans[0] + require.True(t, getAttribute(t, span.Attributes, "cancel").AsBool()) + }) + + t.Run("JobSnoozeError", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setup(t) + + doInner := func(ctx context.Context) error { + return fmt.Errorf("wrapped job snooze: %w", &rivertype.JobSnoozeError{}) + } + + err := middleware.Work(ctx, &rivertype.JobRow{ + Kind: "no_op", + }, doInner) + require.EqualError(t, err, "wrapped job snooze: JobSnoozeError: 0s") + + spans := bundle.traceExporter.GetSpans() + require.Len(t, spans, 1) + + span := spans[0] + require.True(t, getAttribute(t, span.Attributes, "snooze").AsBool()) + }) + t.Run("WorkPanic", func(t *testing.T) { t.Parallel()