Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions otelriver/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job
defer func() {
duration := m.durationInPreferredUnit(time.Since(begin))

setAttributeAndSpanStatus(attrs, statusIndex, span, panicked, err)
setStatus(attrs, statusIndex, span, panicked, err)
span.SetAttributes(attrs...) // set after finalizing status

// This allocates a new slice, so make sure to do it as few times as possible.
measurementOpt := metric.WithAttributes(attrs...)
Expand All @@ -148,17 +149,14 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
defer span.End()

attrs := []attribute.KeyValue{
attribute.Int64("id", job.ID),
attribute.Int("attempt", job.Attempt),
attribute.String("created_at", job.CreatedAt.Format(time.RFC3339)),
attribute.String("kind", job.Kind),
attribute.Int("priority", job.Priority),
attribute.String("queue", job.Queue),
attribute.String("scheduled_at", job.ScheduledAt.Format(time.RFC3339)),
attribute.String("status", ""), // replaced below
attribute.StringSlice("tag", job.Tags),
}
const statusIndex = 7
const statusIndex = 4

var (
begin = time.Now()
Expand All @@ -181,7 +179,19 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
}
}

setAttributeAndSpanStatus(attrs, statusIndex, span, panicked, err)
setStatus(attrs, statusIndex, span, panicked, err)

{
// Add some higher cardinality attributes to spans, but keep them
// out of metrics given it's been traditional wisdom that metric
// attribute sets shouldn't be too large.
attrs := append(attrs,
attribute.Int64("id", job.ID),
attribute.String("created_at", job.CreatedAt.Format(time.RFC3339)),
attribute.String("scheduled_at", job.ScheduledAt.Format(time.RFC3339)),
)
span.SetAttributes(attrs...) // set after finalizing status
}

// This allocates a new slice, so make sure to do it as few times as possible.
measurementOpt := metric.WithAttributes(attrs...)
Expand Down Expand Up @@ -234,7 +244,7 @@ func mustInt64Counter(meter metric.Meter, name string, options ...metric.Int64Co
// Sets success status on the given span and within the set of attributes. The
// index of the status attribute is required ahead of time as a minor
// optimization.
func setAttributeAndSpanStatus(attrs []attribute.KeyValue, statusIndex int, span trace.Span, panicked bool, err error) {
func setStatus(attrs []attribute.KeyValue, statusIndex int, span trace.Span, panicked bool, err error) {
if attrs[statusIndex].Key != "status" {
panic("status attribute not at expected index; bug?") // protect against future regression
}
Expand All @@ -250,5 +260,4 @@ func setAttributeAndSpanStatus(attrs []attribute.KeyValue, statusIndex int, span
attrs[statusIndex] = attribute.String("status", "ok")
span.SetStatus(codes.Ok, "")
}
span.SetAttributes(attrs...) // set after finalizing status
}
Loading