diff --git a/instrumentation/opentelemetry/batchspanprocessor/README.md b/instrumentation/opentelemetry/batchspanprocessor/README.md new file mode 100644 index 00000000..e7ae6fb1 --- /dev/null +++ b/instrumentation/opentelemetry/batchspanprocessor/README.md @@ -0,0 +1,11 @@ +# Modified Batch Span Processor + +Since the original BatchSpanProcessor does not send metrics for spans received and spans dropped, the modified BatchSpanProcessor creates and populates those metrics so the user can track whether there are spans being dropped. + +We have kept track of the original files modified so it's easier to figure out the changes we added. When upgrading, copy the newer files into their go.original counterparts and then do a diff with the modified.go files to figure out what changes to make in order to upgrade the modified files. + +The paths of the files modified: +- [sdk/trace/batch_span_processor.go](https://github.com/open-telemetry/opentelemetry-go/blob/main/sdk/trace/batch_span_processor.go) +- [sdk/internal/env/env.go](https://github.com/open-telemetry/opentelemetry-go/blob/main/sdk/internal/env/env.go) + +Since we cannot use [the internal logger]((https://github.com/open-telemetry/opentelemetry-go/blob/main/internal/global/internal_logging.go)), we have adapted it at [logger.go](instrumentation/opentelemetry/batchspanprocessor/logger.go). diff --git a/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.go.original b/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.go.original new file mode 100644 index 00000000..a2d7db49 --- /dev/null +++ b/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.go.original @@ -0,0 +1,432 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trace // import "go.opentelemetry.io/otel/sdk/trace" + +import ( + "context" + "runtime" + "sync" + "sync/atomic" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/internal/global" + "go.opentelemetry.io/otel/sdk/internal/env" + "go.opentelemetry.io/otel/trace" +) + +// Defaults for BatchSpanProcessorOptions. +const ( + DefaultMaxQueueSize = 2048 + DefaultScheduleDelay = 5000 + DefaultExportTimeout = 30000 + DefaultMaxExportBatchSize = 512 +) + +// BatchSpanProcessorOption configures a BatchSpanProcessor. +type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions) + +// BatchSpanProcessorOptions is configuration settings for a +// BatchSpanProcessor. +type BatchSpanProcessorOptions struct { + // MaxQueueSize is the maximum queue size to buffer spans for delayed processing. If the + // queue gets full it drops the spans. Use BlockOnQueueFull to change this behavior. + // The default value of MaxQueueSize is 2048. + MaxQueueSize int + + // BatchTimeout is the maximum duration for constructing a batch. Processor + // forcefully sends available spans when timeout is reached. + // The default value of BatchTimeout is 5000 msec. + BatchTimeout time.Duration + + // ExportTimeout specifies the maximum duration for exporting spans. If the timeout + // is reached, the export will be cancelled. + // The default value of ExportTimeout is 30000 msec. + ExportTimeout time.Duration + + // MaxExportBatchSize is the maximum number of spans to process in a single batch. + // If there are more than one batch worth of spans then it processes multiple batches + // of spans one batch after the other without any delay. + // The default value of MaxExportBatchSize is 512. + MaxExportBatchSize int + + // BlockOnQueueFull blocks onEnd() and onStart() method if the queue is full + // AND if BlockOnQueueFull is set to true. + // Blocking option should be used carefully as it can severely affect the performance of an + // application. + BlockOnQueueFull bool +} + +// batchSpanProcessor is a SpanProcessor that batches asynchronously-received +// spans and sends them to a trace.Exporter when complete. +type batchSpanProcessor struct { + e SpanExporter + o BatchSpanProcessorOptions + + queue chan ReadOnlySpan + dropped uint32 + + batch []ReadOnlySpan + batchMutex sync.Mutex + timer *time.Timer + stopWait sync.WaitGroup + stopOnce sync.Once + stopCh chan struct{} +} + +var _ SpanProcessor = (*batchSpanProcessor)(nil) + +// NewBatchSpanProcessor creates a new SpanProcessor that will send completed +// span batches to the exporter with the supplied options. +// +// If the exporter is nil, the span processor will preform no action. +func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor { + maxQueueSize := env.BatchSpanProcessorMaxQueueSize(DefaultMaxQueueSize) + maxExportBatchSize := env.BatchSpanProcessorMaxExportBatchSize(DefaultMaxExportBatchSize) + + if maxExportBatchSize > maxQueueSize { + if DefaultMaxExportBatchSize > maxQueueSize { + maxExportBatchSize = maxQueueSize + } else { + maxExportBatchSize = DefaultMaxExportBatchSize + } + } + + o := BatchSpanProcessorOptions{ + BatchTimeout: time.Duration(env.BatchSpanProcessorScheduleDelay(DefaultScheduleDelay)) * time.Millisecond, + ExportTimeout: time.Duration(env.BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond, + MaxQueueSize: maxQueueSize, + MaxExportBatchSize: maxExportBatchSize, + } + for _, opt := range options { + opt(&o) + } + bsp := &batchSpanProcessor{ + e: exporter, + o: o, + batch: make([]ReadOnlySpan, 0, o.MaxExportBatchSize), + timer: time.NewTimer(o.BatchTimeout), + queue: make(chan ReadOnlySpan, o.MaxQueueSize), + stopCh: make(chan struct{}), + } + + bsp.stopWait.Add(1) + go func() { + defer bsp.stopWait.Done() + bsp.processQueue() + bsp.drainQueue() + }() + + return bsp +} + +// OnStart method does nothing. +func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {} + +// OnEnd method enqueues a ReadOnlySpan for later processing. +func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) { + // Do not enqueue spans if we are just going to drop them. + if bsp.e == nil { + return + } + bsp.enqueue(s) +} + +// Shutdown flushes the queue and waits until all spans are processed. +// It only executes once. Subsequent call does nothing. +func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error { + var err error + bsp.stopOnce.Do(func() { + wait := make(chan struct{}) + go func() { + close(bsp.stopCh) + bsp.stopWait.Wait() + if bsp.e != nil { + if err := bsp.e.Shutdown(ctx); err != nil { + otel.Handle(err) + } + } + close(wait) + }() + // Wait until the wait group is done or the context is cancelled + select { + case <-wait: + case <-ctx.Done(): + err = ctx.Err() + } + }) + return err +} + +type forceFlushSpan struct { + ReadOnlySpan + flushed chan struct{} +} + +func (f forceFlushSpan) SpanContext() trace.SpanContext { + return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled}) +} + +// ForceFlush exports all ended spans that have not yet been exported. +func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { + var err error + if bsp.e != nil { + flushCh := make(chan struct{}) + if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) { + select { + case <-flushCh: + // Processed any items in queue prior to ForceFlush being called + case <-ctx.Done(): + return ctx.Err() + } + } + + wait := make(chan error) + go func() { + wait <- bsp.exportSpans(ctx) + close(wait) + }() + // Wait until the export is finished or the context is cancelled/timed out + select { + case err = <-wait: + case <-ctx.Done(): + err = ctx.Err() + } + } + return err +} + +// WithMaxQueueSize returns a BatchSpanProcessorOption that configures the +// maximum queue size allowed for a BatchSpanProcessor. +func WithMaxQueueSize(size int) BatchSpanProcessorOption { + return func(o *BatchSpanProcessorOptions) { + o.MaxQueueSize = size + } +} + +// WithMaxExportBatchSize returns a BatchSpanProcessorOption that configures +// the maximum export batch size allowed for a BatchSpanProcessor. +func WithMaxExportBatchSize(size int) BatchSpanProcessorOption { + return func(o *BatchSpanProcessorOptions) { + o.MaxExportBatchSize = size + } +} + +// WithBatchTimeout returns a BatchSpanProcessorOption that configures the +// maximum delay allowed for a BatchSpanProcessor before it will export any +// held span (whether the queue is full or not). +func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption { + return func(o *BatchSpanProcessorOptions) { + o.BatchTimeout = delay + } +} + +// WithExportTimeout returns a BatchSpanProcessorOption that configures the +// amount of time a BatchSpanProcessor waits for an exporter to export before +// abandoning the export. +func WithExportTimeout(timeout time.Duration) BatchSpanProcessorOption { + return func(o *BatchSpanProcessorOptions) { + o.ExportTimeout = timeout + } +} + +// WithBlocking returns a BatchSpanProcessorOption that configures a +// BatchSpanProcessor to wait for enqueue operations to succeed instead of +// dropping data when the queue is full. +func WithBlocking() BatchSpanProcessorOption { + return func(o *BatchSpanProcessorOptions) { + o.BlockOnQueueFull = true + } +} + +// exportSpans is a subroutine of processing and draining the queue. +func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { + bsp.timer.Reset(bsp.o.BatchTimeout) + + bsp.batchMutex.Lock() + defer bsp.batchMutex.Unlock() + + if bsp.o.ExportTimeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout) + defer cancel() + } + + if l := len(bsp.batch); l > 0 { + global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped)) + err := bsp.e.ExportSpans(ctx, bsp.batch) + + // A new batch is always created after exporting, even if the batch failed to be exported. + // + // It is up to the exporter to implement any type of retry logic if a batch is failing + // to be exported, since it is specific to the protocol and backend being sent to. + bsp.batch = bsp.batch[:0] + + if err != nil { + return err + } + } + return nil +} + +// processQueue removes spans from the `queue` channel until processor +// is shut down. It calls the exporter in batches of up to MaxExportBatchSize +// waiting up to BatchTimeout to form a batch. +func (bsp *batchSpanProcessor) processQueue() { + defer bsp.timer.Stop() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { + select { + case <-bsp.stopCh: + return + case <-bsp.timer.C: + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } + case sd := <-bsp.queue: + if ffs, ok := sd.(forceFlushSpan); ok { + close(ffs.flushed) + continue + } + bsp.batchMutex.Lock() + bsp.batch = append(bsp.batch, sd) + shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize + bsp.batchMutex.Unlock() + if shouldExport { + if !bsp.timer.Stop() { + <-bsp.timer.C + } + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } + } + } + } +} + +// drainQueue awaits the any caller that had added to bsp.stopWait +// to finish the enqueue, then exports the final batch. +func (bsp *batchSpanProcessor) drainQueue() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { + select { + case sd := <-bsp.queue: + if sd == nil { + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } + return + } + + bsp.batchMutex.Lock() + bsp.batch = append(bsp.batch, sd) + shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize + bsp.batchMutex.Unlock() + + if shouldExport { + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } + } + default: + close(bsp.queue) + } + } +} + +func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) { + ctx := context.TODO() + if bsp.o.BlockOnQueueFull { + bsp.enqueueBlockOnQueueFull(ctx, sd) + } else { + bsp.enqueueDrop(ctx, sd) + } +} + +func recoverSendOnClosedChan() { + x := recover() + switch err := x.(type) { + case nil: + return + case runtime.Error: + if err.Error() == "send on closed channel" { + return + } + } + panic(x) +} + +func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan) bool { + if !sd.SpanContext().IsSampled() { + return false + } + + // This ensures the bsp.queue<- below does not panic as the + // processor shuts down. + defer recoverSendOnClosedChan() + + select { + case <-bsp.stopCh: + return false + default: + } + + select { + case bsp.queue <- sd: + return true + case <-ctx.Done(): + return false + } +} + +func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan) bool { + if !sd.SpanContext().IsSampled() { + return false + } + + // This ensures the bsp.queue<- below does not panic as the + // processor shuts down. + defer recoverSendOnClosedChan() + + select { + case <-bsp.stopCh: + return false + default: + } + + select { + case bsp.queue <- sd: + return true + default: + atomic.AddUint32(&bsp.dropped, 1) + } + return false +} + +// MarshalLog is the marshaling function used by the logging system to represent this exporter. +func (bsp *batchSpanProcessor) MarshalLog() interface{} { + return struct { + Type string + SpanExporter SpanExporter + Config BatchSpanProcessorOptions + }{ + Type: "BatchSpanProcessor", + SpanExporter: bsp.e, + Config: bsp.o, + } +} diff --git a/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.modified.go b/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.modified.go new file mode 100644 index 00000000..7590b460 --- /dev/null +++ b/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.modified.go @@ -0,0 +1,442 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batchspanprocessor // import "github.com/hypertrace/goagent/instrumentation/opentelemetry/batchspanprocessor" + +import ( + "context" + "runtime" + "sync" + "sync/atomic" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" + metricglobal "go.opentelemetry.io/otel/metric/global" + "go.opentelemetry.io/otel/metric/instrument/syncint64" + + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" +) + +// Defaults for BatchSpanProcessorOptions. +const ( + DefaultMaxQueueSize = 2048 + DefaultScheduleDelay = 5000 + DefaultExportTimeout = 30000 + DefaultMaxExportBatchSize = 512 + SpansReceivedCounter = "hypertrace.agent.bsp.spans_received" + SpansDroppedCounter = "hypertrace.agent.bsp.spans_dropped" + SpansUnSampledCounter = "hypertrace.agent.bsp.spans_unsampled" +) + +// batchSpanProcessor is a SpanProcessor that batches asynchronously-received +// spans and sends them to a trace.Exporter when complete. +type batchSpanProcessor struct { + e sdktrace.SpanExporter + o sdktrace.BatchSpanProcessorOptions + + queue chan sdktrace.ReadOnlySpan + dropped uint32 + + batch []sdktrace.ReadOnlySpan + batchMutex sync.Mutex + timer *time.Timer + stopWait sync.WaitGroup + stopOnce sync.Once + stopCh chan struct{} + // Some metrics in here. + counters map[string]syncint64.Counter +} + +var _ sdktrace.SpanProcessor = (*batchSpanProcessor)(nil) + +// NewBatchSpanProcessor creates a new SpanProcessor that will send completed +// span batches to the exporter with the supplied options. +// +// If the exporter is nil, the span processor will preform no action. +func NewBatchSpanProcessor(exporter sdktrace.SpanExporter, options ...sdktrace.BatchSpanProcessorOption) sdktrace.SpanProcessor { + maxQueueSize := BatchSpanProcessorMaxQueueSize(DefaultMaxQueueSize) + maxExportBatchSize := BatchSpanProcessorMaxExportBatchSize(DefaultMaxExportBatchSize) + + if maxExportBatchSize > maxQueueSize { + if DefaultMaxExportBatchSize > maxQueueSize { + maxExportBatchSize = maxQueueSize + } else { + maxExportBatchSize = DefaultMaxExportBatchSize + } + } + + o := sdktrace.BatchSpanProcessorOptions{ + BatchTimeout: time.Duration(BatchSpanProcessorScheduleDelay(DefaultScheduleDelay)) * time.Millisecond, + ExportTimeout: time.Duration(BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond, + MaxQueueSize: maxQueueSize, + MaxExportBatchSize: maxExportBatchSize, + } + for _, opt := range options { + opt(&o) + } + + // Setup metrics + mp := metricglobal.MeterProvider() + meter := mp.Meter("go.opentelemetry.io/otel/sdk/trace", + metric.WithInstrumentationVersion(otel.Version())) + counters := make(map[string]syncint64.Counter) + + // Spans received by processor + spansReceivedCounter, err := meter.SyncInt64().Counter(SpansReceivedCounter) + if err != nil { + otel.Handle(err) + } + counters[SpansReceivedCounter] = spansReceivedCounter + + // Spans Dropped by processor once the buffer is full. + spansDroppedCounter, err := meter.SyncInt64().Counter(SpansDroppedCounter) + if err != nil { + otel.Handle(err) + } + counters[SpansDroppedCounter] = spansDroppedCounter + + // Spans that are not sampled.(Useful to know when sampling is enabled) + spansUnSampledCounter, err := meter.SyncInt64().Counter(SpansUnSampledCounter) + if err != nil { + otel.Handle(err) + } + counters[SpansUnSampledCounter] = spansUnSampledCounter + + bsp := &batchSpanProcessor{ + e: exporter, + o: o, + batch: make([]sdktrace.ReadOnlySpan, 0, o.MaxExportBatchSize), + timer: time.NewTimer(o.BatchTimeout), + queue: make(chan sdktrace.ReadOnlySpan, o.MaxQueueSize), + stopCh: make(chan struct{}), + counters: counters, + } + + bsp.stopWait.Add(1) + go func() { + defer bsp.stopWait.Done() + bsp.processQueue() + bsp.drainQueue() + }() + + return bsp +} + +// OnStart method does nothing. +func (bsp *batchSpanProcessor) OnStart(parent context.Context, s sdktrace.ReadWriteSpan) {} + +// OnEnd method enqueues a ReadOnlySpan for later processing. +func (bsp *batchSpanProcessor) OnEnd(s sdktrace.ReadOnlySpan) { + // Do not enqueue spans if we are just going to drop them. + if bsp.e == nil { + return + } + bsp.enqueue(s) +} + +// Shutdown flushes the queue and waits until all spans are processed. +// It only executes once. Subsequent call does nothing. +func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error { + var err error + bsp.stopOnce.Do(func() { + wait := make(chan struct{}) + go func() { + close(bsp.stopCh) + bsp.stopWait.Wait() + if bsp.e != nil { + if err := bsp.e.Shutdown(ctx); err != nil { + otel.Handle(err) + } + } + close(wait) + }() + // Wait until the wait group is done or the context is cancelled + select { + case <-wait: + case <-ctx.Done(): + err = ctx.Err() + } + }) + return err +} + +type forceFlushSpan struct { + sdktrace.ReadOnlySpan + flushed chan struct{} +} + +func (f forceFlushSpan) SpanContext() trace.SpanContext { + return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled}) +} + +// ForceFlush exports all ended spans that have not yet been exported. +func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { + var err error + if bsp.e != nil { + flushCh := make(chan struct{}) + if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) { + select { + case <-flushCh: + // Processed any items in queue prior to ForceFlush being called + case <-ctx.Done(): + return ctx.Err() + } + } + + wait := make(chan error) + go func() { + wait <- bsp.exportSpans(ctx) + close(wait) + }() + // Wait until the export is finished or the context is cancelled/timed out + select { + case err = <-wait: + case <-ctx.Done(): + err = ctx.Err() + } + } + return err +} + +// WithMaxQueueSize returns a BatchSpanProcessorOption that configures the +// maximum queue size allowed for a BatchSpanProcessor. +func WithMaxQueueSize(size int) sdktrace.BatchSpanProcessorOption { + return func(o *sdktrace.BatchSpanProcessorOptions) { + o.MaxQueueSize = size + } +} + +// WithMaxExportBatchSize returns a BatchSpanProcessorOption that configures +// the maximum export batch size allowed for a BatchSpanProcessor. +func WithMaxExportBatchSize(size int) sdktrace.BatchSpanProcessorOption { + return func(o *sdktrace.BatchSpanProcessorOptions) { + o.MaxExportBatchSize = size + } +} + +// WithBatchTimeout returns a BatchSpanProcessorOption that configures the +// maximum delay allowed for a BatchSpanProcessor before it will export any +// held span (whether the queue is full or not). +func WithBatchTimeout(delay time.Duration) sdktrace.BatchSpanProcessorOption { + return func(o *sdktrace.BatchSpanProcessorOptions) { + o.BatchTimeout = delay + } +} + +// WithExportTimeout returns a BatchSpanProcessorOption that configures the +// amount of time a BatchSpanProcessor waits for an exporter to export before +// abandoning the export. +func WithExportTimeout(timeout time.Duration) sdktrace.BatchSpanProcessorOption { + return func(o *sdktrace.BatchSpanProcessorOptions) { + o.ExportTimeout = timeout + } +} + +// WithBlocking returns a BatchSpanProcessorOption that configures a +// BatchSpanProcessor to wait for enqueue operations to succeed instead of +// dropping data when the queue is full. +func WithBlocking() sdktrace.BatchSpanProcessorOption { + return func(o *sdktrace.BatchSpanProcessorOptions) { + o.BlockOnQueueFull = true + } +} + +// exportSpans is a subroutine of processing and draining the queue. +func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { + bsp.timer.Reset(bsp.o.BatchTimeout) + + bsp.batchMutex.Lock() + defer bsp.batchMutex.Unlock() + + if bsp.o.ExportTimeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout) + defer cancel() + } + + if l := len(bsp.batch); l > 0 { + Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped)) + err := bsp.e.ExportSpans(ctx, bsp.batch) + + // A new batch is always created after exporting, even if the batch failed to be exported. + // + // It is up to the exporter to implement any type of retry logic if a batch is failing + // to be exported, since it is specific to the protocol and backend being sent to. + bsp.batch = bsp.batch[:0] + + if err != nil { + return err + } + } + return nil +} + +// processQueue removes spans from the `queue` channel until processor +// is shut down. It calls the exporter in batches of up to MaxExportBatchSize +// waiting up to BatchTimeout to form a batch. +func (bsp *batchSpanProcessor) processQueue() { + defer bsp.timer.Stop() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { + select { + case <-bsp.stopCh: + return + case <-bsp.timer.C: + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } + case sd := <-bsp.queue: + if ffs, ok := sd.(forceFlushSpan); ok { + close(ffs.flushed) + continue + } + bsp.batchMutex.Lock() + bsp.batch = append(bsp.batch, sd) + shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize + bsp.batchMutex.Unlock() + if shouldExport { + if !bsp.timer.Stop() { + <-bsp.timer.C + } + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } + } + } + } +} + +// drainQueue awaits the any caller that had added to bsp.stopWait +// to finish the enqueue, then exports the final batch. +func (bsp *batchSpanProcessor) drainQueue() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { + select { + case sd := <-bsp.queue: + if sd == nil { + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } + return + } + + bsp.batchMutex.Lock() + bsp.batch = append(bsp.batch, sd) + shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize + bsp.batchMutex.Unlock() + + if shouldExport { + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } + } + default: + close(bsp.queue) + } + } +} + +func (bsp *batchSpanProcessor) enqueue(sd sdktrace.ReadOnlySpan) { + ctx := context.TODO() + if bsp.o.BlockOnQueueFull { + bsp.enqueueBlockOnQueueFull(ctx, sd) + } else { + bsp.enqueueDrop(ctx, sd) + } +} + +func recoverSendOnClosedChan() { + x := recover() + switch err := x.(type) { + case nil: + return + case runtime.Error: + if err.Error() == "send on closed channel" { + return + } + } + panic(x) +} + +func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd sdktrace.ReadOnlySpan) bool { + if !sd.SpanContext().IsSampled() { + return false + } + + // This ensures the bsp.queue<- below does not panic as the + // processor shuts down. + defer recoverSendOnClosedChan() + + select { + case <-bsp.stopCh: + return false + default: + } + + select { + case bsp.queue <- sd: + return true + case <-ctx.Done(): + return false + } +} + +func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd sdktrace.ReadOnlySpan) bool { + if !sd.SpanContext().IsSampled() { + // Count the span as unsampled + bsp.counters[SpansUnSampledCounter].Add(ctx, 1) + return false + } + + // Count the span as received. + bsp.counters[SpansReceivedCounter].Add(ctx, 1) + + // This ensures the bsp.queue<- below does not panic as the + // processor shuts down. + defer recoverSendOnClosedChan() + + select { + case <-bsp.stopCh: + return false + default: + } + + select { + case bsp.queue <- sd: + return true + default: + atomic.AddUint32(&bsp.dropped, 1) + // Count the span as dropped. + bsp.counters[SpansDroppedCounter].Add(ctx, 1) + } + return false +} + +// MarshalLog is the marshaling function used by the logging system to represent this exporter. +func (bsp *batchSpanProcessor) MarshalLog() interface{} { + return struct { + Type string + SpanExporter sdktrace.SpanExporter + Config sdktrace.BatchSpanProcessorOptions + }{ + Type: "BatchSpanProcessor", + SpanExporter: bsp.e, + Config: bsp.o, + } +} diff --git a/instrumentation/opentelemetry/batchspanprocessor/bspcreator.go b/instrumentation/opentelemetry/batchspanprocessor/bspcreator.go new file mode 100644 index 00000000..91af2984 --- /dev/null +++ b/instrumentation/opentelemetry/batchspanprocessor/bspcreator.go @@ -0,0 +1,13 @@ +package batchspanprocessor + +import ( + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +func CreateBatchSpanProcessor(useModified bool, exporter sdktrace.SpanExporter, + options ...sdktrace.BatchSpanProcessorOption) sdktrace.SpanProcessor { + if useModified { + return NewBatchSpanProcessor(exporter, options...) + } + return sdktrace.NewBatchSpanProcessor(exporter, options...) +} diff --git a/instrumentation/opentelemetry/batchspanprocessor/bspcreator_test.go b/instrumentation/opentelemetry/batchspanprocessor/bspcreator_test.go new file mode 100644 index 00000000..1dda510c --- /dev/null +++ b/instrumentation/opentelemetry/batchspanprocessor/bspcreator_test.go @@ -0,0 +1,25 @@ +package batchspanprocessor + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +var batchTimeout = time.Duration(200) * time.Millisecond + +func TestCreateBsp(t *testing.T) { + exporter, err := stdouttrace.New(stdouttrace.WithPrettyPrint()) + require.NoError(t, err) + bsp := CreateBatchSpanProcessor(true, exporter, + sdktrace.WithBatchTimeout(batchTimeout)) + assert.NotNil(t, bsp) + + bsp = CreateBatchSpanProcessor(false, exporter, + sdktrace.WithBatchTimeout(batchTimeout)) + assert.NotNil(t, bsp) +} diff --git a/instrumentation/opentelemetry/batchspanprocessor/env.go.original b/instrumentation/opentelemetry/batchspanprocessor/env.go.original new file mode 100644 index 00000000..5e94b8ae --- /dev/null +++ b/instrumentation/opentelemetry/batchspanprocessor/env.go.original @@ -0,0 +1,177 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package env // import "go.opentelemetry.io/otel/sdk/internal/env" + +import ( + "os" + "strconv" + + "go.opentelemetry.io/otel/internal/global" +) + +// Environment variable names. +const ( + // BatchSpanProcessorScheduleDelayKey is the delay interval between two + // consecutive exports (i.e. 5000). + BatchSpanProcessorScheduleDelayKey = "OTEL_BSP_SCHEDULE_DELAY" + // BatchSpanProcessorExportTimeoutKey is the maximum allowed time to + // export data (i.e. 3000). + BatchSpanProcessorExportTimeoutKey = "OTEL_BSP_EXPORT_TIMEOUT" + // BatchSpanProcessorMaxQueueSizeKey is the maximum queue size (i.e. 2048). + BatchSpanProcessorMaxQueueSizeKey = "OTEL_BSP_MAX_QUEUE_SIZE" + // BatchSpanProcessorMaxExportBatchSizeKey is the maximum batch size (i.e. + // 512). Note: it must be less than or equal to + // EnvBatchSpanProcessorMaxQueueSize. + BatchSpanProcessorMaxExportBatchSizeKey = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE" + + // AttributeValueLengthKey is the maximum allowed attribute value size. + AttributeValueLengthKey = "OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT" + + // AttributeCountKey is the maximum allowed span attribute count. + AttributeCountKey = "OTEL_ATTRIBUTE_COUNT_LIMIT" + + // SpanAttributeValueLengthKey is the maximum allowed attribute value size + // for a span. + SpanAttributeValueLengthKey = "OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT" + + // SpanAttributeCountKey is the maximum allowed span attribute count for a + // span. + SpanAttributeCountKey = "OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT" + + // SpanEventCountKey is the maximum allowed span event count. + SpanEventCountKey = "OTEL_SPAN_EVENT_COUNT_LIMIT" + + // SpanEventAttributeCountKey is the maximum allowed attribute per span + // event count. + SpanEventAttributeCountKey = "OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT" + + // SpanLinkCountKey is the maximum allowed span link count. + SpanLinkCountKey = "OTEL_SPAN_LINK_COUNT_LIMIT" + + // SpanLinkAttributeCountKey is the maximum allowed attribute per span + // link count. + SpanLinkAttributeCountKey = "OTEL_LINK_ATTRIBUTE_COUNT_LIMIT" +) + +// firstInt returns the value of the first matching environment variable from +// keys. If the value is not an integer or no match is found, defaultValue is +// returned. +func firstInt(defaultValue int, keys ...string) int { + for _, key := range keys { + value, ok := os.LookupEnv(key) + if !ok { + continue + } + + intValue, err := strconv.Atoi(value) + if err != nil { + global.Info("Got invalid value, number value expected.", key, value) + return defaultValue + } + + return intValue + } + + return defaultValue +} + +// IntEnvOr returns the int value of the environment variable with name key if +// it exists and the value is an int. Otherwise, defaultValue is returned. +func IntEnvOr(key string, defaultValue int) int { + value, ok := os.LookupEnv(key) + if !ok { + return defaultValue + } + + intValue, err := strconv.Atoi(value) + if err != nil { + global.Info("Got invalid value, number value expected.", key, value) + return defaultValue + } + + return intValue +} + +// BatchSpanProcessorScheduleDelay returns the environment variable value for +// the OTEL_BSP_SCHEDULE_DELAY key if it exists, otherwise defaultValue is +// returned. +func BatchSpanProcessorScheduleDelay(defaultValue int) int { + return IntEnvOr(BatchSpanProcessorScheduleDelayKey, defaultValue) +} + +// BatchSpanProcessorExportTimeout returns the environment variable value for +// the OTEL_BSP_EXPORT_TIMEOUT key if it exists, otherwise defaultValue is +// returned. +func BatchSpanProcessorExportTimeout(defaultValue int) int { + return IntEnvOr(BatchSpanProcessorExportTimeoutKey, defaultValue) +} + +// BatchSpanProcessorMaxQueueSize returns the environment variable value for +// the OTEL_BSP_MAX_QUEUE_SIZE key if it exists, otherwise defaultValue is +// returned. +func BatchSpanProcessorMaxQueueSize(defaultValue int) int { + return IntEnvOr(BatchSpanProcessorMaxQueueSizeKey, defaultValue) +} + +// BatchSpanProcessorMaxExportBatchSize returns the environment variable value for +// the OTEL_BSP_MAX_EXPORT_BATCH_SIZE key if it exists, otherwise defaultValue +// is returned. +func BatchSpanProcessorMaxExportBatchSize(defaultValue int) int { + return IntEnvOr(BatchSpanProcessorMaxExportBatchSizeKey, defaultValue) +} + +// SpanAttributeValueLength returns the environment variable value for the +// OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT key if it exists. Otherwise, the +// environment variable value for OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT is +// returned or defaultValue if that is not set. +func SpanAttributeValueLength(defaultValue int) int { + return firstInt(defaultValue, SpanAttributeValueLengthKey, AttributeValueLengthKey) +} + +// SpanAttributeCount returns the environment variable value for the +// OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT key if it exists. Otherwise, the +// environment variable value for OTEL_ATTRIBUTE_COUNT_LIMIT is returned or +// defaultValue if that is not set. +func SpanAttributeCount(defaultValue int) int { + return firstInt(defaultValue, SpanAttributeCountKey, AttributeCountKey) +} + +// SpanEventCount returns the environment variable value for the +// OTEL_SPAN_EVENT_COUNT_LIMIT key if it exists, otherwise defaultValue is +// returned. +func SpanEventCount(defaultValue int) int { + return IntEnvOr(SpanEventCountKey, defaultValue) +} + +// SpanEventAttributeCount returns the environment variable value for the +// OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT key if it exists, otherwise defaultValue +// is returned. +func SpanEventAttributeCount(defaultValue int) int { + return IntEnvOr(SpanEventAttributeCountKey, defaultValue) +} + +// SpanLinkCount returns the environment variable value for the +// OTEL_SPAN_LINK_COUNT_LIMIT key if it exists, otherwise defaultValue is +// returned. +func SpanLinkCount(defaultValue int) int { + return IntEnvOr(SpanLinkCountKey, defaultValue) +} + +// SpanLinkAttributeCount returns the environment variable value for the +// OTEL_LINK_ATTRIBUTE_COUNT_LIMIT key if it exists, otherwise defaultValue is +// returned. +func SpanLinkAttributeCount(defaultValue int) int { + return IntEnvOr(SpanLinkAttributeCountKey, defaultValue) +} diff --git a/instrumentation/opentelemetry/batchspanprocessor/env.modified.go b/instrumentation/opentelemetry/batchspanprocessor/env.modified.go new file mode 100644 index 00000000..815184d2 --- /dev/null +++ b/instrumentation/opentelemetry/batchspanprocessor/env.modified.go @@ -0,0 +1,175 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batchspanprocessor // import "github.com/hypertrace/goagent/instrumentation/opentelemetry/batchspanprocessor" + +import ( + "os" + "strconv" +) + +// Environment variable names. +const ( + // BatchSpanProcessorScheduleDelayKey is the delay interval between two + // consecutive exports (i.e. 5000). + BatchSpanProcessorScheduleDelayKey = "OTEL_BSP_SCHEDULE_DELAY" + // BatchSpanProcessorExportTimeoutKey is the maximum allowed time to + // export data (i.e. 3000). + BatchSpanProcessorExportTimeoutKey = "OTEL_BSP_EXPORT_TIMEOUT" + // BatchSpanProcessorMaxQueueSizeKey is the maximum queue size (i.e. 2048). + BatchSpanProcessorMaxQueueSizeKey = "OTEL_BSP_MAX_QUEUE_SIZE" + // BatchSpanProcessorMaxExportBatchSizeKey is the maximum batch size (i.e. + // 512). Note: it must be less than or equal to + // EnvBatchSpanProcessorMaxQueueSize. + BatchSpanProcessorMaxExportBatchSizeKey = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE" + + // AttributeValueLengthKey is the maximum allowed attribute value size. + AttributeValueLengthKey = "OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT" + + // AttributeCountKey is the maximum allowed span attribute count. + AttributeCountKey = "OTEL_ATTRIBUTE_COUNT_LIMIT" + + // SpanAttributeValueLengthKey is the maximum allowed attribute value size + // for a span. + SpanAttributeValueLengthKey = "OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT" + + // SpanAttributeCountKey is the maximum allowed span attribute count for a + // span. + SpanAttributeCountKey = "OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT" + + // SpanEventCountKey is the maximum allowed span event count. + SpanEventCountKey = "OTEL_SPAN_EVENT_COUNT_LIMIT" + + // SpanEventAttributeCountKey is the maximum allowed attribute per span + // event count. + SpanEventAttributeCountKey = "OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT" + + // SpanLinkCountKey is the maximum allowed span link count. + SpanLinkCountKey = "OTEL_SPAN_LINK_COUNT_LIMIT" + + // SpanLinkAttributeCountKey is the maximum allowed attribute per span + // link count. + SpanLinkAttributeCountKey = "OTEL_LINK_ATTRIBUTE_COUNT_LIMIT" +) + +// firstInt returns the value of the first matching environment variable from +// keys. If the value is not an integer or no match is found, defaultValue is +// returned. +func firstInt(defaultValue int, keys ...string) int { + for _, key := range keys { + value, ok := os.LookupEnv(key) + if !ok { + continue + } + + intValue, err := strconv.Atoi(value) + if err != nil { + Info("Got invalid value, number value expected.", key, value) + return defaultValue + } + + return intValue + } + + return defaultValue +} + +// IntEnvOr returns the int value of the environment variable with name key if +// it exists and the value is an int. Otherwise, defaultValue is returned. +func IntEnvOr(key string, defaultValue int) int { + value, ok := os.LookupEnv(key) + if !ok { + return defaultValue + } + + intValue, err := strconv.Atoi(value) + if err != nil { + Info("Got invalid value, number value expected.", key, value) + return defaultValue + } + + return intValue +} + +// BatchSpanProcessorScheduleDelay returns the environment variable value for +// the OTEL_BSP_SCHEDULE_DELAY key if it exists, otherwise defaultValue is +// returned. +func BatchSpanProcessorScheduleDelay(defaultValue int) int { + return IntEnvOr(BatchSpanProcessorScheduleDelayKey, defaultValue) +} + +// BatchSpanProcessorExportTimeout returns the environment variable value for +// the OTEL_BSP_EXPORT_TIMEOUT key if it exists, otherwise defaultValue is +// returned. +func BatchSpanProcessorExportTimeout(defaultValue int) int { + return IntEnvOr(BatchSpanProcessorExportTimeoutKey, defaultValue) +} + +// BatchSpanProcessorMaxQueueSize returns the environment variable value for +// the OTEL_BSP_MAX_QUEUE_SIZE key if it exists, otherwise defaultValue is +// returned. +func BatchSpanProcessorMaxQueueSize(defaultValue int) int { + return IntEnvOr(BatchSpanProcessorMaxQueueSizeKey, defaultValue) +} + +// BatchSpanProcessorMaxExportBatchSize returns the environment variable value for +// the OTEL_BSP_MAX_EXPORT_BATCH_SIZE key if it exists, otherwise defaultValue +// is returned. +func BatchSpanProcessorMaxExportBatchSize(defaultValue int) int { + return IntEnvOr(BatchSpanProcessorMaxExportBatchSizeKey, defaultValue) +} + +// SpanAttributeValueLength returns the environment variable value for the +// OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT key if it exists. Otherwise, the +// environment variable value for OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT is +// returned or defaultValue if that is not set. +func SpanAttributeValueLength(defaultValue int) int { + return firstInt(defaultValue, SpanAttributeValueLengthKey, AttributeValueLengthKey) +} + +// SpanAttributeCount returns the environment variable value for the +// OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT key if it exists. Otherwise, the +// environment variable value for OTEL_ATTRIBUTE_COUNT_LIMIT is returned or +// defaultValue if that is not set. +func SpanAttributeCount(defaultValue int) int { + return firstInt(defaultValue, SpanAttributeCountKey, AttributeCountKey) +} + +// SpanEventCount returns the environment variable value for the +// OTEL_SPAN_EVENT_COUNT_LIMIT key if it exists, otherwise defaultValue is +// returned. +func SpanEventCount(defaultValue int) int { + return IntEnvOr(SpanEventCountKey, defaultValue) +} + +// SpanEventAttributeCount returns the environment variable value for the +// OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT key if it exists, otherwise defaultValue +// is returned. +func SpanEventAttributeCount(defaultValue int) int { + return IntEnvOr(SpanEventAttributeCountKey, defaultValue) +} + +// SpanLinkCount returns the environment variable value for the +// OTEL_SPAN_LINK_COUNT_LIMIT key if it exists, otherwise defaultValue is +// returned. +func SpanLinkCount(defaultValue int) int { + return IntEnvOr(SpanLinkCountKey, defaultValue) +} + +// SpanLinkAttributeCount returns the environment variable value for the +// OTEL_LINK_ATTRIBUTE_COUNT_LIMIT key if it exists, otherwise defaultValue is +// returned. +func SpanLinkAttributeCount(defaultValue int) int { + return IntEnvOr(SpanLinkAttributeCountKey, defaultValue) +} diff --git a/instrumentation/opentelemetry/batchspanprocessor/logger.go b/instrumentation/opentelemetry/batchspanprocessor/logger.go new file mode 100644 index 00000000..22e8c42c --- /dev/null +++ b/instrumentation/opentelemetry/batchspanprocessor/logger.go @@ -0,0 +1,30 @@ +package batchspanprocessor // import "github.com/hypertrace/goagent/instrumentation/opentelemetry/batchspanprocessor" + +// Adapted from go.opentelemetry.io/otel/internal/global#internal_logging.go +import ( + "log" + "os" + + "github.com/go-logr/logr" + "github.com/go-logr/stdr" +) + +// The logger uses stdr which is backed by the standard `log.Logger` +// interface. This logger will only show messages at the Error Level. +var logger logr.Logger = stdr.New(log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile)) + +// Info prints messages about the general state of the API or SDK. +// This should usually be less then 5 messages a minute. +func Info(msg string, keysAndValues ...interface{}) { + logger.V(1).Info(msg, keysAndValues...) +} + +// Error prints messages about exceptional states of the API or SDK. +func Error(err error, msg string, keysAndValues ...interface{}) { + logger.Error(err, msg, keysAndValues...) +} + +// Debug prints messages about all internal changes in the API or SDK. +func Debug(msg string, keysAndValues ...interface{}) { + logger.V(5).Info(msg, keysAndValues...) +} diff --git a/instrumentation/opentelemetry/examples/init_additional_test.go b/instrumentation/opentelemetry/examples/init_additional_test.go index 5ed3e339..2c30791b 100644 --- a/instrumentation/opentelemetry/examples/init_additional_test.go +++ b/instrumentation/opentelemetry/examples/init_additional_test.go @@ -8,16 +8,16 @@ import ( "github.com/gorilla/mux" "github.com/hypertrace/goagent/config" hyperotel "github.com/hypertrace/goagent/instrumentation/opentelemetry" + modbsp "github.com/hypertrace/goagent/instrumentation/opentelemetry/batchspanprocessor" "github.com/hypertrace/goagent/instrumentation/opentelemetry/net/hyperhttp" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/sdk/resource" - "go.opentelemetry.io/otel/sdk/trace" sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" ) -var otherSpanExporter trace.SpanExporter = nil +var otherSpanExporter sdktrace.SpanExporter = nil func ExampleInitAsAdditional() { hyperSpanProcessor, shutdown := hyperotel.InitAsAdditional(config.Load()) @@ -31,7 +31,8 @@ func ExampleInitAsAdditional() { ), ) - otherSpanProcessor := sdktrace.NewBatchSpanProcessor( + otherSpanProcessor := modbsp.CreateBatchSpanProcessor( + true, // use modified bsp hyperotel.RemoveGoAgentAttrs(otherSpanExporter), ) diff --git a/instrumentation/opentelemetry/init.go b/instrumentation/opentelemetry/init.go index eeddebdf..e2ad2c4d 100644 --- a/instrumentation/opentelemetry/init.go +++ b/instrumentation/opentelemetry/init.go @@ -23,6 +23,7 @@ import ( config "github.com/hypertrace/agent-config/gen/go/v1" "go.opentelemetry.io/otel/attribute" + modbsp "github.com/hypertrace/goagent/instrumentation/opentelemetry/batchspanprocessor" "github.com/hypertrace/goagent/instrumentation/opentelemetry/internal/identifier" sdkconfig "github.com/hypertrace/goagent/sdk/config" "github.com/hypertrace/goagent/version" @@ -252,7 +253,10 @@ func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessor log.Fatal(err) } - sp := sdktrace.NewBatchSpanProcessor(exporter, sdktrace.WithBatchTimeout(batchTimeout)) + sp := modbsp.CreateBatchSpanProcessor( + cfg.GetTelemetry() != nil && cfg.GetTelemetry().GetMetricsEnabled().GetValue(), // metrics enabled + exporter, + sdktrace.WithBatchTimeout(batchTimeout)) if wrapper != nil { sp = &spanProcessorWithWrapper{wrapper, sp} } @@ -358,7 +362,10 @@ func RegisterServiceWithSpanProcessorWrapper(serviceName string, resourceAttribu log.Fatal(err) } - sp := sdktrace.NewBatchSpanProcessor(exporter, sdktrace.WithBatchTimeout(batchTimeout)) + sp := modbsp.CreateBatchSpanProcessor( + true, // ideally there should be no issue with using the modified bsp + exporter, + sdktrace.WithBatchTimeout(batchTimeout)) if wrapper != nil { sp = &spanProcessorWithWrapper{wrapper, sp} } diff --git a/instrumentation/opentelemetry/init_additional.go b/instrumentation/opentelemetry/init_additional.go index 762b9a63..ed55b9b8 100644 --- a/instrumentation/opentelemetry/init_additional.go +++ b/instrumentation/opentelemetry/init_additional.go @@ -6,6 +6,7 @@ import ( "strings" config "github.com/hypertrace/agent-config/gen/go/v1" + modbsp "github.com/hypertrace/goagent/instrumentation/opentelemetry/batchspanprocessor" sdkconfig "github.com/hypertrace/goagent/sdk/config" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/resource" @@ -42,13 +43,17 @@ func InitAsAdditional(cfg *config.AgentConfig) (trace.SpanProcessor, func()) { exporter = addResourceToSpans(exporter, resource) } - return trace.NewBatchSpanProcessor(exporter, trace.WithBatchTimeout(batchTimeout)), func() { - err := exporter.Shutdown(context.Background()) - if err != nil { - log.Printf("error while shutting down exporter: %v\n", err) + return modbsp.CreateBatchSpanProcessor( + cfg.GetTelemetry() != nil && cfg.GetTelemetry().GetMetricsEnabled().GetValue(), // metrics enabled + exporter, + trace.WithBatchTimeout(batchTimeout)), + func() { + err := exporter.Shutdown(context.Background()) + if err != nil { + log.Printf("error while shutting down exporter: %v\n", err) + } + sdkconfig.ResetConfig() } - sdkconfig.ResetConfig() - } } type shieldResourceSpan struct { diff --git a/instrumentation/opentelemetry/init_test.go b/instrumentation/opentelemetry/init_test.go index bb9bc84b..3c419547 100644 --- a/instrumentation/opentelemetry/init_test.go +++ b/instrumentation/opentelemetry/init_test.go @@ -152,6 +152,8 @@ func TestMultipleTraceProviders(t *testing.T) { cfg.Reporting.Endpoint = config.String(srv.URL) cfg.Reporting.TraceReporterType = config.TraceReporterType_ZIPKIN cfg.Enabled = config.Bool(true) + // Disable metrics to only test trace provider. + cfg.Telemetry.MetricsEnabled = config.Bool(false) // By doing this we make sure a batching isn't happening batchTimeout = time.Duration(10) * time.Second @@ -178,10 +180,9 @@ func TestMultipleTraceProviders(t *testing.T) { assert.Equal(t, 0, count) }) - // 2 requests for spans and 1 for metrics. - t.Run("test 3 requests after flush", func(t *testing.T) { + t.Run("test 2 requests after flush", func(t *testing.T) { shutdown() - assert.Equal(t, 3, count) + assert.Equal(t, 2, count) assert.Equal(t, 0, len(traceProviders)) }) }