Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ require (
github.com/stretchr/testify v1.8.1
github.com/xeipuuv/gojsonschema v1.2.0
github.com/zeebo/xxh3 v1.0.2
go.opentelemetry.io/otel v1.13.0
go.opentelemetry.io/otel/exporters/prometheus v0.36.0
go.opentelemetry.io/otel/metric v0.36.0
go.opentelemetry.io/otel/sdk/metric v0.36.0
go.uber.org/zap v1.24.0
golang.org/x/net v0.7.0
golang.org/x/sync v0.1.0
Expand All @@ -49,6 +53,7 @@ require (
github.com/emicklei/go-restful/v3 v3.10.1 // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
Expand Down Expand Up @@ -95,6 +100,8 @@ require (
github.com/subosito/gotenv v1.4.2 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
go.opentelemetry.io/otel/sdk v1.13.0 // indirect
go.opentelemetry.io/otel/trace v1.13.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/oauth2 v0.4.0 // indirect
Expand Down
15 changes: 15 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,11 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-logr/zapr v1.2.3 h1:a9vnzlIBPQBBkeaR9IuMUfmVOrQlkoC4YfPoFkX3T7A=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY=
Expand Down Expand Up @@ -428,6 +431,18 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opentelemetry.io/otel v1.13.0 h1:1ZAKnNQKwBBxFtww/GwxNUyTf0AxkZzrukO8MeXqe4Y=
go.opentelemetry.io/otel v1.13.0/go.mod h1:FH3RtdZCzRkJYFTCsAKDy9l/XYjMdNv6QrkFFB8DvVg=
go.opentelemetry.io/otel/exporters/prometheus v0.36.0 h1:EbfJRxojnpb+ux8IO79oKHXu9jsbWjd00cT0XmbP5gU=
go.opentelemetry.io/otel/exporters/prometheus v0.36.0/go.mod h1:gYHAjuEuMrtPXccEHyvYcQVC//c4QwgQcUq1/3mx7Ys=
go.opentelemetry.io/otel/metric v0.36.0 h1:t0lgGI+L68QWt3QtOIlqM9gXoxqxWLhZ3R/e5oOAY0Q=
go.opentelemetry.io/otel/metric v0.36.0/go.mod h1:wKVw57sd2HdSZAzyfOM9gTqqE8v7CbqWsYL6AyrH9qk=
go.opentelemetry.io/otel/sdk v1.13.0 h1:BHib5g8MvdqS65yo2vV1s6Le42Hm6rrw08qU6yz5JaM=
go.opentelemetry.io/otel/sdk v1.13.0/go.mod h1:YLKPx5+6Vx/o1TCUYYs+bpymtkmazOMT6zoRrC7AQ7I=
go.opentelemetry.io/otel/sdk/metric v0.36.0 h1:dEXpkkOAEcHiRiaZdvd63MouV+3bCtAB/bF3jlNKnr8=
go.opentelemetry.io/otel/sdk/metric v0.36.0/go.mod h1:Lv4HQQPSCSkhyBKzLNtE8YhTSdK4HCwNh3lh7CiR20s=
go.opentelemetry.io/otel/trace v1.13.0 h1:CBgRZ6ntv+Amuj1jDsMhZtlAPT6gbyIRdaIzFhfBSdY=
go.opentelemetry.io/otel/trace v1.13.0/go.mod h1:muCvmmO9KKpvuXSf3KKAXXB2ygNYHQ+ZfI5X08d3tds=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
Expand Down
238 changes: 116 additions & 122 deletions pkg/service/connect_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,25 @@ package service

import (
"bufio"
"context"
"errors"
"fmt"
"log"
"net"
"net/http"
"strconv"
"time"

"github.com/open-feature/flagd/pkg/logger"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"

semconv "go.opentelemetry.io/otel/semconv/v1.13.0"
)

var (
Expand All @@ -26,13 +37,15 @@ type HTTPReqProperties struct {
}

type Recorder interface {
// ObserveHTTPRequestDuration measures the duration of an HTTP request.
ObserveHTTPRequestDuration(props HTTPReqProperties, duration time.Duration)
// ObserveHTTPResponseSize measures the size of an HTTP response in bytes.
ObserveHTTPResponseSize(props HTTPReqProperties, sizeBytes int64)
// AddInflightRequests increments and decrements the number of inflight request being
// processed.
AddInflightRequests(props HTTPProperties, quantity int)
// OTelObserveHTTPRequestDuration measures the duration of an HTTP request.
OTelObserveHTTPRequestDuration(props HTTPReqProperties, duration time.Duration)
// OTelObserveHTTPResponseSize measures the size of an HTTP response in bytes.
OTelObserveHTTPResponseSize(props HTTPReqProperties, sizeBytes int64)

// OTelInFlightRequestStart count the active requests.
OTelInFlightRequestStart(props HTTPReqProperties)
// OTelInFlightRequestEnd count the finished requests.
OTelInFlightRequestEnd(props HTTPReqProperties)
}

type Reporter interface {
Expand All @@ -47,127 +60,111 @@ type HTTPProperties struct {
ID string
}

type MetricsRecorder struct {
httpRequestDurHistogram *prometheus.HistogramVec
httpResponseSizeHistogram *prometheus.HistogramVec
httpRequestsInflight *prometheus.GaugeVec
// OTelMetricsRecorder records HTTP request metrics (request duration,
// response size, and in-flight request count) using OpenTelemetry
// instruments. It satisfies the Recorder interface.
type OTelMetricsRecorder struct {
	httpRequestDurHistogram   instrument.Float64Histogram   // latency of HTTP requests, in seconds
	httpResponseSizeHistogram instrument.Float64Histogram   // size of HTTP responses, in bytes
	httpRequestsInflight      instrument.Int64UpDownCounter // number of requests currently being handled
}

func (r MetricsRecorder) ObserveHTTPRequestDuration(p HTTPReqProperties, duration time.Duration,
) {
r.httpRequestDurHistogram.WithLabelValues(p.Service, p.ID, p.Method, p.Code).Observe(duration.Seconds())
// setAttributes converts the request properties into the OpenTelemetry
// semantic-convention attributes attached to every measurement.
func (r OTelMetricsRecorder) setAttributes(p HTTPReqProperties) []attribute.KeyValue {
	attrs := make([]attribute.KeyValue, 0, 4)
	attrs = append(attrs,
		semconv.ServiceNameKey.String(p.Service),
		semconv.HTTPURLKey.String(p.ID),
		semconv.HTTPMethodKey.String(p.Method),
		semconv.HTTPStatusCodeKey.String(p.Code),
	)
	return attrs
}

// OTelObserveHTTPRequestDuration records the elapsed time of a single
// HTTP request, in seconds, on the request-duration histogram.
func (r OTelMetricsRecorder) OTelObserveHTTPRequestDuration(p HTTPReqProperties, duration time.Duration) {
	seconds := duration.Seconds()
	r.httpRequestDurHistogram.Record(context.TODO(), seconds, r.setAttributes(p)...)
}

func (r MetricsRecorder) ObserveHTTPResponseSize(p HTTPReqProperties, sizeBytes int64) {
r.httpResponseSizeHistogram.WithLabelValues(p.Service, p.ID, p.Method, p.Code).Observe(float64(sizeBytes))
// OTelObserveHTTPResponseSize records the size of a single HTTP
// response, in bytes, on the response-size histogram.
func (r OTelMetricsRecorder) OTelObserveHTTPResponseSize(p HTTPReqProperties, sizeBytes int64) {
	attrs := r.setAttributes(p)
	r.httpResponseSizeHistogram.Record(context.TODO(), float64(sizeBytes), attrs...)
}

func (r MetricsRecorder) AddInflightRequests(p HTTPProperties, quantity int) {
r.httpRequestsInflight.WithLabelValues(p.Service, p.ID).Add(float64(quantity))
// OTelInFlightRequestStart increments the in-flight counter as a
// request begins processing.
func (r OTelMetricsRecorder) OTelInFlightRequestStart(p HTTPReqProperties) {
	attrs := r.setAttributes(p)
	r.httpRequestsInflight.Add(context.TODO(), 1, attrs...)
}

type prometheusConfig struct {
Prefix string
DurationBuckets []float64
SizeBuckets []float64
Registry prometheus.Registerer
HandlerIDLabel string
StatusCodeLabel string
MethodLabel string
ServiceLabel string
// OTelInFlightRequestEnd decrements the in-flight counter once a
// request has finished processing.
func (r OTelMetricsRecorder) OTelInFlightRequestEnd(p HTTPReqProperties) {
	attrs := r.setAttributes(p)
	r.httpRequestsInflight.Add(context.TODO(), -1, attrs...)
}

type middlewareConfig struct {
Recorder Recorder
Service string
GroupedStatus bool
DisableMeasureSize bool
DisableMeasureInflight bool
recorder Recorder
MetricReader metric.Reader
Logger *logger.Logger
Service string
GroupedStatus bool
DisableMeasureSize bool
}

// Middleware measures HTTP request metrics (via Measure) for the
// handlers it wraps, using the Recorder held in its configuration.
type Middleware struct {
	cfg middlewareConfig // middleware configuration, populated via defaults()
}

// defaults validates the middleware configuration; a Recorder is
// mandatory, so a missing one is treated as a programmer error (panic).
func (c *middlewareConfig) defaults() {
	if c.Recorder == nil {
		panic("recorder is required")
	}
}

// New builds a Middleware from the given configuration after applying
// and validating configuration defaults.
func New(cfg middlewareConfig) Middleware {
	cfg.defaults()
	return Middleware{cfg: cfg}
}

func (c *prometheusConfig) defaults() {
if len(c.DurationBuckets) == 0 {
c.DurationBuckets = prometheus.DefBuckets
}

if len(c.SizeBuckets) == 0 {
c.SizeBuckets = prometheus.ExponentialBuckets(100, 10, 8)
func (cfg *middlewareConfig) defaults() {
if cfg.Logger == nil {
log.Fatal("missing logger")
}

if c.Registry == nil {
c.Registry = prometheus.DefaultRegisterer
}

if c.HandlerIDLabel == "" {
c.HandlerIDLabel = "handler"
}

if c.StatusCodeLabel == "" {
c.StatusCodeLabel = "code"
}

if c.MethodLabel == "" {
c.MethodLabel = "method"
}

if c.ServiceLabel == "" {
c.ServiceLabel = "service"
if cfg.MetricReader == nil {
log.Fatal("missing MetricReader/Exporter")
}
cfg.recorder = cfg.newOTelRecorder(cfg.MetricReader)
}

func NewRecorder(cfg prometheusConfig) *MetricsRecorder {
cfg.defaults()
// getDurationView builds a metric view that switches the named
// histogram instrument, scoped to this service, to an explicit-bucket
// aggregation with the supplied bucket boundaries.
func (cfg *middlewareConfig) getDurationView(name string, bucket []float64) metric.View {
	// Select only the instrument with this name inside our service's scope,
	// so other instruments keep their default aggregation.
	selector := metric.Instrument{
		Name:  name,
		Scope: instrumentation.Scope{Name: cfg.Service},
	}
	histogram := aggregation.ExplicitBucketHistogram{Boundaries: bucket}
	return metric.NewView(selector, metric.Stream{Aggregation: histogram})
}

r := &MetricsRecorder{
httpRequestDurHistogram: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: cfg.Prefix,
Subsystem: "http",
Name: "request_duration_seconds",
Help: "The latency of the HTTP requests.",
Buckets: cfg.DurationBuckets,
}, []string{cfg.ServiceLabel, cfg.HandlerIDLabel, cfg.MethodLabel, cfg.StatusCodeLabel}),

httpResponseSizeHistogram: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: cfg.Prefix,
Subsystem: "http",
Name: "response_size_bytes",
Help: "The size of the HTTP responses.",
Buckets: cfg.SizeBuckets,
}, []string{cfg.ServiceLabel, cfg.HandlerIDLabel, cfg.MethodLabel, cfg.StatusCodeLabel}),

httpRequestsInflight: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: cfg.Prefix,
Subsystem: "http",
Name: "requests_inflight",
Help: "The number of inflight requests being handled at the same time.",
}, []string{cfg.ServiceLabel, cfg.HandlerIDLabel}),
}
func (cfg *middlewareConfig) newOTelRecorder(exporter metric.Reader) *OTelMetricsRecorder {
const requestDurationName = "http_request_duration_seconds"
const responseSizeName = "http_response_size_bytes"

cfg.Registry.MustRegister(
r.httpRequestDurHistogram,
r.httpResponseSizeHistogram,
r.httpRequestsInflight,
// create a metric provider with custom bucket size for histograms
provider := metric.NewMeterProvider(
metric.WithReader(exporter),
metric.WithView(cfg.getDurationView(requestDurationName, prometheus.DefBuckets)),
metric.WithView(cfg.getDurationView(responseSizeName, prometheus.ExponentialBuckets(100, 10, 8))),
)

return r
meter := provider.Meter(cfg.Service)
// we can ignore errors from OpenTelemetry since they could occur if we select the wrong aggregator
hduration, _ := meter.Float64Histogram(
requestDurationName,
instrument.WithDescription("The latency of the HTTP requests"),
)
hsize, _ := meter.Float64Histogram(
responseSizeName,
instrument.WithDescription("The size of the HTTP responses"),
instrument.WithUnit(unit.Bytes),
)
reqCounter, _ := meter.Int64UpDownCounter(
"http_requests_inflight",
instrument.WithDescription("The number of inflight requests being handled at the same time"),
)
return &OTelMetricsRecorder{
httpRequestDurHistogram: hduration,
httpResponseSizeHistogram: hsize,
httpRequestsInflight: reqCounter,
}
}

func (m Middleware) Measure(handlerID string, reporter Reporter, next func()) {
Expand All @@ -178,32 +175,35 @@ func (m Middleware) Measure(handlerID string, reporter Reporter, next func()) {
hid = reporter.URLPath()
}

// If we need to group the status code, it uses the
// first number of the status code because is the least
// required identification way.
var code string
if m.cfg.GroupedStatus {
code = fmt.Sprintf("%dxx", reporter.StatusCode()/100)
} else {
code = strconv.Itoa(reporter.StatusCode())
}
props := HTTPReqProperties{
Service: m.cfg.Service,
ID: hid,
Method: reporter.Method(),
Code: code,
}

m.cfg.recorder.OTelInFlightRequestStart(props)
defer m.cfg.recorder.OTelInFlightRequestEnd(props)

// Start the timer and when finishing measure the duration.
start := time.Now()
defer func() {
duration := time.Since(start)

// If we need to group the status code, it uses the
// first number of the status code because is the least
// required identification way.
var code string
if m.cfg.GroupedStatus {
code = fmt.Sprintf("%dxx", reporter.StatusCode()/100)
} else {
code = strconv.Itoa(reporter.StatusCode())
}

props := HTTPReqProperties{
Service: m.cfg.Service,
ID: hid,
Method: reporter.Method(),
Code: code,
}
m.cfg.Recorder.ObserveHTTPRequestDuration(props, duration)
m.cfg.recorder.OTelObserveHTTPRequestDuration(props, duration)

// Measure size of response if required.
if !m.cfg.DisableMeasureSize {
m.cfg.Recorder.ObserveHTTPResponseSize(props, reporter.BytesWritten())
m.cfg.recorder.OTelObserveHTTPResponseSize(props, reporter.BytesWritten())
}
}()

Expand All @@ -229,12 +229,6 @@ func Handler(handlerID string, m Middleware, h http.Handler) http.Handler {
})
}

// HandlerProvider adapts the metrics middleware to the common
// func(http.Handler) http.Handler chaining shape, wrapping each handler
// with Handler under the given handler ID.
func HandlerProvider(handlerID string, m Middleware) func(http.Handler) http.Handler {
	wrap := func(next http.Handler) http.Handler {
		return Handler(handlerID, m, next)
	}
	return wrap
}

type stdReporter struct {
w *responseWriterInterceptor
r *http.Request
Expand Down
Loading