From 073ad15cf19327a0eacdbf647964e3c666de9f2a Mon Sep 17 00:00:00 2001 From: Tim Mwangi Date: Tue, 24 Jan 2023 10:51:06 -0800 Subject: [PATCH 01/13] feat: otel go metrics setup Set up otel go metrics reporting and add a RequestCount metric for the http handler instrumentation. --- go.mod | 11 +++- go.sum | 11 ++++ .../hypertrace/net/hyperhttp/handler.go | 2 +- instrumentation/opentelemetry/init.go | 65 +++++++++++++++++++ sdk/instrumentation/net/http/handler.go | 40 +++++++++++- sdk/instrumentation/net/http/handler_test.go | 16 +++-- 6 files changed, 132 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index 2e3194bb..e4333f9c 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,9 @@ go 1.17 require ( contrib.go.opencensus.io/exporter/zipkin v0.1.2 github.com/gin-gonic/gin v1.7.2 + github.com/go-logr/stdr v1.2.2 github.com/golang/protobuf v1.5.2 + github.com/google/uuid v1.1.2 github.com/gorilla/mux v1.8.0 github.com/hypertrace/agent-config/gen/go v0.0.0-20221206162312-4a295cabd009 github.com/json-iterator/go v1.1.11 // indirect @@ -23,13 +25,18 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.10.0 go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.10.0 go.opentelemetry.io/otel/exporters/zipkin v1.10.0 + go.opentelemetry.io/otel/metric v0.31.0 go.opentelemetry.io/otel/sdk v1.10.0 go.opentelemetry.io/otel/trace v1.10.0 google.golang.org/grpc v1.49.0 google.golang.org/protobuf v1.28.1 ) -require github.com/google/uuid v1.1.2 +require ( + go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.31.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.31.0 + go.opentelemetry.io/otel/sdk/metric v0.31.0 +) require ( cloud.google.com/go v0.81.0 // indirect @@ -39,7 +46,6 @@ require ( github.com/ghodss/yaml v1.0.0 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-logr/logr v1.2.3 // indirect - github.com/go-logr/stdr v1.2.2 // indirect github.com/go-playground/locales v0.13.0 // indirect 
github.com/go-playground/universal-translator v0.17.0 // indirect github.com/go-playground/validator/v10 v10.4.1 // indirect @@ -52,7 +58,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/ugorji/go/codec v1.1.7 // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.10.0 // indirect - go.opentelemetry.io/otel/metric v0.31.0 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect golang.org/x/crypto v0.0.0-20210920023735-84f357641f63 // indirect golang.org/x/net v0.0.0-20211216030914-fe4d6282115f // indirect diff --git a/go.sum b/go.sum index e82e398e..22db73a0 100644 --- a/go.sum +++ b/go.sum @@ -47,6 +47,8 @@ github.com/Shopify/sarama v1.30.0/go.mod h1:zujlQQx1kzHsh4jfV1USnptCQrHAEZ2Hk8fT github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/Shopify/toxiproxy/v2 v2.1.6-0.20210914104332-15ea381dcdae/go.mod h1:/cvHQkZ1fst0EmZnA5dFtiQdWCNCFYzb+uE2vqVgvx0= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= +github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -315,8 +317,13 @@ go.opentelemetry.io/contrib/propagators/b3 v1.10.0/go.mod h1:oxvamQ/mTDFQVugml/u go.opentelemetry.io/otel v1.8.0/go.mod h1:2pkj+iMj0o03Y+cW6/m8Y4WkRdYN3AvCXCnzRMp9yvM= go.opentelemetry.io/otel v1.10.0 h1:Y7DTJMR6zs1xkS/upamJYk0SxxN4C9AqRd77jmZnyY4= go.opentelemetry.io/otel v1.10.0/go.mod h1:NbvWjCthWHKBEUMpf0/v8ZRZlni86PpGFEMA9pnQSnQ= +go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.8.0/go.mod 
h1:78XhIg8Ht9vR4tbLNUhXsiOnE2HOuSeKAiAcoVQEpOY= go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.10.0 h1:TaB+1rQhddO1sF71MpZOZAuSPW1klK2M8XxfrBMfK7Y= go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.10.0/go.mod h1:78XhIg8Ht9vR4tbLNUhXsiOnE2HOuSeKAiAcoVQEpOY= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.31.0 h1:H0+xwv4shKw0gfj/ZqR13qO2N/dBQogB1OcRjJjV39Y= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.31.0/go.mod h1:nkenGD8vcvs0uN6WhR90ZVHQlgDsRmXicnNadMnk+XQ= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.31.0 h1:BaQ2xM5cPmldVCMvbLoy5tcLUhXCtIhItDYBNw83B7Y= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.31.0/go.mod h1:VRr8tlXQEsTdesDCh0qBe2iKDWhpi3ZqDYw6VlZ8MhI= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.10.0 h1:pDDYmo0QadUPal5fwXoY1pmMpFcdyhXOmL5drCrI3vU= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.10.0/go.mod h1:Krqnjl22jUJ0HgMzw5eveuCvFDXY4nSYb4F8t5gdrag= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.10.0 h1:KtiUEhQmj/Pa874bVYKGNVdq8NPKiacPbaRRtgXi+t4= @@ -327,12 +334,16 @@ go.opentelemetry.io/otel/exporters/zipkin v1.10.0 h1:HcPAFsFpEBKF+G5NIOA+gBsxifd go.opentelemetry.io/otel/exporters/zipkin v1.10.0/go.mod h1:HdfvgwcOoCB0+zzrTHycW6btjK0zNpkz2oTGO815SCI= go.opentelemetry.io/otel/metric v0.31.0 h1:6SiklT+gfWAwWUR0meEMxQBtihpiEs4c+vL9spDTqUs= go.opentelemetry.io/otel/metric v0.31.0/go.mod h1:ohmwj9KTSIeBnDBm/ZwH2PSZxZzoOaG2xZeekTRzL5A= +go.opentelemetry.io/otel/sdk v1.8.0/go.mod h1:uPSfc+yfDH2StDM/Rm35WE8gXSNdvCg023J6HeGNO0c= go.opentelemetry.io/otel/sdk v1.10.0 h1:jZ6K7sVn04kk/3DNUdJ4mqRlGDiXAVuIG+MMENpTNdY= go.opentelemetry.io/otel/sdk v1.10.0/go.mod h1:vO06iKzD5baltJz1zarxMCNHFpUlUiOy4s65ECtn6kE= +go.opentelemetry.io/otel/sdk/metric v0.31.0 h1:2sZx4R43ZMhJdteKAlKoHvRgrMp53V1aRxvEf5lCq8Q= +go.opentelemetry.io/otel/sdk/metric v0.31.0/go.mod h1:fl0SmNnX9mN9xgU6OLYLMBMrNAsaZQi7qBwprwO3abk= go.opentelemetry.io/otel/trace v1.8.0/go.mod 
h1:0Bt3PXY8w+3pheS3hQUt+wow8b1ojPaTBoTCh2zIFI4= go.opentelemetry.io/otel/trace v1.10.0 h1:npQMbR8o7mum8uF95yFbOEJffhs1sbCOfDh8zAJiH5E= go.opentelemetry.io/otel/trace v1.10.0/go.mod h1:Sij3YYczqAdz+EhmGhE6TpTxUO5/F/AzrK+kxfGqySM= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.opentelemetry.io/proto/otlp v0.18.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= diff --git a/instrumentation/hypertrace/net/hyperhttp/handler.go b/instrumentation/hypertrace/net/hyperhttp/handler.go index de075712..2b95a4fe 100644 --- a/instrumentation/hypertrace/net/hyperhttp/handler.go +++ b/instrumentation/hypertrace/net/hyperhttp/handler.go @@ -16,7 +16,7 @@ func NewHandler(base http.Handler, operation string, opts ...Option) http.Handle } return otelhttp.NewHandler( - sdkhttp.WrapHandler(base, opentelemetry.SpanFromContext, o.toSDKOptions(), map[string]string{}), + sdkhttp.WrapHandler(base, operation, opentelemetry.SpanFromContext, o.toSDKOptions(), map[string]string{}), operation, ) } diff --git a/instrumentation/opentelemetry/init.go b/instrumentation/opentelemetry/init.go index 7dfcf079..1ede7654 100644 --- a/instrumentation/opentelemetry/init.go +++ b/instrumentation/opentelemetry/init.go @@ -14,6 +14,7 @@ import ( "sync" "time" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/trace" @@ -22,10 +23,12 @@ import ( config "github.com/hypertrace/agent-config/gen/go/v1" "go.opentelemetry.io/otel/attribute" + "github.com/go-logr/stdr" sdkconfig "github.com/hypertrace/goagent/sdk/config" "github.com/hypertrace/goagent/version" "go.opentelemetry.io/contrib/propagators/b3" "go.opentelemetry.io/otel" + 
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" otlpgrpc "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" "go.opentelemetry.io/otel/exporters/zipkin" @@ -34,6 +37,17 @@ import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "google.golang.org/grpc/credentials" + //"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + //"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + //"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" + // otelmetric "go.opentelemetry.io/otel/metric" + otelmetricglobal "go.opentelemetry.io/otel/metric/global" + controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" + processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" + "go.opentelemetry.io/otel/sdk/metric/selector/simple" + // "go.opentelemetry.io/otel/sdk/metric" + // "go.opentelemetry.io/otel/sdk/metric/metricdata" + // "go.opentelemetry.io/otel/sdk/metric/view" ) var batchTimeout = time.Duration(200) * time.Millisecond @@ -167,6 +181,7 @@ func Init(cfg *config.AgentConfig) func() { // InitWithSpanProcessorWrapper initializes opentelemetry tracing with a wrapper over span processor // and returns a shutdown function to flush data immediately on a termination signal. 
func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessorWrapper) func() { + stdr.SetVerbosity(5) mu.Lock() defer mu.Unlock() if initialized { @@ -219,6 +234,8 @@ func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessor otel.SetTextMapPropagator(makePropagator(cfg.PropagationFormats)) + initMetrics() + traceProviders = make(map[string]*sdktrace.TracerProvider) globalSampler = sampler initialized = true @@ -348,3 +365,51 @@ func (sp *spanProcessorWithWrapper) Shutdown(ctx context.Context) error { func (sp *spanProcessorWithWrapper) ForceFlush(ctx context.Context) error { return sp.processor.ForceFlush(ctx) } + +func initMetrics() { + // stdout exporter + // exporter, err := stdoutmetric.New(stdoutmetric.WithPrettyPrint()) + // if err != nil { + // log.Printf("error in init metrics: %v", fmt.Errorf("creating stdoutmetric exporter: %w", err)) + // //return nil, fmt.Errorf("creating stdoutmetric exporter: %w", err) + // return + // } + + // otlp exporter + opts := []otlpmetricgrpc.Option{ + otlpmetricgrpc.WithEndpoint("localhost:4317"), + otlpmetricgrpc.WithInsecure(), + } + + exporter, err := otlpmetric.New( + context.Background(), + otlpmetricgrpc.NewClient(opts...), + ) + if err != nil { + log.Printf("error in init metrics: %v", fmt.Errorf("creating otlpmetric exporter: %w", err)) + return + } + + pusher := controller.New( + processor.NewFactory( + simple.NewWithInexpensiveDistribution(), + exporter, + ), + controller.WithExporter(exporter), + ) + if err := pusher.Start(context.Background()); err != nil { + log.Fatalf("starting push controller: %v", err) + } + + otelmetricglobal.SetMeterProvider(pusher) + + // metricsClient := + // defaultView, _ := view.New(view.MatchInstrumentName("*")) + + // meterProvider := metric.NewMeterProvider(metric.WithReader(metric.NewPeriodicReader(metricsClient, + // metric.WithAggregationSelector(metric.DefaultAggregationSelector), + // 
metric.WithTemporalitySelector(deltaTemporalitySelector), + // ), defaultView, defaultView)) + + // otelmetricglobal.SetMeterProvider(meterProvider) +} diff --git a/sdk/instrumentation/net/http/handler.go b/sdk/instrumentation/net/http/handler.go index dcb49ead..d449a9a8 100644 --- a/sdk/instrumentation/net/http/handler.go +++ b/sdk/instrumentation/net/http/handler.go @@ -12,14 +12,31 @@ import ( "github.com/hypertrace/goagent/sdk/filter" internalconfig "github.com/hypertrace/goagent/sdk/internal/config" "github.com/hypertrace/goagent/sdk/internal/container" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/global" + "go.opentelemetry.io/otel/metric/instrument/syncint64" + semconv "go.opentelemetry.io/otel/semconv/v1.12.0" +) + +// Server HTTP metrics. +const ( + // Pseudo of go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp#RequestCount since a metric is not + // created for that one for some reason.(annotated with hypertrace to avoid a duplicate if otel go ever implement + // their own) + RequestCount = "hypertrace.http.server.request_count" // Incoming request count total ) type handler struct { delegate http.Handler + operation string defaultAttributes map[string]string spanFromContextRetriever sdk.SpanFromContext dataCaptureConfig *config.DataCapture filter filter.Filter + // Some metrics in here. + counters map[string]syncint64.Counter } // Options for HTTP handler instrumentation @@ -29,7 +46,7 @@ type Options struct { // WrapHandler wraps an uninstrumented handler (e.g. 
a handleFunc) and returns a new one // that should be used as base to an instrumented handler -func WrapHandler(delegate http.Handler, spanFromContext sdk.SpanFromContext, options *Options, spanAttributes map[string]string) http.Handler { +func WrapHandler(delegate http.Handler, operation string, spanFromContext sdk.SpanFromContext, options *Options, spanAttributes map[string]string) http.Handler { defaultAttributes := make(map[string]string) for k, v := range spanAttributes { defaultAttributes[k] = v @@ -41,10 +58,29 @@ func WrapHandler(delegate http.Handler, spanFromContext sdk.SpanFromContext, opt if options != nil && options.Filter != nil { f = options.Filter } - return &handler{delegate, defaultAttributes, spanFromContext, internalconfig.GetConfig().GetDataCapture(), f} + + mp := global.MeterProvider() + meter := mp.Meter("go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp", + metric.WithInstrumentationVersion(otelhttp.SemVersion())) + counters := make(map[string]syncint64.Counter) + + requestCountCounter, err := meter.SyncInt64().Counter(RequestCount) + if err != nil { + otel.Handle(err) + } + + counters[RequestCount] = requestCountCounter + + return &handler{delegate, operation, defaultAttributes, spanFromContext, internalconfig.GetConfig().GetDataCapture(), f, counters} } func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Add metrics using the same logic in go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp#handler.go + ctx := r.Context() + labeler, _ := otelhttp.LabelerFromContext(ctx) + attributes := append(labeler.Get(), semconv.HTTPServerMetricAttributesFromHTTPRequest(h.operation, r)...) + h.counters[RequestCount].Add(ctx, 1, attributes...) 
+ span := h.spanFromContextRetriever(r.Context()) if span.IsNoop() { diff --git a/sdk/instrumentation/net/http/handler_test.go b/sdk/instrumentation/net/http/handler_test.go index d58a0e6a..5ea3fda2 100644 --- a/sdk/instrumentation/net/http/handler_test.go +++ b/sdk/instrumentation/net/http/handler_test.go @@ -16,6 +16,8 @@ import ( "github.com/stretchr/testify/assert" ) +const fooOpName string = "/foo" + var emptyTestConfig = &config.DataCapture{ HttpHeaders: &config.Message{ Request: config.Bool(false), @@ -50,7 +52,7 @@ func TestServerRequestWithNilBodyIsntChanged(t *testing.T) { assert.Nil(t, r.Body) }) - wh, _ := WrapHandler(h, mock.SpanFromContext, &Options{}, map[string]string{}).(*handler) + wh, _ := WrapHandler(h, fooOpName, mock.SpanFromContext, &Options{}, map[string]string{}).(*handler) wh.dataCaptureConfig = emptyTestConfig ih := &mockHandler{baseHandler: wh} @@ -74,7 +76,7 @@ func TestServerRequestIsSuccessfullyTraced(t *testing.T) { rw.Write([]byte("ponse_body")) }) - wh, _ := WrapHandler(h, mock.SpanFromContext, &Options{}, map[string]string{"foo": "bar"}).(*handler) + wh, _ := WrapHandler(h, fooOpName, mock.SpanFromContext, &Options{}, map[string]string{"foo": "bar"}).(*handler) wh.dataCaptureConfig = emptyTestConfig ih := &mockHandler{baseHandler: wh} @@ -103,7 +105,7 @@ func TestHostIsSuccessfullyRecorded(t *testing.T) { assert.Nil(t, r.Body) }) - wh, _ := WrapHandler(h, mock.SpanFromContext, &Options{}, map[string]string{}).(*handler) + wh, _ := WrapHandler(h, fooOpName, mock.SpanFromContext, &Options{}, map[string]string{}).(*handler) wh.dataCaptureConfig = emptyTestConfig ih := &mockHandler{baseHandler: wh} @@ -139,7 +141,7 @@ func TestServerRequestHeadersAreSuccessfullyRecorded(t *testing.T) { rw.WriteHeader(202) }) - wh, _ := WrapHandler(h, mock.SpanFromContext, &Options{}, map[string]string{}).(*handler) + wh, _ := WrapHandler(h, fooOpName, mock.SpanFromContext, &Options{}, map[string]string{}).(*handler) ih := &mockHandler{baseHandler: wh} 
wh.dataCaptureConfig = emptyTestConfig wh.dataCaptureConfig.HttpHeaders = &config.Message{ @@ -285,7 +287,7 @@ func TestServerRecordsRequestAndResponseBodyAccordingly(t *testing.T) { rw.Write([]byte(tCase.responseBody)) }) - wh, _ := WrapHandler(h, mock.SpanFromContext, &Options{}, map[string]string{}).(*handler) + wh, _ := WrapHandler(h, fooOpName, mock.SpanFromContext, &Options{}, map[string]string{}).(*handler) wh.dataCaptureConfig = emptyTestConfig wh.dataCaptureConfig.HttpBody = &config.Message{ Request: config.Bool(tCase.captureHTTPBodyConfig), @@ -463,7 +465,7 @@ func TestServerRequestFilter(t *testing.T) { rw.WriteHeader(http.StatusOK) }) - wh, _ := WrapHandler(h, mock.SpanFromContext, tCase.options, map[string]string{}).(*handler) + wh, _ := WrapHandler(h, fooOpName, mock.SpanFromContext, tCase.options, map[string]string{}).(*handler) ih := &mockHandler{baseHandler: wh} r, _ := http.NewRequest("POST", tCase.url, strings.NewReader(tCase.body)) for i := 0; i < len(tCase.headerKeys); i++ { @@ -491,7 +493,7 @@ func TestProcessingBodyIsTrimmed(t *testing.T) { h := http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {}) - wh, _ := WrapHandler(h, mock.SpanFromContext, &Options{ + wh, _ := WrapHandler(h, fooOpName, mock.SpanFromContext, &Options{ Filter: mock.Filter{ BodyEvaluator: func(span sdk.Span, body []byte, headers map[string][]string) result.FilterResult { assert.Equal(t, "{", string(body)) // body is truncated From ca7b03cdec6a5689a12dbeacd1da371760aa8d70 Mon Sep 17 00:00:00 2001 From: Tim Mwangi Date: Tue, 24 Jan 2023 13:07:38 -0800 Subject: [PATCH 02/13] cleanup metrics init --- go.mod | 3 +- go.sum | 2 + .../github.com/gin-gonic/hypergin/gin.go | 7 +- .../github.com/gorilla/hypermux/mux.go | 3 +- instrumentation/opentelemetry/init.go | 139 ++++++++++-------- .../opentelemetry/net/hyperhttp/handler.go | 3 +- 6 files changed, 89 insertions(+), 68 deletions(-) diff --git a/go.mod b/go.mod index e4333f9c..594ab06c 100644 --- a/go.mod +++ 
b/go.mod @@ -5,7 +5,7 @@ go 1.17 require ( contrib.go.opencensus.io/exporter/zipkin v0.1.2 github.com/gin-gonic/gin v1.7.2 - github.com/go-logr/stdr v1.2.2 + github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/protobuf v1.5.2 github.com/google/uuid v1.1.2 github.com/gorilla/mux v1.8.0 @@ -35,6 +35,7 @@ require ( require ( go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.31.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.31.0 + go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.31.0 go.opentelemetry.io/otel/sdk/metric v0.31.0 ) diff --git a/go.sum b/go.sum index 22db73a0..4b5f5811 100644 --- a/go.sum +++ b/go.sum @@ -328,6 +328,8 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.10.0 h1:pDDYmo0QadUPal5fwXo go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.10.0/go.mod h1:Krqnjl22jUJ0HgMzw5eveuCvFDXY4nSYb4F8t5gdrag= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.10.0 h1:KtiUEhQmj/Pa874bVYKGNVdq8NPKiacPbaRRtgXi+t4= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.10.0/go.mod h1:OfUCyyIiDvNXHWpcWgbF+MWvqPZiNa3YDEnivcnYsV0= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.31.0 h1:fu/wxbXqjgIRZYzQNrF175qtwrJx+oQSFhZpTIbNQLc= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.31.0/go.mod h1:a80IJcYgCLVXJurhoyPjMBiNI5gPrWXLBTAwOp8N6Vw= go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.10.0 h1:c9UtMu/qnbLlVwTwt+ABrURrioEruapIslTDYZHJe2w= go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.10.0/go.mod h1:h3Lrh9t3Dnqp3NPwAZx7i37UFX7xrfnO1D+fuClREOA= go.opentelemetry.io/otel/exporters/zipkin v1.10.0 h1:HcPAFsFpEBKF+G5NIOA+gBsxifd3Ej+wb+KsdBLa15E= diff --git a/instrumentation/opentelemetry/github.com/gin-gonic/hypergin/gin.go b/instrumentation/opentelemetry/github.com/gin-gonic/hypergin/gin.go index 50baa576..438cf657 100644 --- a/instrumentation/opentelemetry/github.com/gin-gonic/hypergin/gin.go +++ 
b/instrumentation/opentelemetry/github.com/gin-gonic/hypergin/gin.go @@ -83,15 +83,16 @@ func spanNameFormatter(operation string, r *http.Request) (spanName string) { func Middleware(options *sdkhttp.Options) gin.HandlerFunc { return wrap(func(delegate http.Handler) http.Handler { wrappedHandler, ok := delegate.(*nextRequestHandler) - + ginOperationName := "" // if we fail to extract the next request handler from delegate the route template won't be reported if ok { + ginOperationName = wrappedHandler.c.FullPath() rc := wrappedHandler.c.Request.Context() - ctx := context.WithValue(rc, hyperGinKey, ginRoute{route: wrappedHandler.c.FullPath()}) + ctx := context.WithValue(rc, hyperGinKey, ginRoute{route: ginOperationName}) wrappedHandler.c.Request = wrappedHandler.c.Request.WithContext(ctx) } return otelhttp.NewHandler( - sdkhttp.WrapHandler(delegate, opentelemetry.SpanFromContext, options, map[string]string{}), + sdkhttp.WrapHandler(delegate, ginOperationName, opentelemetry.SpanFromContext, options, map[string]string{}), "", otelhttp.WithSpanNameFormatter(spanNameFormatter), ) diff --git a/instrumentation/opentelemetry/github.com/gorilla/hypermux/mux.go b/instrumentation/opentelemetry/github.com/gorilla/hypermux/mux.go index d917e1c6..e27c7725 100644 --- a/instrumentation/opentelemetry/github.com/gorilla/hypermux/mux.go +++ b/instrumentation/opentelemetry/github.com/gorilla/hypermux/mux.go @@ -30,9 +30,10 @@ func spanNameFormatter(operation string, r *http.Request) (spanName string) { // NewMiddleware sets up a handler to start tracing the incoming requests. 
func NewMiddleware(options *sdkhttp.Options) mux.MiddlewareFunc { + // TODO: Get a proper operation name for http gorilla mux return func(delegate http.Handler) http.Handler { return otelhttp.NewHandler( - sdkhttp.WrapHandler(delegate, opentelemetry.SpanFromContext, options, map[string]string{}), + sdkhttp.WrapHandler(delegate, "", opentelemetry.SpanFromContext, options, map[string]string{}), "", otelhttp.WithSpanNameFormatter(spanNameFormatter), ) diff --git a/instrumentation/opentelemetry/init.go b/instrumentation/opentelemetry/init.go index 1ede7654..1526d389 100644 --- a/instrumentation/opentelemetry/init.go +++ b/instrumentation/opentelemetry/init.go @@ -23,31 +23,25 @@ import ( config "github.com/hypertrace/agent-config/gen/go/v1" "go.opentelemetry.io/otel/attribute" - "github.com/go-logr/stdr" sdkconfig "github.com/hypertrace/goagent/sdk/config" "github.com/hypertrace/goagent/version" "go.opentelemetry.io/contrib/propagators/b3" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" otlpgrpc "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" "go.opentelemetry.io/otel/exporters/zipkin" + otelmetricglobal "go.opentelemetry.io/otel/metric/global" "go.opentelemetry.io/otel/propagation" + controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" + sdkmetricexport "go.opentelemetry.io/otel/sdk/metric/export" + processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" + "go.opentelemetry.io/otel/sdk/metric/selector/simple" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "google.golang.org/grpc/credentials" - //"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" - //"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" - 
//"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" - // otelmetric "go.opentelemetry.io/otel/metric" - otelmetricglobal "go.opentelemetry.io/otel/metric/global" - controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" - processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" - "go.opentelemetry.io/otel/sdk/metric/selector/simple" - // "go.opentelemetry.io/otel/sdk/metric" - // "go.opentelemetry.io/otel/sdk/metric/metricdata" - // "go.opentelemetry.io/otel/sdk/metric/view" ) var batchTimeout = time.Duration(200) * time.Millisecond @@ -89,6 +83,53 @@ func removeProtocolPrefixForOTLP(endpoint string) string { return pieces[1] } +func makeMetricsExporterFactory(cfg *config.AgentConfig) func() (sdkmetricexport.Exporter, error) { + // We are only supporting logging and otlp metric exporters for now. We will add support for prometheus + // metrics later + switch cfg.Reporting.MetricReporterType { + case config.MetricReporterType_METRIC_REPORTER_TYPE_LOGGING: + // stdout exporter + return func() (sdkmetricexport.Exporter, error) { + // TODO: Define if endpoint could be a filepath to write into a file. 
+ return stdoutmetric.New(stdoutmetric.WithPrettyPrint()) + } + default: + endpoint := cfg.GetReporting().GetMetricEndpoint().GetValue() + if len(endpoint) == 0 { + endpoint = cfg.GetReporting().GetEndpoint().GetValue() + } + + opts := []otlpmetricgrpc.Option{ + otlpmetricgrpc.WithEndpoint(removeProtocolPrefixForOTLP(endpoint)), + } + + if !cfg.GetReporting().GetSecure().GetValue() { + opts = append(opts, otlpmetricgrpc.WithInsecure()) + } + + certFile := cfg.GetReporting().GetCertFile().GetValue() + if len(certFile) > 0 { + if tlsCredentials, err := credentials.NewClientTLSFromFile(certFile, ""); err == nil { + opts = append(opts, otlpmetricgrpc.WithTLSCredentials(tlsCredentials)) + } else { + log.Printf("error while creating tls credentials from cert path %s: %v", certFile, err) + } + } + + if cfg.Reporting.GetEnableGrpcLoadbalancing().GetValue() { + resolver.SetDefaultScheme("dns") + opts = append(opts, otlpmetricgrpc.WithServiceConfig(`{"loadBalancingConfig": [ { "round_robin": {} } ]}`)) + } + + return func() (sdkmetricexport.Exporter, error) { + return otlpmetric.New( + context.Background(), + otlpmetricgrpc.NewClient(opts...), + ) + } + } +} + func makeExporterFactory(cfg *config.AgentConfig) func() (sdktrace.SpanExporter, error) { switch cfg.Reporting.TraceReporterType { case config.TraceReporterType_ZIPKIN: @@ -181,7 +222,6 @@ func Init(cfg *config.AgentConfig) func() { // InitWithSpanProcessorWrapper initializes opentelemetry tracing with a wrapper over span processor // and returns a shutdown function to flush data immediately on a termination signal. 
func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessorWrapper) func() { - stdr.SetVerbosity(5) mu.Lock() defer mu.Unlock() if initialized { @@ -234,7 +274,28 @@ func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessor otel.SetTextMapPropagator(makePropagator(cfg.PropagationFormats)) - initMetrics() + // Initialize metrics + metricsExporterFactory := makeMetricsExporterFactory(cfg) + metricsExporter, err := metricsExporterFactory() + if err != nil { + log.Fatal(err) + } + + metricsPusher := controller.New( + processor.NewFactory( + simple.NewWithInexpensiveDistribution(), + metricsExporter, + ), + controller.WithExporter(metricsExporter), + controller.WithResource(resources), + ) + if err := metricsPusher.Start(context.Background()); err != nil { + log.Fatalf("starting metrics push controller: %v", err) + } + + otelmetricglobal.SetMeterProvider(metricsPusher) + + //initMetrics() traceProviders = make(map[string]*sdktrace.TracerProvider) globalSampler = sampler @@ -266,6 +327,8 @@ func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessor if err != nil { log.Printf("error while shutting down default tracer provider: %v\n", err) } + + metricsPusher.Stop(context.Background()) initialized = false enabled = false sdkconfig.ResetConfig() @@ -365,51 +428,3 @@ func (sp *spanProcessorWithWrapper) Shutdown(ctx context.Context) error { func (sp *spanProcessorWithWrapper) ForceFlush(ctx context.Context) error { return sp.processor.ForceFlush(ctx) } - -func initMetrics() { - // stdout exporter - // exporter, err := stdoutmetric.New(stdoutmetric.WithPrettyPrint()) - // if err != nil { - // log.Printf("error in init metrics: %v", fmt.Errorf("creating stdoutmetric exporter: %w", err)) - // //return nil, fmt.Errorf("creating stdoutmetric exporter: %w", err) - // return - // } - - // otlp exporter - opts := []otlpmetricgrpc.Option{ - otlpmetricgrpc.WithEndpoint("localhost:4317"), - otlpmetricgrpc.WithInsecure(), 
- } - - exporter, err := otlpmetric.New( - context.Background(), - otlpmetricgrpc.NewClient(opts...), - ) - if err != nil { - log.Printf("error in init metrics: %v", fmt.Errorf("creating otlpmetric exporter: %w", err)) - return - } - - pusher := controller.New( - processor.NewFactory( - simple.NewWithInexpensiveDistribution(), - exporter, - ), - controller.WithExporter(exporter), - ) - if err := pusher.Start(context.Background()); err != nil { - log.Fatalf("starting push controller: %v", err) - } - - otelmetricglobal.SetMeterProvider(pusher) - - // metricsClient := - // defaultView, _ := view.New(view.MatchInstrumentName("*")) - - // meterProvider := metric.NewMeterProvider(metric.WithReader(metric.NewPeriodicReader(metricsClient, - // metric.WithAggregationSelector(metric.DefaultAggregationSelector), - // metric.WithTemporalitySelector(deltaTemporalitySelector), - // ), defaultView, defaultView)) - - // otelmetricglobal.SetMeterProvider(meterProvider) -} diff --git a/instrumentation/opentelemetry/net/hyperhttp/handler.go b/instrumentation/opentelemetry/net/hyperhttp/handler.go index 3704faee..c241adda 100644 --- a/instrumentation/opentelemetry/net/hyperhttp/handler.go +++ b/instrumentation/opentelemetry/net/hyperhttp/handler.go @@ -10,5 +10,6 @@ import ( // WrapHandler returns a new round tripper instrumented that relies on the // needs to be used with OTel instrumentation. 
func WrapHandler(delegate http.Handler, options *sdkhttp.Options) http.Handler { - return sdkhttp.WrapHandler(delegate, opentelemetry.SpanFromContext, options, map[string]string{}) + // TODO: Find another way to get the operation name + return sdkhttp.WrapHandler(delegate, "", opentelemetry.SpanFromContext, options, map[string]string{}) } From 500d088e7ceb71bf380160d4a6fc7678ca7930d5 Mon Sep 17 00:00:00 2001 From: Tim Mwangi Date: Tue, 24 Jan 2023 13:40:17 -0800 Subject: [PATCH 03/13] fix lint check fail in open census libs --- instrumentation/opencensus/net/hyperhttp/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/opencensus/net/hyperhttp/handler.go b/instrumentation/opencensus/net/hyperhttp/handler.go index f60b9252..e212c996 100644 --- a/instrumentation/opencensus/net/hyperhttp/handler.go +++ b/instrumentation/opencensus/net/hyperhttp/handler.go @@ -10,5 +10,5 @@ import ( // WrapHandler returns a new http.Handler that should be passed to // the *ochttp.Handler func WrapHandler(delegate http.Handler, options *sdkhttp.Options) http.Handler { - return sdkhttp.WrapHandler(delegate, opencensus.SpanFromContext, options, map[string]string{}) + return sdkhttp.WrapHandler(delegate, "", opencensus.SpanFromContext, options, map[string]string{}) } From 9c8b2cae96399a6e4445d3838d4c229af51c1085 Mon Sep 17 00:00:00 2001 From: Tim Mwangi Date: Wed, 25 Jan 2023 07:09:36 -0800 Subject: [PATCH 04/13] update golangci lint gha to v0.3.4 --- .github/workflows/pr-test.yml | 2 +- instrumentation/opencensus/net/hyperhttp/handler.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/pr-test.yml b/.github/workflows/pr-test.yml index c95c778d..80fd10b9 100644 --- a/.github/workflows/pr-test.yml +++ b/.github/workflows/pr-test.yml @@ -37,7 +37,7 @@ jobs: make deps - name: Lint files - uses: golangci/golangci-lint-action@v3.3.1 + uses: golangci/golangci-lint-action@v3.4.0 with: # Required: the version of 
golangci-lint is required and must be specified without patch version: we always use the latest patch version. version: v1.50.1 diff --git a/instrumentation/opencensus/net/hyperhttp/handler.go b/instrumentation/opencensus/net/hyperhttp/handler.go index e212c996..fb7428e6 100644 --- a/instrumentation/opencensus/net/hyperhttp/handler.go +++ b/instrumentation/opencensus/net/hyperhttp/handler.go @@ -10,5 +10,6 @@ import ( // WrapHandler returns a new http.Handler that should be passed to // the *ochttp.Handler func WrapHandler(delegate http.Handler, options *sdkhttp.Options) http.Handler { + // TODO: If I am doing this then I might have the metrics code in the wrong place. return sdkhttp.WrapHandler(delegate, "", opencensus.SpanFromContext, options, map[string]string{}) } From 48c419bbac44da732f9fbf64dd481dce1e873093 Mon Sep 17 00:00:00 2001 From: Tim Mwangi Date: Wed, 25 Jan 2023 07:49:06 -0800 Subject: [PATCH 05/13] fix failing unit test --- instrumentation/opentelemetry/init_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry/init_test.go b/instrumentation/opentelemetry/init_test.go index 1be88263..bb9bc84b 100644 --- a/instrumentation/opentelemetry/init_test.go +++ b/instrumentation/opentelemetry/init_test.go @@ -178,9 +178,10 @@ func TestMultipleTraceProviders(t *testing.T) { assert.Equal(t, 0, count) }) - t.Run("test 2 requests after flush", func(t *testing.T) { + // 2 requests for spans and 1 for metrics. 
+ t.Run("test 3 requests after flush", func(t *testing.T) { shutdown() - assert.Equal(t, 2, count) + assert.Equal(t, 3, count) assert.Equal(t, 0, len(traceProviders)) }) } From 2c36f66636225cc173d59666481d034d135ed784 Mon Sep 17 00:00:00 2001 From: Tim Mwangi Date: Wed, 25 Jan 2023 12:12:56 -0800 Subject: [PATCH 06/13] add service instance id to metrics resource --- instrumentation/opentelemetry/init.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/instrumentation/opentelemetry/init.go b/instrumentation/opentelemetry/init.go index 1526d389..1534f0d8 100644 --- a/instrumentation/opentelemetry/init.go +++ b/instrumentation/opentelemetry/init.go @@ -23,6 +23,7 @@ import ( config "github.com/hypertrace/agent-config/gen/go/v1" "go.opentelemetry.io/otel/attribute" + "github.com/hypertrace/goagent/instrumentation/opentelemetry/internal/identifier" sdkconfig "github.com/hypertrace/goagent/sdk/config" "github.com/hypertrace/goagent/version" "go.opentelemetry.io/contrib/propagators/b3" @@ -281,13 +282,19 @@ func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessor log.Fatal(err) } + resourceKvps := createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes) + resourceKvps = append(resourceKvps, identifier.ServiceInstanceKeyValue) + metricResources, err := resource.New(context.Background(), resource.WithAttributes(resourceKvps...)) + if err != nil { + log.Fatal(err) + } metricsPusher := controller.New( processor.NewFactory( simple.NewWithInexpensiveDistribution(), metricsExporter, ), controller.WithExporter(metricsExporter), - controller.WithResource(resources), + controller.WithResource(metricResources), ) if err := metricsPusher.Start(context.Background()); err != nil { log.Fatalf("starting metrics push controller: %v", err) @@ -295,8 +302,6 @@ func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessor otelmetricglobal.SetMeterProvider(metricsPusher) - //initMetrics() - traceProviders 
= make(map[string]*sdktrace.TracerProvider) globalSampler = sampler initialized = true From 9bf078d48a021be28515e7aa0ea22f5e46790af1 Mon Sep 17 00:00:00 2001 From: Tim Mwangi Date: Thu, 26 Jan 2023 12:34:14 -0800 Subject: [PATCH 07/13] add metrics enabled config --- config/defaults.go | 1 + go.mod | 2 +- go.sum | 2 + instrumentation/opentelemetry/init.go | 64 ++++++++++++++++----------- 4 files changed, 42 insertions(+), 27 deletions(-) diff --git a/config/defaults.go b/config/defaults.go index 0ce911a0..bf30c521 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -40,5 +40,6 @@ var defaultConfig = agentconfig.AgentConfig{ }, Telemetry: &agentconfig.Telemetry{ StartupSpanEnabled: agentconfig.Bool(true), + MetricsEnabled: agentconfig.Bool(true), }, } diff --git a/go.mod b/go.mod index 594ab06c..34abf540 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/golang/protobuf v1.5.2 github.com/google/uuid v1.1.2 github.com/gorilla/mux v1.8.0 - github.com/hypertrace/agent-config/gen/go v0.0.0-20221206162312-4a295cabd009 + github.com/hypertrace/agent-config/gen/go v0.0.0-20230126155022-544bf33b6648 github.com/json-iterator/go v1.1.11 // indirect github.com/kr/pretty v0.3.0 // indirect github.com/mattn/go-sqlite3 v1.14.4 diff --git a/go.sum b/go.sum index 4b5f5811..d8068a44 100644 --- a/go.sum +++ b/go.sum @@ -205,6 +205,8 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hypertrace/agent-config/gen/go v0.0.0-20221206162312-4a295cabd009 h1:a2Y5RfRQC3Fomz+HF3FdqdvNNMCtV/lc/N1Xuf5+au0= github.com/hypertrace/agent-config/gen/go v0.0.0-20221206162312-4a295cabd009/go.mod h1:WRbKE44DNsSbRnHja1VpU+dUSrTIuduePGhZ+bXmvDw= +github.com/hypertrace/agent-config/gen/go v0.0.0-20230126155022-544bf33b6648 h1:ez9iewAbKBCXtVKdG99xtzG7GPCbEJWXP8FUlCIJsSY= +github.com/hypertrace/agent-config/gen/go v0.0.0-20230126155022-544bf33b6648/go.mod 
h1:WRbKE44DNsSbRnHja1VpU+dUSrTIuduePGhZ+bXmvDw= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= diff --git a/instrumentation/opentelemetry/init.go b/instrumentation/opentelemetry/init.go index 1534f0d8..eeddebdf 100644 --- a/instrumentation/opentelemetry/init.go +++ b/instrumentation/opentelemetry/init.go @@ -276,31 +276,7 @@ func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessor otel.SetTextMapPropagator(makePropagator(cfg.PropagationFormats)) // Initialize metrics - metricsExporterFactory := makeMetricsExporterFactory(cfg) - metricsExporter, err := metricsExporterFactory() - if err != nil { - log.Fatal(err) - } - - resourceKvps := createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes) - resourceKvps = append(resourceKvps, identifier.ServiceInstanceKeyValue) - metricResources, err := resource.New(context.Background(), resource.WithAttributes(resourceKvps...)) - if err != nil { - log.Fatal(err) - } - metricsPusher := controller.New( - processor.NewFactory( - simple.NewWithInexpensiveDistribution(), - metricsExporter, - ), - controller.WithExporter(metricsExporter), - controller.WithResource(metricResources), - ) - if err := metricsPusher.Start(context.Background()); err != nil { - log.Fatalf("starting metrics push controller: %v", err) - } - - otelmetricglobal.SetMeterProvider(metricsPusher) + metricsShutdownFn := initializeMetrics(cfg) traceProviders = make(map[string]*sdktrace.TracerProvider) globalSampler = sampler @@ -333,7 +309,7 @@ func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessor log.Printf("error while shutting down default tracer provider: %v\n", err) } - 
metricsPusher.Stop(context.Background()) + metricsShutdownFn() initialized = false enabled = false sdkconfig.ResetConfig() @@ -406,6 +382,42 @@ func RegisterServiceWithSpanProcessorWrapper(serviceName string, resourceAttribu }), tp, nil } +func initializeMetrics(cfg *config.AgentConfig) func() { + if cfg.GetTelemetry() == nil || !cfg.GetTelemetry().GetMetricsEnabled().GetValue() { + return func() {} + } + + metricsExporterFactory := makeMetricsExporterFactory(cfg) + metricsExporter, err := metricsExporterFactory() + if err != nil { + log.Fatal(err) + } + + resourceKvps := createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes) + resourceKvps = append(resourceKvps, identifier.ServiceInstanceKeyValue) + metricResources, err := resource.New(context.Background(), resource.WithAttributes(resourceKvps...)) + if err != nil { + log.Fatal(err) + } + metricsPusher := controller.New( + processor.NewFactory( + simple.NewWithInexpensiveDistribution(), + metricsExporter, + ), + controller.WithExporter(metricsExporter), + controller.WithResource(metricResources), + ) + if err := metricsPusher.Start(context.Background()); err != nil { + log.Fatalf("starting metrics push controller: %v", err) + } + + otelmetricglobal.SetMeterProvider(metricsPusher) + + return func() { + metricsPusher.Stop(context.Background()) + } +} + // SpanProcessorWrapper wraps otel span processor // and is responsible to delegate calls to the wrapped processor type SpanProcessorWrapper interface { From 1cebe3506a2fc1fcd1cc12faf9bc6dc53255eb76 Mon Sep 17 00:00:00 2001 From: Tim Mwangi Date: Thu, 26 Jan 2023 13:09:36 -0800 Subject: [PATCH 08/13] use main branch for agent-config --- go.mod | 2 +- go.sum | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 34abf540..26feaa70 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/golang/protobuf v1.5.2 github.com/google/uuid v1.1.2 github.com/gorilla/mux v1.8.0 - 
github.com/hypertrace/agent-config/gen/go v0.0.0-20230126155022-544bf33b6648 + github.com/hypertrace/agent-config/gen/go v0.0.0-20230126205246-bd4d81e696a6 github.com/json-iterator/go v1.1.11 // indirect github.com/kr/pretty v0.3.0 // indirect github.com/mattn/go-sqlite3 v1.14.4 diff --git a/go.sum b/go.sum index d8068a44..61a50b14 100644 --- a/go.sum +++ b/go.sum @@ -203,10 +203,8 @@ github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= -github.com/hypertrace/agent-config/gen/go v0.0.0-20221206162312-4a295cabd009 h1:a2Y5RfRQC3Fomz+HF3FdqdvNNMCtV/lc/N1Xuf5+au0= -github.com/hypertrace/agent-config/gen/go v0.0.0-20221206162312-4a295cabd009/go.mod h1:WRbKE44DNsSbRnHja1VpU+dUSrTIuduePGhZ+bXmvDw= -github.com/hypertrace/agent-config/gen/go v0.0.0-20230126155022-544bf33b6648 h1:ez9iewAbKBCXtVKdG99xtzG7GPCbEJWXP8FUlCIJsSY= -github.com/hypertrace/agent-config/gen/go v0.0.0-20230126155022-544bf33b6648/go.mod h1:WRbKE44DNsSbRnHja1VpU+dUSrTIuduePGhZ+bXmvDw= +github.com/hypertrace/agent-config/gen/go v0.0.0-20230126205246-bd4d81e696a6 h1:MuiFiuigCk2NwMM5HOvI7FJUTZEsGeqA25c4acBjdEs= +github.com/hypertrace/agent-config/gen/go v0.0.0-20230126205246-bd4d81e696a6/go.mod h1:WRbKE44DNsSbRnHja1VpU+dUSrTIuduePGhZ+bXmvDw= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= From 8fa8a8511f38b20e760dafe05af8af5e32e5be7f Mon Sep 17 00:00:00 2001 From: Tim Mwangi Date: Fri, 27 Jan 2023 09:50:50 -0800 Subject: 
[PATCH 09/13] feat: use a modified batch span processor that creates metrics This is to track spans received and dropped by the processor. --- .../batchspanprocessor/README.md | 9 + .../batch_span_processor.go.original | 432 +++++++++++++++++ .../batch_span_processor.modified.go | 434 ++++++++++++++++++ .../batchspanprocessor/bspcreator.go | 14 + .../batchspanprocessor/bspcreator_test.go | 25 + .../batchspanprocessor/env.go.original | 177 +++++++ .../batchspanprocessor/env.modified.go | 176 +++++++ .../examples/init_additional_test.go | 7 +- instrumentation/opentelemetry/init.go | 11 +- .../opentelemetry/init_additional.go | 17 +- 10 files changed, 1291 insertions(+), 11 deletions(-) create mode 100644 instrumentation/opentelemetry/batchspanprocessor/README.md create mode 100644 instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.go.original create mode 100644 instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.modified.go create mode 100644 instrumentation/opentelemetry/batchspanprocessor/bspcreator.go create mode 100644 instrumentation/opentelemetry/batchspanprocessor/bspcreator_test.go create mode 100644 instrumentation/opentelemetry/batchspanprocessor/env.go.original create mode 100644 instrumentation/opentelemetry/batchspanprocessor/env.modified.go diff --git a/instrumentation/opentelemetry/batchspanprocessor/README.md b/instrumentation/opentelemetry/batchspanprocessor/README.md new file mode 100644 index 00000000..ef90a1de --- /dev/null +++ b/instrumentation/opentelemetry/batchspanprocessor/README.md @@ -0,0 +1,9 @@ +# Modified Batch Span Processor + +Since the original BatchSpanProcessor does not send metrics for spans received and spans dropped, the modified BatchSpanProcessor creates and populates those metrics so the user can track whether there are spans being dropped. + +We have kept track of the original files modified so it's easier to figure out the changes we added. 
When upgrading, copy the newer files into their go.original counterparts and then do a diff with the modified.go files to figure out what changes to make in order to upgrade the modified files. + +The paths of the files modified: +- [sdk/trace/batch_span_processor.go](https://github.com/open-telemetry/opentelemetry-go/blob/main/sdk/trace/batch_span_processor.go) +- [sdk/internal/env/env.go](https://github.com/open-telemetry/opentelemetry-go/blob/main/sdk/internal/env/env.go) diff --git a/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.go.original b/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.go.original new file mode 100644 index 00000000..a2d7db49 --- /dev/null +++ b/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.go.original @@ -0,0 +1,432 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trace // import "go.opentelemetry.io/otel/sdk/trace" + +import ( + "context" + "runtime" + "sync" + "sync/atomic" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/internal/global" + "go.opentelemetry.io/otel/sdk/internal/env" + "go.opentelemetry.io/otel/trace" +) + +// Defaults for BatchSpanProcessorOptions. +const ( + DefaultMaxQueueSize = 2048 + DefaultScheduleDelay = 5000 + DefaultExportTimeout = 30000 + DefaultMaxExportBatchSize = 512 +) + +// BatchSpanProcessorOption configures a BatchSpanProcessor. 
+type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions) + +// BatchSpanProcessorOptions is configuration settings for a +// BatchSpanProcessor. +type BatchSpanProcessorOptions struct { + // MaxQueueSize is the maximum queue size to buffer spans for delayed processing. If the + // queue gets full it drops the spans. Use BlockOnQueueFull to change this behavior. + // The default value of MaxQueueSize is 2048. + MaxQueueSize int + + // BatchTimeout is the maximum duration for constructing a batch. Processor + // forcefully sends available spans when timeout is reached. + // The default value of BatchTimeout is 5000 msec. + BatchTimeout time.Duration + + // ExportTimeout specifies the maximum duration for exporting spans. If the timeout + // is reached, the export will be cancelled. + // The default value of ExportTimeout is 30000 msec. + ExportTimeout time.Duration + + // MaxExportBatchSize is the maximum number of spans to process in a single batch. + // If there are more than one batch worth of spans then it processes multiple batches + // of spans one batch after the other without any delay. + // The default value of MaxExportBatchSize is 512. + MaxExportBatchSize int + + // BlockOnQueueFull blocks onEnd() and onStart() method if the queue is full + // AND if BlockOnQueueFull is set to true. + // Blocking option should be used carefully as it can severely affect the performance of an + // application. + BlockOnQueueFull bool +} + +// batchSpanProcessor is a SpanProcessor that batches asynchronously-received +// spans and sends them to a trace.Exporter when complete. 
+type batchSpanProcessor struct { + e SpanExporter + o BatchSpanProcessorOptions + + queue chan ReadOnlySpan + dropped uint32 + + batch []ReadOnlySpan + batchMutex sync.Mutex + timer *time.Timer + stopWait sync.WaitGroup + stopOnce sync.Once + stopCh chan struct{} +} + +var _ SpanProcessor = (*batchSpanProcessor)(nil) + +// NewBatchSpanProcessor creates a new SpanProcessor that will send completed +// span batches to the exporter with the supplied options. +// +// If the exporter is nil, the span processor will preform no action. +func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor { + maxQueueSize := env.BatchSpanProcessorMaxQueueSize(DefaultMaxQueueSize) + maxExportBatchSize := env.BatchSpanProcessorMaxExportBatchSize(DefaultMaxExportBatchSize) + + if maxExportBatchSize > maxQueueSize { + if DefaultMaxExportBatchSize > maxQueueSize { + maxExportBatchSize = maxQueueSize + } else { + maxExportBatchSize = DefaultMaxExportBatchSize + } + } + + o := BatchSpanProcessorOptions{ + BatchTimeout: time.Duration(env.BatchSpanProcessorScheduleDelay(DefaultScheduleDelay)) * time.Millisecond, + ExportTimeout: time.Duration(env.BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond, + MaxQueueSize: maxQueueSize, + MaxExportBatchSize: maxExportBatchSize, + } + for _, opt := range options { + opt(&o) + } + bsp := &batchSpanProcessor{ + e: exporter, + o: o, + batch: make([]ReadOnlySpan, 0, o.MaxExportBatchSize), + timer: time.NewTimer(o.BatchTimeout), + queue: make(chan ReadOnlySpan, o.MaxQueueSize), + stopCh: make(chan struct{}), + } + + bsp.stopWait.Add(1) + go func() { + defer bsp.stopWait.Done() + bsp.processQueue() + bsp.drainQueue() + }() + + return bsp +} + +// OnStart method does nothing. +func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {} + +// OnEnd method enqueues a ReadOnlySpan for later processing. 
+func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) { + // Do not enqueue spans if we are just going to drop them. + if bsp.e == nil { + return + } + bsp.enqueue(s) +} + +// Shutdown flushes the queue and waits until all spans are processed. +// It only executes once. Subsequent call does nothing. +func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error { + var err error + bsp.stopOnce.Do(func() { + wait := make(chan struct{}) + go func() { + close(bsp.stopCh) + bsp.stopWait.Wait() + if bsp.e != nil { + if err := bsp.e.Shutdown(ctx); err != nil { + otel.Handle(err) + } + } + close(wait) + }() + // Wait until the wait group is done or the context is cancelled + select { + case <-wait: + case <-ctx.Done(): + err = ctx.Err() + } + }) + return err +} + +type forceFlushSpan struct { + ReadOnlySpan + flushed chan struct{} +} + +func (f forceFlushSpan) SpanContext() trace.SpanContext { + return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled}) +} + +// ForceFlush exports all ended spans that have not yet been exported. +func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { + var err error + if bsp.e != nil { + flushCh := make(chan struct{}) + if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) { + select { + case <-flushCh: + // Processed any items in queue prior to ForceFlush being called + case <-ctx.Done(): + return ctx.Err() + } + } + + wait := make(chan error) + go func() { + wait <- bsp.exportSpans(ctx) + close(wait) + }() + // Wait until the export is finished or the context is cancelled/timed out + select { + case err = <-wait: + case <-ctx.Done(): + err = ctx.Err() + } + } + return err +} + +// WithMaxQueueSize returns a BatchSpanProcessorOption that configures the +// maximum queue size allowed for a BatchSpanProcessor. 
+func WithMaxQueueSize(size int) BatchSpanProcessorOption { + return func(o *BatchSpanProcessorOptions) { + o.MaxQueueSize = size + } +} + +// WithMaxExportBatchSize returns a BatchSpanProcessorOption that configures +// the maximum export batch size allowed for a BatchSpanProcessor. +func WithMaxExportBatchSize(size int) BatchSpanProcessorOption { + return func(o *BatchSpanProcessorOptions) { + o.MaxExportBatchSize = size + } +} + +// WithBatchTimeout returns a BatchSpanProcessorOption that configures the +// maximum delay allowed for a BatchSpanProcessor before it will export any +// held span (whether the queue is full or not). +func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption { + return func(o *BatchSpanProcessorOptions) { + o.BatchTimeout = delay + } +} + +// WithExportTimeout returns a BatchSpanProcessorOption that configures the +// amount of time a BatchSpanProcessor waits for an exporter to export before +// abandoning the export. +func WithExportTimeout(timeout time.Duration) BatchSpanProcessorOption { + return func(o *BatchSpanProcessorOptions) { + o.ExportTimeout = timeout + } +} + +// WithBlocking returns a BatchSpanProcessorOption that configures a +// BatchSpanProcessor to wait for enqueue operations to succeed instead of +// dropping data when the queue is full. +func WithBlocking() BatchSpanProcessorOption { + return func(o *BatchSpanProcessorOptions) { + o.BlockOnQueueFull = true + } +} + +// exportSpans is a subroutine of processing and draining the queue. 
+func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { + bsp.timer.Reset(bsp.o.BatchTimeout) + + bsp.batchMutex.Lock() + defer bsp.batchMutex.Unlock() + + if bsp.o.ExportTimeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout) + defer cancel() + } + + if l := len(bsp.batch); l > 0 { + global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped)) + err := bsp.e.ExportSpans(ctx, bsp.batch) + + // A new batch is always created after exporting, even if the batch failed to be exported. + // + // It is up to the exporter to implement any type of retry logic if a batch is failing + // to be exported, since it is specific to the protocol and backend being sent to. + bsp.batch = bsp.batch[:0] + + if err != nil { + return err + } + } + return nil +} + +// processQueue removes spans from the `queue` channel until processor +// is shut down. It calls the exporter in batches of up to MaxExportBatchSize +// waiting up to BatchTimeout to form a batch. +func (bsp *batchSpanProcessor) processQueue() { + defer bsp.timer.Stop() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { + select { + case <-bsp.stopCh: + return + case <-bsp.timer.C: + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } + case sd := <-bsp.queue: + if ffs, ok := sd.(forceFlushSpan); ok { + close(ffs.flushed) + continue + } + bsp.batchMutex.Lock() + bsp.batch = append(bsp.batch, sd) + shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize + bsp.batchMutex.Unlock() + if shouldExport { + if !bsp.timer.Stop() { + <-bsp.timer.C + } + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } + } + } + } +} + +// drainQueue awaits the any caller that had added to bsp.stopWait +// to finish the enqueue, then exports the final batch. 
+func (bsp *batchSpanProcessor) drainQueue() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { + select { + case sd := <-bsp.queue: + if sd == nil { + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } + return + } + + bsp.batchMutex.Lock() + bsp.batch = append(bsp.batch, sd) + shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize + bsp.batchMutex.Unlock() + + if shouldExport { + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } + } + default: + close(bsp.queue) + } + } +} + +func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) { + ctx := context.TODO() + if bsp.o.BlockOnQueueFull { + bsp.enqueueBlockOnQueueFull(ctx, sd) + } else { + bsp.enqueueDrop(ctx, sd) + } +} + +func recoverSendOnClosedChan() { + x := recover() + switch err := x.(type) { + case nil: + return + case runtime.Error: + if err.Error() == "send on closed channel" { + return + } + } + panic(x) +} + +func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan) bool { + if !sd.SpanContext().IsSampled() { + return false + } + + // This ensures the bsp.queue<- below does not panic as the + // processor shuts down. + defer recoverSendOnClosedChan() + + select { + case <-bsp.stopCh: + return false + default: + } + + select { + case bsp.queue <- sd: + return true + case <-ctx.Done(): + return false + } +} + +func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan) bool { + if !sd.SpanContext().IsSampled() { + return false + } + + // This ensures the bsp.queue<- below does not panic as the + // processor shuts down. + defer recoverSendOnClosedChan() + + select { + case <-bsp.stopCh: + return false + default: + } + + select { + case bsp.queue <- sd: + return true + default: + atomic.AddUint32(&bsp.dropped, 1) + } + return false +} + +// MarshalLog is the marshaling function used by the logging system to represent this exporter. 
+func (bsp *batchSpanProcessor) MarshalLog() interface{} { + return struct { + Type string + SpanExporter SpanExporter + Config BatchSpanProcessorOptions + }{ + Type: "BatchSpanProcessor", + SpanExporter: bsp.e, + Config: bsp.o, + } +} diff --git a/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.modified.go b/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.modified.go new file mode 100644 index 00000000..b5a908b7 --- /dev/null +++ b/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.modified.go @@ -0,0 +1,434 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batchspanprocessor // import "github.com/hypertrace/goagent/instrumentation/opentelemetry/batchspanprocessor" + +import ( + "context" + "runtime" + "sync" + "sync/atomic" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" + metricglobal "go.opentelemetry.io/otel/metric/global" + "go.opentelemetry.io/otel/metric/instrument/syncint64" + + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" +) + +// Defaults for BatchSpanProcessorOptions. 
+const ( + DefaultMaxQueueSize = 2048 + DefaultScheduleDelay = 5000 + DefaultExportTimeout = 30000 + DefaultMaxExportBatchSize = 512 + SpansReceivedCounter = "hypertrace.bsp.spans_received" + SpansDroppedCounter = "hypertrace.bsp.spans_dropped" +) + +// batchSpanProcessor is a SpanProcessor that batches asynchronously-received +// spans and sends them to a trace.Exporter when complete. +type batchSpanProcessor struct { + e sdktrace.SpanExporter + o sdktrace.BatchSpanProcessorOptions + + queue chan sdktrace.ReadOnlySpan + dropped uint32 + + batch []sdktrace.ReadOnlySpan + batchMutex sync.Mutex + timer *time.Timer + stopWait sync.WaitGroup + stopOnce sync.Once + stopCh chan struct{} + // Some metrics in here. + counters map[string]syncint64.Counter +} + +var _ sdktrace.SpanProcessor = (*batchSpanProcessor)(nil) + +// NewBatchSpanProcessor creates a new SpanProcessor that will send completed +// span batches to the exporter with the supplied options. +// +// If the exporter is nil, the span processor will preform no action. 
+func NewBatchSpanProcessor(exporter sdktrace.SpanExporter, options ...sdktrace.BatchSpanProcessorOption) sdktrace.SpanProcessor { + maxQueueSize := BatchSpanProcessorMaxQueueSize(DefaultMaxQueueSize) + maxExportBatchSize := BatchSpanProcessorMaxExportBatchSize(DefaultMaxExportBatchSize) + + if maxExportBatchSize > maxQueueSize { + if DefaultMaxExportBatchSize > maxQueueSize { + maxExportBatchSize = maxQueueSize + } else { + maxExportBatchSize = DefaultMaxExportBatchSize + } + } + + o := sdktrace.BatchSpanProcessorOptions{ + BatchTimeout: time.Duration(BatchSpanProcessorScheduleDelay(DefaultScheduleDelay)) * time.Millisecond, + ExportTimeout: time.Duration(BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond, + MaxQueueSize: maxQueueSize, + MaxExportBatchSize: maxExportBatchSize, + } + for _, opt := range options { + opt(&o) + } + + // Setup metrics + mp := metricglobal.MeterProvider() + meter := mp.Meter("go.opentelemetry.io/otel/sdk/trace", + metric.WithInstrumentationVersion(otel.Version())) + counters := make(map[string]syncint64.Counter) + + // Spans received by processor + spansReceivedCounter, err := meter.SyncInt64().Counter(SpansReceivedCounter) + if err != nil { + otel.Handle(err) + } + + counters[SpansReceivedCounter] = spansReceivedCounter + + // Spans Dropped by processor once the buffer is full. + spansDroppedCounter, err := meter.SyncInt64().Counter(SpansDroppedCounter) + if err != nil { + otel.Handle(err) + } + + counters[SpansDroppedCounter] = spansDroppedCounter + + bsp := &batchSpanProcessor{ + e: exporter, + o: o, + batch: make([]sdktrace.ReadOnlySpan, 0, o.MaxExportBatchSize), + timer: time.NewTimer(o.BatchTimeout), + queue: make(chan sdktrace.ReadOnlySpan, o.MaxQueueSize), + stopCh: make(chan struct{}), + counters: counters, + } + + bsp.stopWait.Add(1) + go func() { + defer bsp.stopWait.Done() + bsp.processQueue() + bsp.drainQueue() + }() + + return bsp +} + +// OnStart method does nothing. 
+func (bsp *batchSpanProcessor) OnStart(parent context.Context, s sdktrace.ReadWriteSpan) {} + +// OnEnd method enqueues a ReadOnlySpan for later processing. +func (bsp *batchSpanProcessor) OnEnd(s sdktrace.ReadOnlySpan) { + // Do not enqueue spans if we are just going to drop them. + if bsp.e == nil { + return + } + bsp.enqueue(s) +} + +// Shutdown flushes the queue and waits until all spans are processed. +// It only executes once. Subsequent call does nothing. +func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error { + var err error + bsp.stopOnce.Do(func() { + wait := make(chan struct{}) + go func() { + close(bsp.stopCh) + bsp.stopWait.Wait() + if bsp.e != nil { + if err := bsp.e.Shutdown(ctx); err != nil { + otel.Handle(err) + } + } + close(wait) + }() + // Wait until the wait group is done or the context is cancelled + select { + case <-wait: + case <-ctx.Done(): + err = ctx.Err() + } + }) + return err +} + +type forceFlushSpan struct { + sdktrace.ReadOnlySpan + flushed chan struct{} +} + +func (f forceFlushSpan) SpanContext() trace.SpanContext { + return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled}) +} + +// ForceFlush exports all ended spans that have not yet been exported. 
+func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { + var err error + if bsp.e != nil { + flushCh := make(chan struct{}) + if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) { + select { + case <-flushCh: + // Processed any items in queue prior to ForceFlush being called + case <-ctx.Done(): + return ctx.Err() + } + } + + wait := make(chan error) + go func() { + wait <- bsp.exportSpans(ctx) + close(wait) + }() + // Wait until the export is finished or the context is cancelled/timed out + select { + case err = <-wait: + case <-ctx.Done(): + err = ctx.Err() + } + } + return err +} + +// WithMaxQueueSize returns a BatchSpanProcessorOption that configures the +// maximum queue size allowed for a BatchSpanProcessor. +func WithMaxQueueSize(size int) sdktrace.BatchSpanProcessorOption { + return func(o *sdktrace.BatchSpanProcessorOptions) { + o.MaxQueueSize = size + } +} + +// WithMaxExportBatchSize returns a BatchSpanProcessorOption that configures +// the maximum export batch size allowed for a BatchSpanProcessor. +func WithMaxExportBatchSize(size int) sdktrace.BatchSpanProcessorOption { + return func(o *sdktrace.BatchSpanProcessorOptions) { + o.MaxExportBatchSize = size + } +} + +// WithBatchTimeout returns a BatchSpanProcessorOption that configures the +// maximum delay allowed for a BatchSpanProcessor before it will export any +// held span (whether the queue is full or not). +func WithBatchTimeout(delay time.Duration) sdktrace.BatchSpanProcessorOption { + return func(o *sdktrace.BatchSpanProcessorOptions) { + o.BatchTimeout = delay + } +} + +// WithExportTimeout returns a BatchSpanProcessorOption that configures the +// amount of time a BatchSpanProcessor waits for an exporter to export before +// abandoning the export. 
+func WithExportTimeout(timeout time.Duration) sdktrace.BatchSpanProcessorOption { + return func(o *sdktrace.BatchSpanProcessorOptions) { + o.ExportTimeout = timeout + } +} + +// WithBlocking returns a BatchSpanProcessorOption that configures a +// BatchSpanProcessor to wait for enqueue operations to succeed instead of +// dropping data when the queue is full. +func WithBlocking() sdktrace.BatchSpanProcessorOption { + return func(o *sdktrace.BatchSpanProcessorOptions) { + o.BlockOnQueueFull = true + } +} + +// exportSpans is a subroutine of processing and draining the queue. +func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { + bsp.timer.Reset(bsp.o.BatchTimeout) + + bsp.batchMutex.Lock() + defer bsp.batchMutex.Unlock() + + if bsp.o.ExportTimeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout) + defer cancel() + } + + if l := len(bsp.batch); l > 0 { + //global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped)) + err := bsp.e.ExportSpans(ctx, bsp.batch) + + // A new batch is always created after exporting, even if the batch failed to be exported. + // + // It is up to the exporter to implement any type of retry logic if a batch is failing + // to be exported, since it is specific to the protocol and backend being sent to. + bsp.batch = bsp.batch[:0] + + if err != nil { + return err + } + } + return nil +} + +// processQueue removes spans from the `queue` channel until processor +// is shut down. It calls the exporter in batches of up to MaxExportBatchSize +// waiting up to BatchTimeout to form a batch. 
+func (bsp *batchSpanProcessor) processQueue() { + defer bsp.timer.Stop() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { + select { + case <-bsp.stopCh: + return + case <-bsp.timer.C: + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } + case sd := <-bsp.queue: + if ffs, ok := sd.(forceFlushSpan); ok { + close(ffs.flushed) + continue + } + bsp.batchMutex.Lock() + bsp.batch = append(bsp.batch, sd) + shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize + bsp.batchMutex.Unlock() + if shouldExport { + if !bsp.timer.Stop() { + <-bsp.timer.C + } + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } + } + } + } +} + +// drainQueue awaits the any caller that had added to bsp.stopWait +// to finish the enqueue, then exports the final batch. +func (bsp *batchSpanProcessor) drainQueue() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { + select { + case sd := <-bsp.queue: + if sd == nil { + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } + return + } + + bsp.batchMutex.Lock() + bsp.batch = append(bsp.batch, sd) + shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize + bsp.batchMutex.Unlock() + + if shouldExport { + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } + } + default: + close(bsp.queue) + } + } +} + +func (bsp *batchSpanProcessor) enqueue(sd sdktrace.ReadOnlySpan) { + ctx := context.TODO() + if bsp.o.BlockOnQueueFull { + bsp.enqueueBlockOnQueueFull(ctx, sd) + } else { + bsp.enqueueDrop(ctx, sd) + } +} + +func recoverSendOnClosedChan() { + x := recover() + switch err := x.(type) { + case nil: + return + case runtime.Error: + if err.Error() == "send on closed channel" { + return + } + } + panic(x) +} + +func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd sdktrace.ReadOnlySpan) bool { + if !sd.SpanContext().IsSampled() { + return false + } + + // This ensures the bsp.queue<- below does 
not panic as the + // processor shuts down. + defer recoverSendOnClosedChan() + + select { + case <-bsp.stopCh: + return false + default: + } + + select { + case bsp.queue <- sd: + return true + case <-ctx.Done(): + return false + } +} + +func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd sdktrace.ReadOnlySpan) bool { + if !sd.SpanContext().IsSampled() { + // TODO: A metric for the non sampled ones? + return false + } + + // Count the span as received. + bsp.counters[SpansReceivedCounter].Add(ctx, 1) + + // This ensures the bsp.queue<- below does not panic as the + // processor shuts down. + defer recoverSendOnClosedChan() + + select { + case <-bsp.stopCh: + return false + default: + } + + select { + case bsp.queue <- sd: + return true + default: + atomic.AddUint32(&bsp.dropped, 1) + bsp.counters[SpansDroppedCounter].Add(ctx, 1) + } + return false +} + +// MarshalLog is the marshaling function used by the logging system to represent this exporter. +func (bsp *batchSpanProcessor) MarshalLog() interface{} { + return struct { + Type string + SpanExporter sdktrace.SpanExporter + Config sdktrace.BatchSpanProcessorOptions + }{ + Type: "BatchSpanProcessor", + SpanExporter: bsp.e, + Config: bsp.o, + } +} diff --git a/instrumentation/opentelemetry/batchspanprocessor/bspcreator.go b/instrumentation/opentelemetry/batchspanprocessor/bspcreator.go new file mode 100644 index 00000000..5d438f0a --- /dev/null +++ b/instrumentation/opentelemetry/batchspanprocessor/bspcreator.go @@ -0,0 +1,14 @@ +package batchspanprocessor + +import ( + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +func CreateBatchSpanProcessor(useModified bool, exporter sdktrace.SpanExporter, + options ...sdktrace.BatchSpanProcessorOption) sdktrace.SpanProcessor { + if useModified { + return NewBatchSpanProcessor(exporter, options...) + } else { + return sdktrace.NewBatchSpanProcessor(exporter, options...) 
+ } +} diff --git a/instrumentation/opentelemetry/batchspanprocessor/bspcreator_test.go b/instrumentation/opentelemetry/batchspanprocessor/bspcreator_test.go new file mode 100644 index 00000000..1dda510c --- /dev/null +++ b/instrumentation/opentelemetry/batchspanprocessor/bspcreator_test.go @@ -0,0 +1,25 @@ +package batchspanprocessor + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +var batchTimeout = time.Duration(200) * time.Millisecond + +func TestCreateBsp(t *testing.T) { + exporter, err := stdouttrace.New(stdouttrace.WithPrettyPrint()) + require.NoError(t, err) + bsp := CreateBatchSpanProcessor(true, exporter, + sdktrace.WithBatchTimeout(batchTimeout)) + assert.NotNil(t, bsp) + + bsp = CreateBatchSpanProcessor(false, exporter, + sdktrace.WithBatchTimeout(batchTimeout)) + assert.NotNil(t, bsp) +} diff --git a/instrumentation/opentelemetry/batchspanprocessor/env.go.original b/instrumentation/opentelemetry/batchspanprocessor/env.go.original new file mode 100644 index 00000000..5e94b8ae --- /dev/null +++ b/instrumentation/opentelemetry/batchspanprocessor/env.go.original @@ -0,0 +1,177 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package env // import "go.opentelemetry.io/otel/sdk/internal/env" + +import ( + "os" + "strconv" + + "go.opentelemetry.io/otel/internal/global" +) + +// Environment variable names. +const ( + // BatchSpanProcessorScheduleDelayKey is the delay interval between two + // consecutive exports (i.e. 5000). + BatchSpanProcessorScheduleDelayKey = "OTEL_BSP_SCHEDULE_DELAY" + // BatchSpanProcessorExportTimeoutKey is the maximum allowed time to + // export data (i.e. 3000). + BatchSpanProcessorExportTimeoutKey = "OTEL_BSP_EXPORT_TIMEOUT" + // BatchSpanProcessorMaxQueueSizeKey is the maximum queue size (i.e. 2048). + BatchSpanProcessorMaxQueueSizeKey = "OTEL_BSP_MAX_QUEUE_SIZE" + // BatchSpanProcessorMaxExportBatchSizeKey is the maximum batch size (i.e. + // 512). Note: it must be less than or equal to + // EnvBatchSpanProcessorMaxQueueSize. + BatchSpanProcessorMaxExportBatchSizeKey = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE" + + // AttributeValueLengthKey is the maximum allowed attribute value size. + AttributeValueLengthKey = "OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT" + + // AttributeCountKey is the maximum allowed span attribute count. + AttributeCountKey = "OTEL_ATTRIBUTE_COUNT_LIMIT" + + // SpanAttributeValueLengthKey is the maximum allowed attribute value size + // for a span. + SpanAttributeValueLengthKey = "OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT" + + // SpanAttributeCountKey is the maximum allowed span attribute count for a + // span. + SpanAttributeCountKey = "OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT" + + // SpanEventCountKey is the maximum allowed span event count. + SpanEventCountKey = "OTEL_SPAN_EVENT_COUNT_LIMIT" + + // SpanEventAttributeCountKey is the maximum allowed attribute per span + // event count. + SpanEventAttributeCountKey = "OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT" + + // SpanLinkCountKey is the maximum allowed span link count. + SpanLinkCountKey = "OTEL_SPAN_LINK_COUNT_LIMIT" + + // SpanLinkAttributeCountKey is the maximum allowed attribute per span + // link count. 
+ SpanLinkAttributeCountKey = "OTEL_LINK_ATTRIBUTE_COUNT_LIMIT" +) + +// firstInt returns the value of the first matching environment variable from +// keys. If the value is not an integer or no match is found, defaultValue is +// returned. +func firstInt(defaultValue int, keys ...string) int { + for _, key := range keys { + value, ok := os.LookupEnv(key) + if !ok { + continue + } + + intValue, err := strconv.Atoi(value) + if err != nil { + global.Info("Got invalid value, number value expected.", key, value) + return defaultValue + } + + return intValue + } + + return defaultValue +} + +// IntEnvOr returns the int value of the environment variable with name key if +// it exists and the value is an int. Otherwise, defaultValue is returned. +func IntEnvOr(key string, defaultValue int) int { + value, ok := os.LookupEnv(key) + if !ok { + return defaultValue + } + + intValue, err := strconv.Atoi(value) + if err != nil { + global.Info("Got invalid value, number value expected.", key, value) + return defaultValue + } + + return intValue +} + +// BatchSpanProcessorScheduleDelay returns the environment variable value for +// the OTEL_BSP_SCHEDULE_DELAY key if it exists, otherwise defaultValue is +// returned. +func BatchSpanProcessorScheduleDelay(defaultValue int) int { + return IntEnvOr(BatchSpanProcessorScheduleDelayKey, defaultValue) +} + +// BatchSpanProcessorExportTimeout returns the environment variable value for +// the OTEL_BSP_EXPORT_TIMEOUT key if it exists, otherwise defaultValue is +// returned. +func BatchSpanProcessorExportTimeout(defaultValue int) int { + return IntEnvOr(BatchSpanProcessorExportTimeoutKey, defaultValue) +} + +// BatchSpanProcessorMaxQueueSize returns the environment variable value for +// the OTEL_BSP_MAX_QUEUE_SIZE key if it exists, otherwise defaultValue is +// returned. 
+func BatchSpanProcessorMaxQueueSize(defaultValue int) int { + return IntEnvOr(BatchSpanProcessorMaxQueueSizeKey, defaultValue) +} + +// BatchSpanProcessorMaxExportBatchSize returns the environment variable value for +// the OTEL_BSP_MAX_EXPORT_BATCH_SIZE key if it exists, otherwise defaultValue +// is returned. +func BatchSpanProcessorMaxExportBatchSize(defaultValue int) int { + return IntEnvOr(BatchSpanProcessorMaxExportBatchSizeKey, defaultValue) +} + +// SpanAttributeValueLength returns the environment variable value for the +// OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT key if it exists. Otherwise, the +// environment variable value for OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT is +// returned or defaultValue if that is not set. +func SpanAttributeValueLength(defaultValue int) int { + return firstInt(defaultValue, SpanAttributeValueLengthKey, AttributeValueLengthKey) +} + +// SpanAttributeCount returns the environment variable value for the +// OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT key if it exists. Otherwise, the +// environment variable value for OTEL_ATTRIBUTE_COUNT_LIMIT is returned or +// defaultValue if that is not set. +func SpanAttributeCount(defaultValue int) int { + return firstInt(defaultValue, SpanAttributeCountKey, AttributeCountKey) +} + +// SpanEventCount returns the environment variable value for the +// OTEL_SPAN_EVENT_COUNT_LIMIT key if it exists, otherwise defaultValue is +// returned. +func SpanEventCount(defaultValue int) int { + return IntEnvOr(SpanEventCountKey, defaultValue) +} + +// SpanEventAttributeCount returns the environment variable value for the +// OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT key if it exists, otherwise defaultValue +// is returned. +func SpanEventAttributeCount(defaultValue int) int { + return IntEnvOr(SpanEventAttributeCountKey, defaultValue) +} + +// SpanLinkCount returns the environment variable value for the +// OTEL_SPAN_LINK_COUNT_LIMIT key if it exists, otherwise defaultValue is +// returned. 
+func SpanLinkCount(defaultValue int) int { + return IntEnvOr(SpanLinkCountKey, defaultValue) +} + +// SpanLinkAttributeCount returns the environment variable value for the +// OTEL_LINK_ATTRIBUTE_COUNT_LIMIT key if it exists, otherwise defaultValue is +// returned. +func SpanLinkAttributeCount(defaultValue int) int { + return IntEnvOr(SpanLinkAttributeCountKey, defaultValue) +} diff --git a/instrumentation/opentelemetry/batchspanprocessor/env.modified.go b/instrumentation/opentelemetry/batchspanprocessor/env.modified.go new file mode 100644 index 00000000..50a5906a --- /dev/null +++ b/instrumentation/opentelemetry/batchspanprocessor/env.modified.go @@ -0,0 +1,176 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batchspanprocessor // import "github.com/hypertrace/goagent/instrumentation/opentelemetry/batchspanprocessor" + +import ( + "log" + "os" + "strconv" +) + +// Environment variable names. +const ( + // BatchSpanProcessorScheduleDelayKey is the delay interval between two + // consecutive exports (i.e. 5000). + BatchSpanProcessorScheduleDelayKey = "OTEL_BSP_SCHEDULE_DELAY" + // BatchSpanProcessorExportTimeoutKey is the maximum allowed time to + // export data (i.e. 3000). + BatchSpanProcessorExportTimeoutKey = "OTEL_BSP_EXPORT_TIMEOUT" + // BatchSpanProcessorMaxQueueSizeKey is the maximum queue size (i.e. 2048). 
+ BatchSpanProcessorMaxQueueSizeKey = "OTEL_BSP_MAX_QUEUE_SIZE" + // BatchSpanProcessorMaxExportBatchSizeKey is the maximum batch size (i.e. + // 512). Note: it must be less than or equal to + // EnvBatchSpanProcessorMaxQueueSize. + BatchSpanProcessorMaxExportBatchSizeKey = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE" + + // AttributeValueLengthKey is the maximum allowed attribute value size. + AttributeValueLengthKey = "OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT" + + // AttributeCountKey is the maximum allowed span attribute count. + AttributeCountKey = "OTEL_ATTRIBUTE_COUNT_LIMIT" + + // SpanAttributeValueLengthKey is the maximum allowed attribute value size + // for a span. + SpanAttributeValueLengthKey = "OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT" + + // SpanAttributeCountKey is the maximum allowed span attribute count for a + // span. + SpanAttributeCountKey = "OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT" + + // SpanEventCountKey is the maximum allowed span event count. + SpanEventCountKey = "OTEL_SPAN_EVENT_COUNT_LIMIT" + + // SpanEventAttributeCountKey is the maximum allowed attribute per span + // event count. + SpanEventAttributeCountKey = "OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT" + + // SpanLinkCountKey is the maximum allowed span link count. + SpanLinkCountKey = "OTEL_SPAN_LINK_COUNT_LIMIT" + + // SpanLinkAttributeCountKey is the maximum allowed attribute per span + // link count. + SpanLinkAttributeCountKey = "OTEL_LINK_ATTRIBUTE_COUNT_LIMIT" +) + +// firstInt returns the value of the first matching environment variable from +// keys. If the value is not an integer or no match is found, defaultValue is +// returned. +func firstInt(defaultValue int, keys ...string) int { + for _, key := range keys { + value, ok := os.LookupEnv(key) + if !ok { + continue + } + + intValue, err := strconv.Atoi(value) + if err != nil { + log.Printf("Got invalid value, number value expected. 
key: %s, value: %s", key, value) + return defaultValue + } + + return intValue + } + + return defaultValue +} + +// IntEnvOr returns the int value of the environment variable with name key if +// it exists and the value is an int. Otherwise, defaultValue is returned. +func IntEnvOr(key string, defaultValue int) int { + value, ok := os.LookupEnv(key) + if !ok { + return defaultValue + } + + intValue, err := strconv.Atoi(value) + if err != nil { + log.Printf("Got invalid value, number value expected. key: %s, value: %s", key, value) + return defaultValue + } + + return intValue +} + +// BatchSpanProcessorScheduleDelay returns the environment variable value for +// the OTEL_BSP_SCHEDULE_DELAY key if it exists, otherwise defaultValue is +// returned. +func BatchSpanProcessorScheduleDelay(defaultValue int) int { + return IntEnvOr(BatchSpanProcessorScheduleDelayKey, defaultValue) +} + +// BatchSpanProcessorExportTimeout returns the environment variable value for +// the OTEL_BSP_EXPORT_TIMEOUT key if it exists, otherwise defaultValue is +// returned. +func BatchSpanProcessorExportTimeout(defaultValue int) int { + return IntEnvOr(BatchSpanProcessorExportTimeoutKey, defaultValue) +} + +// BatchSpanProcessorMaxQueueSize returns the environment variable value for +// the OTEL_BSP_MAX_QUEUE_SIZE key if it exists, otherwise defaultValue is +// returned. +func BatchSpanProcessorMaxQueueSize(defaultValue int) int { + return IntEnvOr(BatchSpanProcessorMaxQueueSizeKey, defaultValue) +} + +// BatchSpanProcessorMaxExportBatchSize returns the environment variable value for +// the OTEL_BSP_MAX_EXPORT_BATCH_SIZE key if it exists, otherwise defaultValue +// is returned. +func BatchSpanProcessorMaxExportBatchSize(defaultValue int) int { + return IntEnvOr(BatchSpanProcessorMaxExportBatchSizeKey, defaultValue) +} + +// SpanAttributeValueLength returns the environment variable value for the +// OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT key if it exists. 
Otherwise, the +// environment variable value for OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT is +// returned or defaultValue if that is not set. +func SpanAttributeValueLength(defaultValue int) int { + return firstInt(defaultValue, SpanAttributeValueLengthKey, AttributeValueLengthKey) +} + +// SpanAttributeCount returns the environment variable value for the +// OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT key if it exists. Otherwise, the +// environment variable value for OTEL_ATTRIBUTE_COUNT_LIMIT is returned or +// defaultValue if that is not set. +func SpanAttributeCount(defaultValue int) int { + return firstInt(defaultValue, SpanAttributeCountKey, AttributeCountKey) +} + +// SpanEventCount returns the environment variable value for the +// OTEL_SPAN_EVENT_COUNT_LIMIT key if it exists, otherwise defaultValue is +// returned. +func SpanEventCount(defaultValue int) int { + return IntEnvOr(SpanEventCountKey, defaultValue) +} + +// SpanEventAttributeCount returns the environment variable value for the +// OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT key if it exists, otherwise defaultValue +// is returned. +func SpanEventAttributeCount(defaultValue int) int { + return IntEnvOr(SpanEventAttributeCountKey, defaultValue) +} + +// SpanLinkCount returns the environment variable value for the +// OTEL_SPAN_LINK_COUNT_LIMIT key if it exists, otherwise defaultValue is +// returned. +func SpanLinkCount(defaultValue int) int { + return IntEnvOr(SpanLinkCountKey, defaultValue) +} + +// SpanLinkAttributeCount returns the environment variable value for the +// OTEL_LINK_ATTRIBUTE_COUNT_LIMIT key if it exists, otherwise defaultValue is +// returned. 
+func SpanLinkAttributeCount(defaultValue int) int { + return IntEnvOr(SpanLinkAttributeCountKey, defaultValue) +} diff --git a/instrumentation/opentelemetry/examples/init_additional_test.go b/instrumentation/opentelemetry/examples/init_additional_test.go index 5ed3e339..2c30791b 100644 --- a/instrumentation/opentelemetry/examples/init_additional_test.go +++ b/instrumentation/opentelemetry/examples/init_additional_test.go @@ -8,16 +8,16 @@ import ( "github.com/gorilla/mux" "github.com/hypertrace/goagent/config" hyperotel "github.com/hypertrace/goagent/instrumentation/opentelemetry" + modbsp "github.com/hypertrace/goagent/instrumentation/opentelemetry/batchspanprocessor" "github.com/hypertrace/goagent/instrumentation/opentelemetry/net/hyperhttp" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/sdk/resource" - "go.opentelemetry.io/otel/sdk/trace" sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" ) -var otherSpanExporter trace.SpanExporter = nil +var otherSpanExporter sdktrace.SpanExporter = nil func ExampleInitAsAdditional() { hyperSpanProcessor, shutdown := hyperotel.InitAsAdditional(config.Load()) @@ -31,7 +31,8 @@ func ExampleInitAsAdditional() { ), ) - otherSpanProcessor := sdktrace.NewBatchSpanProcessor( + otherSpanProcessor := modbsp.CreateBatchSpanProcessor( + true, // use modified bsp hyperotel.RemoveGoAgentAttrs(otherSpanExporter), ) diff --git a/instrumentation/opentelemetry/init.go b/instrumentation/opentelemetry/init.go index eeddebdf..e2ad2c4d 100644 --- a/instrumentation/opentelemetry/init.go +++ b/instrumentation/opentelemetry/init.go @@ -23,6 +23,7 @@ import ( config "github.com/hypertrace/agent-config/gen/go/v1" "go.opentelemetry.io/otel/attribute" + modbsp "github.com/hypertrace/goagent/instrumentation/opentelemetry/batchspanprocessor" "github.com/hypertrace/goagent/instrumentation/opentelemetry/internal/identifier" sdkconfig 
"github.com/hypertrace/goagent/sdk/config" "github.com/hypertrace/goagent/version" @@ -252,7 +253,10 @@ func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessor log.Fatal(err) } - sp := sdktrace.NewBatchSpanProcessor(exporter, sdktrace.WithBatchTimeout(batchTimeout)) + sp := modbsp.CreateBatchSpanProcessor( + cfg.GetTelemetry() != nil && cfg.GetTelemetry().GetMetricsEnabled().GetValue(), // metrics enabled + exporter, + sdktrace.WithBatchTimeout(batchTimeout)) if wrapper != nil { sp = &spanProcessorWithWrapper{wrapper, sp} } @@ -358,7 +362,10 @@ func RegisterServiceWithSpanProcessorWrapper(serviceName string, resourceAttribu log.Fatal(err) } - sp := sdktrace.NewBatchSpanProcessor(exporter, sdktrace.WithBatchTimeout(batchTimeout)) + sp := modbsp.CreateBatchSpanProcessor( + true, // ideally there should be no issue with using the modified bsp + exporter, + sdktrace.WithBatchTimeout(batchTimeout)) if wrapper != nil { sp = &spanProcessorWithWrapper{wrapper, sp} } diff --git a/instrumentation/opentelemetry/init_additional.go b/instrumentation/opentelemetry/init_additional.go index 762b9a63..ed55b9b8 100644 --- a/instrumentation/opentelemetry/init_additional.go +++ b/instrumentation/opentelemetry/init_additional.go @@ -6,6 +6,7 @@ import ( "strings" config "github.com/hypertrace/agent-config/gen/go/v1" + modbsp "github.com/hypertrace/goagent/instrumentation/opentelemetry/batchspanprocessor" sdkconfig "github.com/hypertrace/goagent/sdk/config" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/resource" @@ -42,13 +43,17 @@ func InitAsAdditional(cfg *config.AgentConfig) (trace.SpanProcessor, func()) { exporter = addResourceToSpans(exporter, resource) } - return trace.NewBatchSpanProcessor(exporter, trace.WithBatchTimeout(batchTimeout)), func() { - err := exporter.Shutdown(context.Background()) - if err != nil { - log.Printf("error while shutting down exporter: %v\n", err) + return modbsp.CreateBatchSpanProcessor( + cfg.GetTelemetry() 
!= nil && cfg.GetTelemetry().GetMetricsEnabled().GetValue(), // metrics enabled + exporter, + trace.WithBatchTimeout(batchTimeout)), + func() { + err := exporter.Shutdown(context.Background()) + if err != nil { + log.Printf("error while shutting down exporter: %v\n", err) + } + sdkconfig.ResetConfig() } - sdkconfig.ResetConfig() - } } type shieldResourceSpan struct { From 023d45eb92769c7648a58f73dbb5dae3dae0ddf5 Mon Sep 17 00:00:00 2001 From: Tim Mwangi Date: Mon, 30 Jan 2023 07:21:43 -0800 Subject: [PATCH 10/13] bsp sampled metrics --- .../batch_span_processor.modified.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.modified.go b/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.modified.go index b5a908b7..a96d8dfd 100644 --- a/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.modified.go +++ b/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.modified.go @@ -36,8 +36,9 @@ const ( DefaultScheduleDelay = 5000 DefaultExportTimeout = 30000 DefaultMaxExportBatchSize = 512 - SpansReceivedCounter = "hypertrace.bsp.spans_received" - SpansDroppedCounter = "hypertrace.bsp.spans_dropped" + SpansReceivedCounter = "hypertrace.agent.bsp.spans_received" + SpansDroppedCounter = "hypertrace.agent.bsp.spans_dropped" + SpansUnSampledCounter = "hypertrace.agent.bsp.spans_unsampled" ) // batchSpanProcessor is a SpanProcessor that batches asynchronously-received @@ -98,7 +99,6 @@ func NewBatchSpanProcessor(exporter sdktrace.SpanExporter, options ...sdktrace.B if err != nil { otel.Handle(err) } - counters[SpansReceivedCounter] = spansReceivedCounter // Spans Dropped by processor once the buffer is full. 
@@ -106,9 +106,15 @@ func NewBatchSpanProcessor(exporter sdktrace.SpanExporter, options ...sdktrace.B if err != nil { otel.Handle(err) } - counters[SpansDroppedCounter] = spansDroppedCounter + // Spans that are not sampled.(Useful to know when sampling is enabled) + spansUnSampledCounter, err := meter.SyncInt64().Counter(SpansUnSampledCounter) + if err != nil { + otel.Handle(err) + } + counters[SpansUnSampledCounter] = spansUnSampledCounter + bsp := &batchSpanProcessor{ e: exporter, o: o, @@ -262,6 +268,7 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { } if l := len(bsp.batch); l > 0 { + // TODO: replace this with an alternative? //global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped)) err := bsp.e.ExportSpans(ctx, bsp.batch) @@ -393,7 +400,8 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd s func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd sdktrace.ReadOnlySpan) bool { if !sd.SpanContext().IsSampled() { - // TODO: A metric for the non sampled ones? + // Count the span as unsampled + bsp.counters[SpansUnSampledCounter].Add(ctx, 1) return false } @@ -415,6 +423,7 @@ func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd sdktrace.Read return true default: atomic.AddUint32(&bsp.dropped, 1) + // Count the span as dropped. 
bsp.counters[SpansDroppedCounter].Add(ctx, 1) } return false From e5fedb73001460a2cd2eef812cd9e2954113c282 Mon Sep 17 00:00:00 2001 From: Tim Mwangi Date: Mon, 30 Jan 2023 07:40:44 -0800 Subject: [PATCH 11/13] fix lint issue --- instrumentation/opentelemetry/batchspanprocessor/bspcreator.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry/batchspanprocessor/bspcreator.go b/instrumentation/opentelemetry/batchspanprocessor/bspcreator.go index 5d438f0a..91af2984 100644 --- a/instrumentation/opentelemetry/batchspanprocessor/bspcreator.go +++ b/instrumentation/opentelemetry/batchspanprocessor/bspcreator.go @@ -8,7 +8,6 @@ func CreateBatchSpanProcessor(useModified bool, exporter sdktrace.SpanExporter, options ...sdktrace.BatchSpanProcessorOption) sdktrace.SpanProcessor { if useModified { return NewBatchSpanProcessor(exporter, options...) - } else { - return sdktrace.NewBatchSpanProcessor(exporter, options...) } + return sdktrace.NewBatchSpanProcessor(exporter, options...) } From 7fca49445e447f2c435b248aef0518a2c5033df9 Mon Sep 17 00:00:00 2001 From: Tim Mwangi Date: Mon, 30 Jan 2023 07:49:07 -0800 Subject: [PATCH 12/13] fix failing unit test --- instrumentation/opentelemetry/init_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/instrumentation/opentelemetry/init_test.go b/instrumentation/opentelemetry/init_test.go index bb9bc84b..3c419547 100644 --- a/instrumentation/opentelemetry/init_test.go +++ b/instrumentation/opentelemetry/init_test.go @@ -152,6 +152,8 @@ func TestMultipleTraceProviders(t *testing.T) { cfg.Reporting.Endpoint = config.String(srv.URL) cfg.Reporting.TraceReporterType = config.TraceReporterType_ZIPKIN cfg.Enabled = config.Bool(true) + // Disable metrics to only test trace provider. 
+ cfg.Telemetry.MetricsEnabled = config.Bool(false) // By doing this we make sure a batching isn't happening batchTimeout = time.Duration(10) * time.Second @@ -178,10 +180,9 @@ func TestMultipleTraceProviders(t *testing.T) { assert.Equal(t, 0, count) }) - // 2 requests for spans and 1 for metrics. - t.Run("test 3 requests after flush", func(t *testing.T) { + t.Run("test 2 requests after flush", func(t *testing.T) { shutdown() - assert.Equal(t, 3, count) + assert.Equal(t, 2, count) assert.Equal(t, 0, len(traceProviders)) }) } From fe1ac96e2426a9208900bfe5658dae3fce2daf34 Mon Sep 17 00:00:00 2001 From: Tim Mwangi Date: Mon, 30 Jan 2023 12:41:02 -0800 Subject: [PATCH 13/13] define logger for modified bsp --- .../batchspanprocessor/README.md | 2 ++ .../batch_span_processor.modified.go | 3 +- .../batchspanprocessor/env.modified.go | 5 ++-- .../batchspanprocessor/logger.go | 30 +++++++++++++++++++ 4 files changed, 35 insertions(+), 5 deletions(-) create mode 100644 instrumentation/opentelemetry/batchspanprocessor/logger.go diff --git a/instrumentation/opentelemetry/batchspanprocessor/README.md b/instrumentation/opentelemetry/batchspanprocessor/README.md index ef90a1de..e7ae6fb1 100644 --- a/instrumentation/opentelemetry/batchspanprocessor/README.md +++ b/instrumentation/opentelemetry/batchspanprocessor/README.md @@ -7,3 +7,5 @@ We have kept track of the original files modified so it's easier to figure out t The paths of the files modified: - [sdk/trace/batch_span_processor.go](https://github.com/open-telemetry/opentelemetry-go/blob/main/sdk/trace/batch_span_processor.go) - [sdk/internal/env/env.go](https://github.com/open-telemetry/opentelemetry-go/blob/main/sdk/internal/env/env.go) + +Since we cannot use [the internal logger](https://github.com/open-telemetry/opentelemetry-go/blob/main/internal/global/internal_logging.go), we have adapted it at [logger.go](instrumentation/opentelemetry/batchspanprocessor/logger.go).
diff --git a/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.modified.go b/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.modified.go index a96d8dfd..7590b460 100644 --- a/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.modified.go +++ b/instrumentation/opentelemetry/batchspanprocessor/batch_span_processor.modified.go @@ -268,8 +268,7 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { } if l := len(bsp.batch); l > 0 { - // TODO: replace this with an alternative? - //global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped)) + Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped)) err := bsp.e.ExportSpans(ctx, bsp.batch) // A new batch is always created after exporting, even if the batch failed to be exported. diff --git a/instrumentation/opentelemetry/batchspanprocessor/env.modified.go b/instrumentation/opentelemetry/batchspanprocessor/env.modified.go index 50a5906a..815184d2 100644 --- a/instrumentation/opentelemetry/batchspanprocessor/env.modified.go +++ b/instrumentation/opentelemetry/batchspanprocessor/env.modified.go @@ -15,7 +15,6 @@ package batchspanprocessor // import "github.com/hypertrace/goagent/instrumentation/opentelemetry/batchspanprocessor" import ( - "log" "os" "strconv" ) @@ -76,7 +75,7 @@ func firstInt(defaultValue int, keys ...string) int { intValue, err := strconv.Atoi(value) if err != nil { - log.Printf("Got invalid value, number value expected. key: %s, value: %s", key, value) + Info("Got invalid value, number value expected.", key, value) return defaultValue } @@ -96,7 +95,7 @@ func IntEnvOr(key string, defaultValue int) int { intValue, err := strconv.Atoi(value) if err != nil { - log.Printf("Got invalid value, number value expected. 
key: %s, value: %s", key, value) + Info("Got invalid value, number value expected.", key, value) return defaultValue } diff --git a/instrumentation/opentelemetry/batchspanprocessor/logger.go b/instrumentation/opentelemetry/batchspanprocessor/logger.go new file mode 100644 index 00000000..22e8c42c --- /dev/null +++ b/instrumentation/opentelemetry/batchspanprocessor/logger.go @@ -0,0 +1,30 @@ +package batchspanprocessor // import "github.com/hypertrace/goagent/instrumentation/opentelemetry/batchspanprocessor" + +// Adapted from go.opentelemetry.io/otel/internal/global#internal_logging.go +import ( + "log" + "os" + + "github.com/go-logr/logr" + "github.com/go-logr/stdr" +) + +// The logger uses stdr which is backed by the standard `log.Logger` +// interface. This logger will only show messages at the Error Level. +var logger logr.Logger = stdr.New(log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile)) + +// Info prints messages about the general state of the API or SDK. +// This should usually be less than 5 messages a minute. +func Info(msg string, keysAndValues ...interface{}) { + logger.V(1).Info(msg, keysAndValues...) +} + +// Error prints messages about exceptional states of the API or SDK. +func Error(err error, msg string, keysAndValues ...interface{}) { + logger.Error(err, msg, keysAndValues...) +} + +// Debug prints messages about all internal changes in the API or SDK. +func Debug(msg string, keysAndValues ...interface{}) { + logger.V(5).Info(msg, keysAndValues...) +}