From 0cada31cc2a87ea82f635075582d7470650ff924 Mon Sep 17 00:00:00 2001 From: James Smith Date: Thu, 22 Mar 2018 01:11:27 -0500 Subject: [PATCH] Metrics - Influx StatsD --- metrics/influxstatsd/influxstatsd.go | 389 ++++++++++++++++++++++ metrics/influxstatsd/influxstatsd_test.go | 90 +++++ 2 files changed, 479 insertions(+) create mode 100644 metrics/influxstatsd/influxstatsd.go create mode 100644 metrics/influxstatsd/influxstatsd_test.go diff --git a/metrics/influxstatsd/influxstatsd.go b/metrics/influxstatsd/influxstatsd.go new file mode 100644 index 000000000..a036345d6 --- /dev/null +++ b/metrics/influxstatsd/influxstatsd.go @@ -0,0 +1,389 @@ +// Package influxstatsd provides support for InfluxData's StatsD Telegraf plugin. It's very +// similar to StatsD, but supports arbitrary tags per-metric, which map to Go +// kit's label values. So, while label values are no-ops in StatsD, they are +// supported here. For more details, see the article at +// https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/ +// +// This package batches observations and emits them on some schedule to the +// remote server. This is useful even if you connect to your service +// over UDP. Emitting one network packet per observation can quickly overwhelm +// even the fastest internal network. +package influxstatsd + +import ( + "fmt" + "io" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/generic" + "github.com/go-kit/kit/metrics/internal/lv" + "github.com/go-kit/kit/metrics/internal/ratemap" + "github.com/go-kit/kit/util/conn" +) + +// Influxstatsd receives metrics observations and forwards them to a server. +// Create a Influxstatsd object, use it to create metrics, and pass those +// metrics as dependencies to the components that will use them. +// +// All metrics are buffered until WriteTo is called. Counters and gauges are +// aggregated into a single observation per timeseries per write. Timings and +// histograms are buffered but not aggregated. +// +// To regularly report metrics to an io.Writer, use the WriteLoop helper method. +// To send to a InfluxStatsD server, use the SendLoop helper method. +type Influxstatsd struct { + mtx sync.RWMutex + prefix string + rates *ratemap.RateMap + counters *lv.Space + gauges map[string]*gaugeNode + timings *lv.Space + histograms *lv.Space + logger log.Logger + lvs lv.LabelValues +} + +// New returns a Influxstatsd object that may be used to create metrics. Prefix is +// applied to all created metrics. Callers must ensure that regular calls to +// WriteTo are performed, either manually or with one of the helper methods. +func New(prefix string, logger log.Logger, lvs ...string) *Influxstatsd { + if len(lvs)%2 != 0 { + panic("odd number of LabelValues; programmer error!") + } + return &Influxstatsd{ + prefix: prefix, + rates: ratemap.New(), + counters: lv.NewSpace(), + gauges: map[string]*gaugeNode{}, // https://github.com/go-kit/kit/pull/588 + timings: lv.NewSpace(), + histograms: lv.NewSpace(), + logger: logger, + lvs: lvs, + } +} + +// NewCounter returns a counter, sending observations to this Influxstatsd object. +func (d *Influxstatsd) NewCounter(name string, sampleRate float64) *Counter { + d.rates.Set(name, sampleRate) + return &Counter{ + name: name, + obs: d.counters.Observe, + } +} + +// NewGauge returns a gauge, sending observations to this Influxstatsd object. +func (d *Influxstatsd) NewGauge(name string) *Gauge { + d.mtx.Lock() + n, ok := d.gauges[name] + if !ok { + n = &gaugeNode{gauge: &Gauge{g: generic.NewGauge(name), influx: d}} + d.gauges[name] = n + } + d.mtx.Unlock() + return n.gauge +} + +// NewTiming returns a histogram whose observations are interpreted as +// millisecond durations, and are forwarded to this Influxstatsd object. +func (d *Influxstatsd) NewTiming(name string, sampleRate float64) *Timing { + d.rates.Set(name, sampleRate) + return &Timing{ + name: name, + obs: d.timings.Observe, + } +} + +// NewHistogram returns a histogram whose observations are of an unspecified +// unit, and are forwarded to this Influxstatsd object. +func (d *Influxstatsd) NewHistogram(name string, sampleRate float64) *Histogram { + d.rates.Set(name, sampleRate) + return &Histogram{ + name: name, + obs: d.histograms.Observe, + } +} + +// WriteLoop is a helper method that invokes WriteTo to the passed writer every +// time the passed channel fires. This method blocks until the channel is +// closed, so clients probably want to run it in its own goroutine. For typical +// usage, create a time.Ticker and pass its C channel to this method. +func (d *Influxstatsd) WriteLoop(c <-chan time.Time, w io.Writer) { + for range c { + if _, err := d.WriteTo(w); err != nil { + d.logger.Log("during", "WriteTo", "err", err) + } + } +} + +// SendLoop is a helper method that wraps WriteLoop, passing a managed +// connection to the network and address. Like WriteLoop, this method blocks +// until the channel is closed, so clients probably want to start it in its own +// goroutine. For typical usage, create a time.Ticker and pass its C channel to +// this method. +func (d *Influxstatsd) SendLoop(c <-chan time.Time, network, address string) { + d.WriteLoop(c, conn.NewDefaultManager(network, address, d.logger)) +} + +// WriteTo flushes the buffered content of the metrics to the writer, in +// InfluxStatsD format. WriteTo abides best-effort semantics, so observations are +// lost if there is a problem with the write. Clients should be sure to call +// WriteTo regularly, ideally through the WriteLoop or SendLoop helper methods. +func (d *Influxstatsd) WriteTo(w io.Writer) (count int64, err error) { + var n int + + d.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { + n, err = fmt.Fprintf(w, "%s%s%s:%f|c%s\n", d.prefix, name, d.tagValues(lvs), sum(values), sampling(d.rates.Get(name))) + if err != nil { + return false + } + count += int64(n) + return true + }) + if err != nil { + return count, err + } + + d.mtx.RLock() + for _, root := range d.gauges { + root.walk(func(name string, lvs lv.LabelValues, value float64) bool { + n, err = fmt.Fprintf(w, "%s%s%s:%f|g\n", d.prefix, name, d.tagValues(lvs), value) + if err != nil { + return false + } + count += int64(n) + return true + }) + } + d.mtx.RUnlock() + + d.timings.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { + sampleRate := d.rates.Get(name) + for _, value := range values { + n, err = fmt.Fprintf(w, "%s%s%s:%f|ms%s\n", d.prefix, name, d.tagValues(lvs), value, sampling(sampleRate)) + if err != nil { + return false + } + count += int64(n) + } + return true + }) + if err != nil { + return count, err + } + + d.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { + sampleRate := d.rates.Get(name) + for _, value := range values { + n, err = fmt.Fprintf(w, "%s%s%s:%f|h%s\n", d.prefix, name, d.tagValues(lvs), value, sampling(sampleRate)) + if err != nil { + return false + } + count += int64(n) + } + return true + }) + if err != nil { + return count, err + } + + return count, err +} + +func sum(a []float64) float64 { + var v float64 + for _, f := range a { + v += f + } + return v +} + +func last(a []float64) float64 { + return a[len(a)-1] +} + +func sampling(r float64) string { + var sv string + if r < 1.0 { + sv = fmt.Sprintf("|@%f", r) + } + return sv +} + +func (d *Influxstatsd) tagValues(labelValues []string) string { + if len(labelValues) == 0 && len(d.lvs) == 0 { + return "" + } + if len(labelValues)%2 != 0 { + panic("tagValues received a labelValues with an odd number of strings") + } + pairs := make([]string, 0, (len(d.lvs)+len(labelValues))/2) + for i := 0; i < len(d.lvs); i += 2 { + pairs = append(pairs, d.lvs[i]+"="+d.lvs[i+1]) + } + for i := 0; i < len(labelValues); i += 2 { + pairs = append(pairs, labelValues[i]+"="+labelValues[i+1]) + } + return "," + strings.Join(pairs, ",") +} + +type observeFunc func(name string, lvs lv.LabelValues, value float64) + +// Counter is a InfluxStatsD counter. Observations are forwarded to a Influxstatsd +// object, and aggregated (summed) per timeseries. +type Counter struct { + name string + lvs lv.LabelValues + obs observeFunc +} + +// With implements metrics.Counter. +func (c *Counter) With(labelValues ...string) metrics.Counter { + return &Counter{ + name: c.name, + lvs: c.lvs.With(labelValues...), + obs: c.obs, + } +} + +// Add implements metrics.Counter. +func (c *Counter) Add(delta float64) { + c.obs(c.name, c.lvs, delta) +} + +// Gauge is a InfluxStatsD gauge. Observations are forwarded to a Influxstatsd +// object, and aggregated (the last observation selected) per timeseries. +type Gauge struct { + g *generic.Gauge + influx *Influxstatsd + set int32 +} + +// With implements metrics.Gauge. +func (g *Gauge) With(labelValues ...string) metrics.Gauge { + g.influx.mtx.RLock() + node := g.influx.gauges[g.g.Name] + g.influx.mtx.RUnlock() + + ga := &Gauge{g: g.g.With(labelValues...).(*generic.Gauge), influx: g.influx} + return node.addGauge(ga, ga.g.LabelValues()) +} + +// Set implements metrics.Gauge. +func (g *Gauge) Set(value float64) { + g.g.Set(value) + g.touch() +} + +// Add implements metrics.Gauge. +func (g *Gauge) Add(delta float64) { + g.g.Add(delta) + g.touch() +} + +// Timing is a InfluxStatsD timing, or metrics.Histogram. Observations are +// forwarded to a Influxstatsd object, and collected (but not aggregated) per +// timeseries. +type Timing struct { + name string + lvs lv.LabelValues + obs observeFunc +} + +// With implements metrics.Timing. +func (t *Timing) With(labelValues ...string) metrics.Histogram { + return &Timing{ + name: t.name, + lvs: t.lvs.With(labelValues...), + obs: t.obs, + } +} + +// Observe implements metrics.Histogram. Value is interpreted as milliseconds. +func (t *Timing) Observe(value float64) { + t.obs(t.name, t.lvs, value) +} + +// Histogram is a InfluxStatsD histrogram. Observations are forwarded to a +// Influxstatsd object, and collected (but not aggregated) per timeseries. +type Histogram struct { + name string + lvs lv.LabelValues + obs observeFunc +} + +// With implements metrics.Histogram. +func (h *Histogram) With(labelValues ...string) metrics.Histogram { + return &Histogram{ + name: h.name, + lvs: h.lvs.With(labelValues...), + obs: h.obs, + } +} + +// Observe implements metrics.Histogram. +func (h *Histogram) Observe(value float64) { + h.obs(h.name, h.lvs, value) +} + +type pair struct{ label, value string } + +type gaugeNode struct { + mtx sync.RWMutex + gauge *Gauge + children map[pair]*gaugeNode +} + +func (n *gaugeNode) addGauge(g *Gauge, lvs lv.LabelValues) *Gauge { + n.mtx.Lock() + defer n.mtx.Unlock() + if len(lvs) == 0 { + if n.gauge == nil { + n.gauge = g + } + return n.gauge + } + if len(lvs) < 2 { + panic("too few LabelValues; programmer error!") + } + head, tail := pair{lvs[0], lvs[1]}, lvs[2:] + if n.children == nil { + n.children = map[pair]*gaugeNode{} + } + child, ok := n.children[head] + if !ok { + child = &gaugeNode{} + n.children[head] = child + } + return child.addGauge(g, tail) +} + +func (n *gaugeNode) walk(fn func(string, lv.LabelValues, float64) bool) bool { + n.mtx.RLock() + defer n.mtx.RUnlock() + if n.gauge != nil { + value, ok := n.gauge.read() + if ok && !fn(n.gauge.g.Name, n.gauge.g.LabelValues(), value) { + return false + } + } + for _, child := range n.children { + if !child.walk(fn) { + return false + } + } + return true +} + +func (g *Gauge) touch() { + atomic.StoreInt32(&(g.set), 1) +} + +func (g *Gauge) read() (float64, bool) { + set := atomic.SwapInt32(&(g.set), 0) + return g.g.Value(), set != 0 +} diff --git a/metrics/influxstatsd/influxstatsd_test.go b/metrics/influxstatsd/influxstatsd_test.go new file mode 100644 index 000000000..d553cf453 --- /dev/null +++ b/metrics/influxstatsd/influxstatsd_test.go @@ -0,0 +1,90 @@ +package influxstatsd + +import ( + "testing" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics/teststat" +) + +func TestCounter(t *testing.T) { + prefix, name := "abc.", "def" + label, value := "label", "value" + regex := `^` + prefix + name + "," + label + `=` + value + `:([0-9\.]+)\|c$` + d := New(prefix, log.NewNopLogger()) + counter := d.NewCounter(name, 1.0).With(label, value) + valuef := teststat.SumLines(d, regex) + if err := teststat.TestCounter(counter, valuef); err != nil { + t.Fatal(err) + } +} + +func TestCounterSampled(t *testing.T) { + // This will involve multiplying the observed sum by the inverse of the + // sample rate and checking against the expected value within some + // tolerance. + t.Skip("TODO") +} + +func TestGauge(t *testing.T) { + prefix, name := "ghi.", "jkl" + label, value := "xyz", "abc" + regex := `^` + prefix + name + `,hostname=foohost,` + label + `=` + value + `:([0-9\.]+)\|g$` + d := New(prefix, log.NewNopLogger(), "hostname", "foohost") + gauge := d.NewGauge(name).With(label, value) + valuef := teststat.LastLine(d, regex) + if err := teststat.TestGauge(gauge, valuef); err != nil { + t.Fatal(err) + } +} + +// InfluxStatsD histograms just emit all observations. So, we collect them into +// a generic histogram, and run the statistics test on that. + +func TestHistogram(t *testing.T) { + prefix, name := "influxstatsd.", "histogram_test" + label, value := "abc", "def" + regex := `^` + prefix + name + "," + label + `=` + value + `:([0-9\.]+)\|h$` + d := New(prefix, log.NewNopLogger()) + histogram := d.NewHistogram(name, 1.0).With(label, value) + quantiles := teststat.Quantiles(d, regex, 50) // no |@0.X + if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil { + t.Fatal(err) + } +} + +func TestHistogramSampled(t *testing.T) { + prefix, name := "influxstatsd.", "sampled_histogram_test" + label, value := "foo", "bar" + regex := `^` + prefix + name + "," + label + `=` + value + `:([0-9\.]+)\|h\|@0\.01[0]*$` + d := New(prefix, log.NewNopLogger()) + histogram := d.NewHistogram(name, 0.01).With(label, value) + quantiles := teststat.Quantiles(d, regex, 50) + if err := teststat.TestHistogram(histogram, quantiles, 0.02); err != nil { + t.Fatal(err) + } +} + +func TestTiming(t *testing.T) { + prefix, name := "influxstatsd.", "timing_test" + label, value := "wiggle", "bottom" + regex := `^` + prefix + name + "," + label + `=` + value + `:([0-9\.]+)\|ms$` + d := New(prefix, log.NewNopLogger()) + histogram := d.NewTiming(name, 1.0).With(label, value) + quantiles := teststat.Quantiles(d, regex, 50) // no |@0.X + if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil { + t.Fatal(err) + } +} + +func TestTimingSampled(t *testing.T) { + prefix, name := "influxstatsd.", "sampled_timing_test" + label, value := "internal", "external" + regex := `^` + prefix + name + "," + label + `=` + value + `:([0-9\.]+)\|ms\|@0.03[0]*$` + d := New(prefix, log.NewNopLogger()) + histogram := d.NewTiming(name, 0.03).With(label, value) + quantiles := teststat.Quantiles(d, regex, 50) + if err := teststat.TestHistogram(histogram, quantiles, 0.02); err != nil { + t.Fatal(err) + } +}