From 05a89f30002fed92ad4c44807a91405fd7ec906d Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Sun, 19 Feb 2023 22:52:03 +0100 Subject: [PATCH 1/8] Use OTel to export Prom Metrics Signed-off-by: Giovanni Liva --- go.mod | 7 ++ go.sum | 15 +++ pkg/service/connect_metrics.go | 209 ++++++++++++++------------------- pkg/service/connect_service.go | 11 +- 4 files changed, 120 insertions(+), 122 deletions(-) diff --git a/go.mod b/go.mod index 040e807d9..337a28c81 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,10 @@ require ( github.com/stretchr/testify v1.8.1 github.com/xeipuuv/gojsonschema v1.2.0 github.com/zeebo/xxh3 v1.0.2 + go.opentelemetry.io/otel v1.13.0 + go.opentelemetry.io/otel/exporters/prometheus v0.36.0 + go.opentelemetry.io/otel/metric v0.36.0 + go.opentelemetry.io/otel/sdk/metric v0.36.0 go.uber.org/zap v1.24.0 golang.org/x/net v0.7.0 golang.org/x/sync v0.1.0 @@ -49,6 +53,7 @@ require ( github.com/emicklei/go-restful/v3 v3.10.1 // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.20.0 // indirect github.com/go-openapi/swag v0.22.3 // indirect @@ -95,6 +100,8 @@ require ( github.com/subosito/gotenv v1.4.2 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect + go.opentelemetry.io/otel/sdk v1.13.0 // indirect + go.opentelemetry.io/otel/trace v1.13.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.8.0 // indirect golang.org/x/oauth2 v0.4.0 // indirect diff --git a/go.sum b/go.sum index d9ae285cb..d193f75cb 100644 --- a/go.sum +++ b/go.sum @@ -130,8 +130,11 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-logr/zapr v1.2.3 h1:a9vnzlIBPQBBkeaR9IuMUfmVOrQlkoC4YfPoFkX3T7A= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY= @@ -428,6 +431,18 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= +go.opentelemetry.io/otel v1.13.0 h1:1ZAKnNQKwBBxFtww/GwxNUyTf0AxkZzrukO8MeXqe4Y= +go.opentelemetry.io/otel v1.13.0/go.mod h1:FH3RtdZCzRkJYFTCsAKDy9l/XYjMdNv6QrkFFB8DvVg= +go.opentelemetry.io/otel/exporters/prometheus v0.36.0 h1:EbfJRxojnpb+ux8IO79oKHXu9jsbWjd00cT0XmbP5gU= +go.opentelemetry.io/otel/exporters/prometheus v0.36.0/go.mod h1:gYHAjuEuMrtPXccEHyvYcQVC//c4QwgQcUq1/3mx7Ys= +go.opentelemetry.io/otel/metric v0.36.0 h1:t0lgGI+L68QWt3QtOIlqM9gXoxqxWLhZ3R/e5oOAY0Q= +go.opentelemetry.io/otel/metric v0.36.0/go.mod h1:wKVw57sd2HdSZAzyfOM9gTqqE8v7CbqWsYL6AyrH9qk= +go.opentelemetry.io/otel/sdk v1.13.0 h1:BHib5g8MvdqS65yo2vV1s6Le42Hm6rrw08qU6yz5JaM= +go.opentelemetry.io/otel/sdk v1.13.0/go.mod h1:YLKPx5+6Vx/o1TCUYYs+bpymtkmazOMT6zoRrC7AQ7I= +go.opentelemetry.io/otel/sdk/metric v0.36.0 h1:dEXpkkOAEcHiRiaZdvd63MouV+3bCtAB/bF3jlNKnr8= +go.opentelemetry.io/otel/sdk/metric v0.36.0/go.mod h1:Lv4HQQPSCSkhyBKzLNtE8YhTSdK4HCwNh3lh7CiR20s= +go.opentelemetry.io/otel/trace v1.13.0 h1:CBgRZ6ntv+Amuj1jDsMhZtlAPT6gbyIRdaIzFhfBSdY= +go.opentelemetry.io/otel/trace v1.13.0/go.mod h1:muCvmmO9KKpvuXSf3KKAXXB2ygNYHQ+ZfI5X08d3tds= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= diff --git a/pkg/service/connect_metrics.go b/pkg/service/connect_metrics.go index 3fa142a87..b4c0cb635 100644 --- a/pkg/service/connect_metrics.go +++ b/pkg/service/connect_metrics.go @@ -2,6 +2,7 @@ package service import ( "bufio" + "context" "errors" "fmt" "net" @@ -9,7 +10,12 @@ import ( "strconv" "time" - "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel/attribute" + otelprom "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/unit" + "go.opentelemetry.io/otel/sdk/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.13.0" ) var ( @@ -26,13 +32,15 @@ type HTTPReqProperties struct { } type Recorder interface { - // ObserveHTTPRequestDuration measures the duration of an HTTP request. - ObserveHTTPRequestDuration(props HTTPReqProperties, duration time.Duration) - // ObserveHTTPResponseSize measures the size of an HTTP response in bytes. - ObserveHTTPResponseSize(props HTTPReqProperties, sizeBytes int64) - // AddInflightRequests increments and decrements the number of inflight request being - // processed. - AddInflightRequests(props HTTPProperties, quantity int) + // OTelObserveHTTPRequestDuration measures the duration of an HTTP request. + OTelObserveHTTPRequestDuration(props HTTPReqProperties, duration time.Duration) + // OTelObserveHTTPResponseSize measures the size of an HTTP response in bytes. + OTelObserveHTTPResponseSize(props HTTPReqProperties, sizeBytes int64) + + // OTelInFlightRequestStart count the active requests. + OTelInFlightRequestStart(props HTTPReqProperties) + // OTelInFlightRequestEnd count the finished requests. + OTelInFlightRequestEnd(props HTTPReqProperties) } type Reporter interface { @@ -47,42 +55,39 @@ type HTTPProperties struct { ID string } -type MetricsRecorder struct { - httpRequestDurHistogram *prometheus.HistogramVec - httpResponseSizeHistogram *prometheus.HistogramVec - httpRequestsInflight *prometheus.GaugeVec +type OTelMetricsRecorder struct { + httpRequestDurHistogram instrument.Float64Histogram + httpResponseSizeHistogram instrument.Float64Histogram + httpRequestsInflight instrument.Int64UpDownCounter } -func (r MetricsRecorder) ObserveHTTPRequestDuration(p HTTPReqProperties, duration time.Duration, -) { - r.httpRequestDurHistogram.WithLabelValues(p.Service, p.ID, p.Method, p.Code).Observe(duration.Seconds()) +func (r OTelMetricsRecorder) setAttributes(p HTTPReqProperties) []attribute.KeyValue { + return []attribute.KeyValue{ + semconv.ServiceNameKey.String(p.Service), + semconv.HTTPURLKey.String(p.ID), + semconv.HTTPMethodKey.String(p.Method), + semconv.HTTPStatusCodeKey.String(p.Code), + } } -func (r MetricsRecorder) ObserveHTTPResponseSize(p HTTPReqProperties, sizeBytes int64) { - r.httpResponseSizeHistogram.WithLabelValues(p.Service, p.ID, p.Method, p.Code).Observe(float64(sizeBytes)) +func (r OTelMetricsRecorder) OTelObserveHTTPRequestDuration(p HTTPReqProperties, duration time.Duration) { + r.httpRequestDurHistogram.Record(context.TODO(), duration.Seconds(), r.setAttributes(p)...) } - -func (r MetricsRecorder) AddInflightRequests(p HTTPProperties, quantity int) { - r.httpRequestsInflight.WithLabelValues(p.Service, p.ID).Add(float64(quantity)) +func (r OTelMetricsRecorder) OTelObserveHTTPResponseSize(p HTTPReqProperties, sizeBytes int64) { + r.httpResponseSizeHistogram.Record(context.TODO(), float64(sizeBytes), r.setAttributes(p)...) } - -type prometheusConfig struct { - Prefix string - DurationBuckets []float64 - SizeBuckets []float64 - Registry prometheus.Registerer - HandlerIDLabel string - StatusCodeLabel string - MethodLabel string - ServiceLabel string +func (r OTelMetricsRecorder) OTelInFlightRequest_Start(p HTTPReqProperties) { + r.httpRequestsInflight.Add(context.TODO(), 1, r.setAttributes(p)...) +} +func (r OTelMetricsRecorder) OTelInFlightRequest_End(p HTTPReqProperties) { + r.httpRequestsInflight.Add(context.TODO(), -1, r.setAttributes(p)...) } type middlewareConfig struct { - Recorder Recorder - Service string - GroupedStatus bool - DisableMeasureSize bool - DisableMeasureInflight bool + Recorder Recorder + Service string + GroupedStatus bool + DisableMeasureSize bool } type Middleware struct { @@ -103,71 +108,36 @@ func New(cfg middlewareConfig) Middleware { return m } -func (c *prometheusConfig) defaults() { - if len(c.DurationBuckets) == 0 { - c.DurationBuckets = prometheus.DefBuckets - } - - if len(c.SizeBuckets) == 0 { - c.SizeBuckets = prometheus.ExponentialBuckets(100, 10, 8) - } - - if c.Registry == nil { - c.Registry = prometheus.DefaultRegisterer - } - - if c.HandlerIDLabel == "" { - c.HandlerIDLabel = "handler" - } - - if c.StatusCodeLabel == "" { - c.StatusCodeLabel = "code" - } - - if c.MethodLabel == "" { - c.MethodLabel = "method" - } - - if c.ServiceLabel == "" { - c.ServiceLabel = "service" +func NewOTelRecorder(exporter *otelprom.Exporter) (*OTelMetricsRecorder, error) { + provider := metric.NewMeterProvider(metric.WithReader(exporter)) + meter := provider.Meter("openfeature/flagd") + hduration, err := meter.Float64Histogram( + "request_duration_seconds", + instrument.WithDescription("The latency of the HTTP requests"), + ) + if err != nil { + return nil, err } -} - -func NewRecorder(cfg prometheusConfig) *MetricsRecorder { - cfg.defaults() - - r := &MetricsRecorder{ - httpRequestDurHistogram: prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: cfg.Prefix, - Subsystem: "http", - Name: "request_duration_seconds", - Help: "The latency of the HTTP requests.", - Buckets: cfg.DurationBuckets, - }, []string{cfg.ServiceLabel, cfg.HandlerIDLabel, cfg.MethodLabel, cfg.StatusCodeLabel}), - - httpResponseSizeHistogram: prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: cfg.Prefix, - Subsystem: "http", - Name: "response_size_bytes", - Help: "The size of the HTTP responses.", - Buckets: cfg.SizeBuckets, - }, []string{cfg.ServiceLabel, cfg.HandlerIDLabel, cfg.MethodLabel, cfg.StatusCodeLabel}), - - httpRequestsInflight: prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: cfg.Prefix, - Subsystem: "http", - Name: "requests_inflight", - Help: "The number of inflight requests being handled at the same time.", - }, []string{cfg.ServiceLabel, cfg.HandlerIDLabel}), + hsize, err := meter.Float64Histogram( + "response_size_bytes", + instrument.WithDescription("The size of the HTTP responses"), + instrument.WithUnit(unit.Bytes), + ) + if err != nil { + return nil, err } - - cfg.Registry.MustRegister( - r.httpRequestDurHistogram, - r.httpResponseSizeHistogram, - r.httpRequestsInflight, + reqCounter, err := meter.Int64UpDownCounter( + "requests_inflight", + instrument.WithDescription("The number of inflight requests being handled at the same time"), ) - - return r + if err != nil { + return nil, err + } + return &OTelMetricsRecorder{ + httpRequestDurHistogram: hduration, + httpResponseSizeHistogram: hsize, + httpRequestsInflight: reqCounter, + }, nil } func (m Middleware) Measure(handlerID string, reporter Reporter, next func()) { @@ -178,32 +148,35 @@ func (m Middleware) Measure(handlerID string, reporter Reporter, next func()) { hid = reporter.URLPath() } + // If we need to group the status code, it uses the + // first number of the status code because is the least + // required identification way. + var code string + if m.cfg.GroupedStatus { + code = fmt.Sprintf("%dxx", reporter.StatusCode()/100) + } else { + code = strconv.Itoa(reporter.StatusCode()) + } + props := HTTPReqProperties{ + Service: m.cfg.Service, + ID: hid, + Method: reporter.Method(), + Code: code, + } + + m.cfg.Recorder.OTelInFlightRequestStart(props) + defer m.cfg.Recorder.OTelInFlightRequestEnd(props) + // Start the timer and when finishing measure the duration. start := time.Now() defer func() { duration := time.Since(start) - // If we need to group the status code, it uses the - // first number of the status code because is the least - // required identification way. - var code string - if m.cfg.GroupedStatus { - code = fmt.Sprintf("%dxx", reporter.StatusCode()/100) - } else { - code = strconv.Itoa(reporter.StatusCode()) - } - - props := HTTPReqProperties{ - Service: m.cfg.Service, - ID: hid, - Method: reporter.Method(), - Code: code, - } - m.cfg.Recorder.ObserveHTTPRequestDuration(props, duration) + m.cfg.Recorder.OTelObserveHTTPRequestDuration(props, duration) // Measure size of response if required. if !m.cfg.DisableMeasureSize { - m.cfg.Recorder.ObserveHTTPResponseSize(props, reporter.BytesWritten()) + m.cfg.Recorder.OTelObserveHTTPResponseSize(props, reporter.BytesWritten()) } }() @@ -229,12 +202,6 @@ func Handler(handlerID string, m Middleware, h http.Handler) http.Handler { }) } -func HandlerProvider(handlerID string, m Middleware) func(http.Handler) http.Handler { - return func(next http.Handler) http.Handler { - return Handler(handlerID, m, next) - } -} - type stdReporter struct { w *responseWriterInterceptor r *http.Request diff --git a/pkg/service/connect_service.go b/pkg/service/connect_service.go index fa92a7e85..0819df30d 100644 --- a/pkg/service/connect_service.go +++ b/pkg/service/connect_service.go @@ -19,6 +19,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/cors" "github.com/rs/xid" + "go.opentelemetry.io/otel/exporters/prometheus" "go.uber.org/zap" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" @@ -102,8 +103,16 @@ func (s *ConnectService) setupServer(svcConf Configuration) (net.Listener, error } path, handler := schemaConnectV1.NewServiceHandler(s) mux.Handle(path, handler) + exporter, err := prometheus.New() + if err != nil { + return nil, err + } + metricRecorder, err := NewOTelRecorder(exporter) + if err != nil { + return nil, err + } mdlw := New(middlewareConfig{ - Recorder: NewRecorder(prometheusConfig{}), + Recorder: metricRecorder, }) h := Handler("", mdlw, mux) go func() { From ef9d793ba39b6ded50d59f07a9516bd6a7192aa2 Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Mon, 20 Feb 2023 00:12:07 +0100 Subject: [PATCH 2/8] Refactor and add some tests Signed-off-by: Giovanni Liva --- pkg/service/connect_metrics.go | 45 ++++++----- pkg/service/connect_metrics_test.go | 119 ++++++++++++++++++++++++++++ pkg/service/connect_service.go | 9 +-- 3 files changed, 150 insertions(+), 23 deletions(-) create mode 100644 pkg/service/connect_metrics_test.go diff --git a/pkg/service/connect_metrics.go b/pkg/service/connect_metrics.go index b4c0cb635..9e244a196 100644 --- a/pkg/service/connect_metrics.go +++ b/pkg/service/connect_metrics.go @@ -5,13 +5,14 @@ import ( "context" "errors" "fmt" + "log" "net" "net/http" "strconv" "time" + "github.com/open-feature/flagd/pkg/logger" "go.opentelemetry.io/otel/attribute" - otelprom "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/metric/instrument" "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/metric" @@ -73,18 +74,23 @@ func (r OTelMetricsRecorder) setAttributes(p HTTPReqProperties) []attribute.KeyV func (r OTelMetricsRecorder) OTelObserveHTTPRequestDuration(p HTTPReqProperties, duration time.Duration) { r.httpRequestDurHistogram.Record(context.TODO(), duration.Seconds(), r.setAttributes(p)...) } + func (r OTelMetricsRecorder) OTelObserveHTTPResponseSize(p HTTPReqProperties, sizeBytes int64) { r.httpResponseSizeHistogram.Record(context.TODO(), float64(sizeBytes), r.setAttributes(p)...) } -func (r OTelMetricsRecorder) OTelInFlightRequest_Start(p HTTPReqProperties) { + +func (r OTelMetricsRecorder) OTelInFlightRequestStart(p HTTPReqProperties) { r.httpRequestsInflight.Add(context.TODO(), 1, r.setAttributes(p)...) } -func (r OTelMetricsRecorder) OTelInFlightRequest_End(p HTTPReqProperties) { + +func (r OTelMetricsRecorder) OTelInFlightRequestEnd(p HTTPReqProperties) { r.httpRequestsInflight.Add(context.TODO(), -1, r.setAttributes(p)...) } type middlewareConfig struct { - Recorder Recorder + recorder Recorder + MetricReader metric.Reader + Logger *logger.Logger Service string GroupedStatus bool DisableMeasureSize bool @@ -94,23 +100,26 @@ type Middleware struct { cfg middlewareConfig } -func (c *middlewareConfig) defaults() { - if c.Recorder == nil { - panic("recorder is required") - } -} - func New(cfg middlewareConfig) Middleware { cfg.defaults() - m := Middleware{cfg: cfg} - return m } -func NewOTelRecorder(exporter *otelprom.Exporter) (*OTelMetricsRecorder, error) { +func (cfg *middlewareConfig) defaults() { + var err error + if cfg.Logger == nil { + log.Fatal("Missing Logger") + } + cfg.recorder, err = cfg.newOTelRecorder(cfg.MetricReader) + if err != nil { + cfg.Logger.Warn(fmt.Sprintf("got error while setting up OpenTelemetry metric exporter: %v", err)) + } +} + +func (cfg *middlewareConfig) newOTelRecorder(exporter metric.Reader) (*OTelMetricsRecorder, error) { provider := metric.NewMeterProvider(metric.WithReader(exporter)) - meter := provider.Meter("openfeature/flagd") + meter := provider.Meter(cfg.Service) hduration, err := meter.Float64Histogram( "request_duration_seconds", instrument.WithDescription("The latency of the HTTP requests"), @@ -164,19 +173,19 @@ func (m Middleware) Measure(handlerID string, reporter Reporter, next func()) { Code: code, } - m.cfg.Recorder.OTelInFlightRequestStart(props) - defer m.cfg.Recorder.OTelInFlightRequestEnd(props) + m.cfg.recorder.OTelInFlightRequestStart(props) + defer m.cfg.recorder.OTelInFlightRequestEnd(props) // Start the timer and when finishing measure the duration. start := time.Now() defer func() { duration := time.Since(start) - m.cfg.Recorder.OTelObserveHTTPRequestDuration(props, duration) + m.cfg.recorder.OTelObserveHTTPRequestDuration(props, duration) // Measure size of response if required. if !m.cfg.DisableMeasureSize { - m.cfg.Recorder.OTelObserveHTTPResponseSize(props, reporter.BytesWritten()) + m.cfg.recorder.OTelObserveHTTPResponseSize(props, reporter.BytesWritten()) } }() diff --git a/pkg/service/connect_metrics_test.go b/pkg/service/connect_metrics_test.go new file mode 100644 index 000000000..1123f8608 --- /dev/null +++ b/pkg/service/connect_metrics_test.go @@ -0,0 +1,119 @@ +package service + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "reflect" + "testing" + + "github.com/open-feature/flagd/pkg/logger" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.13.0" + "go.uber.org/zap/zapcore" +) + +func TestSetAttributes(t *testing.T) { + tests := []struct { + name string + req HTTPReqProperties + want []attribute.KeyValue + }{ + { + name: "empty attributes", + req: HTTPReqProperties{ + Service: "", + ID: "", + Method: "", + Code: "", + }, + want: []attribute.KeyValue{ + semconv.ServiceNameKey.String(""), + semconv.HTTPURLKey.String(""), + semconv.HTTPMethodKey.String(""), + semconv.HTTPStatusCodeKey.String(""), + }, + }, + { + name: "some values", + req: HTTPReqProperties{ + Service: "myService", + ID: "#123", + Method: "POST", + Code: "300", + }, + want: []attribute.KeyValue{ + semconv.ServiceNameKey.String("myService"), + semconv.HTTPURLKey.String("#123"), + semconv.HTTPMethodKey.String("POST"), + semconv.HTTPStatusCodeKey.String("300"), + }, + }, + { + name: "special chars", + req: HTTPReqProperties{ + Service: "!@#$%^&*()_+|}{[];',./<>", + ID: "", + Method: "", + Code: "", + }, + want: []attribute.KeyValue{ + semconv.ServiceNameKey.String("!@#$%^&*()_+|}{[];',./<>"), + semconv.HTTPURLKey.String(""), + semconv.HTTPMethodKey.String(""), + semconv.HTTPStatusCodeKey.String(""), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rec := OTelMetricsRecorder{} + res := rec.setAttributes(tt.req) + if len(res) != 4 { + t.Errorf("OTelMetricsRecorder.setAttributes() must provide 4 attributes") + } + for i := 0; i < 4; i++ { + if !reflect.DeepEqual(res[i], tt.want[i]) { + t.Errorf("attribute %d = %v, want %v", i, res[i], tt.want[i]) + } + } + }) + } +} + +func TestMiddleware(t *testing.T) { + const svcName = "mySvc" + exp := metric.NewManualReader() + l, _ := logger.NewZapLogger(zapcore.DebugLevel, "") + m := New(middlewareConfig{ + MetricReader: exp, + Service: svcName, + Logger: logger.NewLogger(l, true), + }) + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte("answer")) + }) + svr := httptest.NewServer(Handler("id", m, handler)) + defer svr.Close() + resp, err := http.Get(svr.URL) + if err != nil { + t.Errorf("Got %v", err) + } + _, _ = io.ReadAll(resp.Body) + data, err := exp.Collect(context.TODO()) + if err != nil { + t.Errorf("Got %v", err) + } + if len(data.ScopeMetrics) != 1 { + t.Errorf("A single scope is expected, got %d", len(data.ScopeMetrics)) + } + scopeMetrics := data.ScopeMetrics[0] + if !reflect.DeepEqual(scopeMetrics.Scope.Name, svcName) { + t.Errorf("Scope name %s, want %s", scopeMetrics.Scope.Name, svcName) + } + if len(scopeMetrics.Metrics) != 3 { + t.Errorf("Expected 3 metrics, got %d", len(scopeMetrics.Metrics)) + } +} diff --git a/pkg/service/connect_service.go b/pkg/service/connect_service.go index 0819df30d..87c6d6267 100644 --- a/pkg/service/connect_service.go +++ b/pkg/service/connect_service.go @@ -107,12 +107,11 @@ func (s *ConnectService) setupServer(svcConf Configuration) (net.Listener, error if err != nil { return nil, err } - metricRecorder, err := NewOTelRecorder(exporter) - if err != nil { - return nil, err - } + mdlw := New(middlewareConfig{ - Recorder: metricRecorder, + Service: "openfeature/flagd", + MetricReader: exporter, + Logger: s.Logger, }) h := Handler("", mdlw, mux) go func() { From 67db19dc7f15669d75a773ad73d7f28b61329222 Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Mon, 20 Feb 2023 00:39:40 +0100 Subject: [PATCH 3/8] polishing Signed-off-by: Giovanni Liva --- pkg/service/connect_metrics.go | 27 +++++++++------------------ pkg/service/connect_metrics_test.go | 16 ++++++++++++++++ 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/pkg/service/connect_metrics.go b/pkg/service/connect_metrics.go index 9e244a196..293f089c6 100644 --- a/pkg/service/connect_metrics.go +++ b/pkg/service/connect_metrics.go @@ -107,46 +107,37 @@ func New(cfg middlewareConfig) Middleware { } func (cfg *middlewareConfig) defaults() { - var err error if cfg.Logger == nil { log.Fatal("Missing Logger") } - cfg.recorder, err = cfg.newOTelRecorder(cfg.MetricReader) - if err != nil { - cfg.Logger.Warn(fmt.Sprintf("got error while setting up OpenTelemetry metric exporter: %v", err)) + if cfg.MetricReader == nil { + log.Fatal("Missing OpenTelemetry MetricReader/Exporter") } + cfg.recorder = cfg.newOTelRecorder(cfg.MetricReader) } -func (cfg *middlewareConfig) newOTelRecorder(exporter metric.Reader) (*OTelMetricsRecorder, error) { +func (cfg *middlewareConfig) newOTelRecorder(exporter metric.Reader) *OTelMetricsRecorder { provider := metric.NewMeterProvider(metric.WithReader(exporter)) meter := provider.Meter(cfg.Service) - hduration, err := meter.Float64Histogram( + // we can ignore errors from OpenTelemetry since they could occur if we select the wrong aggregator + hduration, _ := meter.Float64Histogram( "request_duration_seconds", instrument.WithDescription("The latency of the HTTP requests"), ) - if err != nil { - return nil, err - } - hsize, err := meter.Float64Histogram( + hsize, _ := meter.Float64Histogram( "response_size_bytes", instrument.WithDescription("The size of the HTTP responses"), instrument.WithUnit(unit.Bytes), ) - if err != nil { - return nil, err - } - reqCounter, err := meter.Int64UpDownCounter( + reqCounter, _ := meter.Int64UpDownCounter( "requests_inflight", instrument.WithDescription("The number of inflight requests being handled at the same time"), ) - if err != nil { - return nil, err - } return &OTelMetricsRecorder{ httpRequestDurHistogram: hduration, httpResponseSizeHistogram: hsize, httpRequestsInflight: reqCounter, - }, nil + } } func (m Middleware) Measure(handlerID string, reporter Reporter, next func()) { diff --git a/pkg/service/connect_metrics_test.go b/pkg/service/connect_metrics_test.go index 1123f8608..633016850 100644 --- a/pkg/service/connect_metrics_test.go +++ b/pkg/service/connect_metrics_test.go @@ -117,3 +117,19 @@ func TestMiddleware(t *testing.T) { t.Errorf("Expected 3 metrics, got %d", len(scopeMetrics.Metrics)) } } + +func TestNew_AutowireOTel(t *testing.T) { + l, _ := logger.NewZapLogger(zapcore.DebugLevel, "") + log := logger.NewLogger(l, true) + exp := metric.NewManualReader() + mdw := New(middlewareConfig{ + MetricReader: exp, + Logger: log, + Service: "mySvc", + GroupedStatus: false, + DisableMeasureSize: false, + }) + if mdw.cfg.recorder == nil { + t.Errorf("Expected OpenTelemetry to be configured, got nil") + } +} From 86946b18121232d486f76dff25a9133f03c3dcb4 Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Mon, 20 Feb 2023 18:46:29 +0100 Subject: [PATCH 4/8] review Signed-off-by: Giovanni Liva --- pkg/service/connect_metrics.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/service/connect_metrics.go b/pkg/service/connect_metrics.go index 293f089c6..5c4b4392a 100644 --- a/pkg/service/connect_metrics.go +++ b/pkg/service/connect_metrics.go @@ -108,10 +108,10 @@ func New(cfg middlewareConfig) Middleware { func (cfg *middlewareConfig) defaults() { if cfg.Logger == nil { - log.Fatal("Missing Logger") + log.Fatal("missing logger") } if cfg.MetricReader == nil { - log.Fatal("Missing OpenTelemetry MetricReader/Exporter") + log.Fatal("missing MetricReader/Exporter") } cfg.recorder = cfg.newOTelRecorder(cfg.MetricReader) } From e88b18072dbd6b620b0becde37c215a07b3aa961 Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Thu, 23 Feb 2023 13:22:56 +0100 Subject: [PATCH 5/8] adjust bucket size and names Signed-off-by: Giovanni Liva --- pkg/service/connect_metrics.go | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/pkg/service/connect_metrics.go b/pkg/service/connect_metrics.go index 5c4b4392a..e72dd7565 100644 --- a/pkg/service/connect_metrics.go +++ b/pkg/service/connect_metrics.go @@ -5,6 +5,9 @@ import ( "context" "errors" "fmt" + "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "log" "net" "net/http" @@ -116,21 +119,43 @@ func (cfg *middlewareConfig) defaults() { cfg.recorder = cfg.newOTelRecorder(cfg.MetricReader) } +func (cfg *middlewareConfig) getDurationView(name string, bucket []float64) metric.View { + scope := instrumentation.Scope{ + Name: cfg.Service, + } + return metric.NewView( + metric.Instrument{ + Name: name, + Scope: scope, + }, + metric.Stream{Aggregation: aggregation.ExplicitBucketHistogram{ + Boundaries: bucket, + }}, + ) +} + func (cfg *middlewareConfig) newOTelRecorder(exporter metric.Reader) *OTelMetricsRecorder { - provider := metric.NewMeterProvider(metric.WithReader(exporter)) + const requestDurationName = "http_request_duration_seconds" + const responseSizeName = "http_response_size_bytes" + + provider := metric.NewMeterProvider( + metric.WithReader(exporter), + metric.WithView(cfg.getDurationView(requestDurationName, prometheus.DefBuckets)), + metric.WithView(cfg.getDurationView(responseSizeName, prometheus.ExponentialBuckets(100, 10, 8))), + ) meter := provider.Meter(cfg.Service) // we can ignore errors from OpenTelemetry since they could occur if we select the wrong aggregator hduration, _ := meter.Float64Histogram( - "request_duration_seconds", + requestDurationName, instrument.WithDescription("The latency of the HTTP requests"), ) hsize, _ := meter.Float64Histogram( - "response_size_bytes", + responseSizeName, instrument.WithDescription("The size of the HTTP responses"), instrument.WithUnit(unit.Bytes), ) reqCounter, _ := meter.Int64UpDownCounter( - "requests_inflight", + "http_requests_inflight", instrument.WithDescription("The number of inflight requests being handled at the same time"), ) return &OTelMetricsRecorder{ From 6cb8925c336e3bbdb011fc8a39581ad83ae98e12 Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Thu, 23 Feb 2023 13:24:33 +0100 Subject: [PATCH 6/8] add comments Signed-off-by: Giovanni Liva --- pkg/service/connect_metrics.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/service/connect_metrics.go b/pkg/service/connect_metrics.go index e72dd7565..52daaa764 100644 --- a/pkg/service/connect_metrics.go +++ b/pkg/service/connect_metrics.go @@ -120,13 +120,13 @@ func (cfg *middlewareConfig) defaults() { } func (cfg *middlewareConfig) getDurationView(name string, bucket []float64) metric.View { - scope := instrumentation.Scope{ - Name: cfg.Service, - } return metric.NewView( metric.Instrument{ - Name: name, - Scope: scope, + // we change aggregation only for instruments with this name and scope + Name: name, + Scope: instrumentation.Scope{ + Name: cfg.Service, + }, }, metric.Stream{Aggregation: aggregation.ExplicitBucketHistogram{ Boundaries: bucket, @@ -138,6 +138,7 @@ func (cfg *middlewareConfig) newOTelRecorder(exporter metric.Reader) *OTelMetric const requestDurationName = "http_request_duration_seconds" const responseSizeName = "http_response_size_bytes" + // create a metric provider with custom bucket size for histograms provider := metric.NewMeterProvider( metric.WithReader(exporter), metric.WithView(cfg.getDurationView(requestDurationName, prometheus.DefBuckets)), From 69a4506c86ae609f5897d54b65cc40e5c85f39c2 Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Thu, 23 Feb 2023 13:34:23 +0100 Subject: [PATCH 7/8] fix import order Signed-off-by: Giovanni Liva --- pkg/service/connect_metrics.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/service/connect_metrics.go b/pkg/service/connect_metrics.go index 52daaa764..0176ec392 100644 --- a/pkg/service/connect_metrics.go +++ b/pkg/service/connect_metrics.go @@ -5,9 +5,6 @@ import ( "context" "errors" "fmt" - "github.com/prometheus/client_golang/prometheus" - "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/metric/aggregation" "log" "net" "net/http" @@ -15,10 +12,14 @@ import ( "time" "github.com/open-feature/flagd/pkg/logger" + "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric/instrument" "go.opentelemetry.io/otel/metric/unit" + "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + semconv "go.opentelemetry.io/otel/semconv/v1.13.0" ) From 3a1f43d52434cd573b1bf3d7954ff77b09d14e74 Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Mon, 27 Feb 2023 14:34:55 -0500 Subject: [PATCH 8/8] fixup: bind metrics in separate func Signed-off-by: Todd Baert --- pkg/service/connect_service.go | 57 ++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/pkg/service/connect_service.go b/pkg/service/connect_service.go index 87c6d6267..f35b04521 100644 --- a/pkg/service/connect_service.go +++ b/pkg/service/connect_service.go @@ -114,33 +114,8 @@ func (s *ConnectService) setupServer(svcConf Configuration) (net.Listener, error Logger: s.Logger, }) h := Handler("", mdlw, mux) - go func() { - s.Logger.Info(fmt.Sprintf("metrics and probes listening at %d", s.ConnectServiceConfiguration.MetricsPort)) - server := &http.Server{ - Addr: fmt.Sprintf(":%d", s.ConnectServiceConfiguration.MetricsPort), - ReadHeaderTimeout: 3 * time.Second, - } - server.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - switch r.URL.Path { - case "/healthz": - w.WriteHeader(http.StatusOK) - case "/readyz": - if svcConf.ReadinessProbe() { - w.WriteHeader(http.StatusOK) - } else { - w.WriteHeader(http.StatusPreconditionFailed) - } - case "/metrics": - promhttp.Handler().ServeHTTP(w, r) - default: - w.WriteHeader(http.StatusNotFound) - } - }) - err := server.ListenAndServe() - if err != nil { - panic(err) - } - }() + + go bindMetrics(s, svcConf) if s.ConnectServiceConfiguration.ServerCertPath != "" && s.ConnectServiceConfiguration.ServerKeyPath != "" { handler = s.newCORS().Handler(h) @@ -392,6 +367,34 @@ func (s *ConnectService) newCORS() *cors.Cors { }) } +func bindMetrics(s *ConnectService, svcConf Configuration) { + s.Logger.Info(fmt.Sprintf("metrics and probes listening at %d", s.ConnectServiceConfiguration.MetricsPort)) + server := &http.Server{ + Addr: fmt.Sprintf(":%d", s.ConnectServiceConfiguration.MetricsPort), + ReadHeaderTimeout: 3 * time.Second, + } + server.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/healthz": + w.WriteHeader(http.StatusOK) + case "/readyz": + if svcConf.ReadinessProbe() { + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusPreconditionFailed) + } + case "/metrics": + promhttp.Handler().ServeHTTP(w, r) + default: + w.WriteHeader(http.StatusNotFound) + } + }) + err := server.ListenAndServe() + if err != nil { + panic(err) + } +} + func errFormat(err error) error { switch err.Error() { case model.FlagNotFoundErrorCode: