Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
389 changes: 389 additions & 0 deletions metrics/influxstatsd/influxstatsd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,389 @@
// Package influxstatsd provides support for InfluxData's StatsD Telegraf plugin. It's very
// similar to StatsD, but supports arbitrary tags per-metric, which map to Go
// kit's label values. So, while label values are no-ops in StatsD, they are
// supported here. For more details, see the article at
// https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/
//
// This package batches observations and emits them on some schedule to the
// remote server. This is useful even if you connect to your service
// over UDP. Emitting one network packet per observation can quickly overwhelm
// even the fastest internal network.
package influxstatsd

import (
"fmt"
"io"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/generic"
"github.com/go-kit/kit/metrics/internal/lv"
"github.com/go-kit/kit/metrics/internal/ratemap"
"github.com/go-kit/kit/util/conn"
)

// Influxstatsd receives metrics observations and forwards them to a server.
// Create a Influxstatsd object, use it to create metrics, and pass those
// metrics as dependencies to the components that will use them.
//
// All metrics are buffered until WriteTo is called. Counters and gauges are
// aggregated into a single observation per timeseries per write. Timings and
// histograms are buffered but not aggregated.
//
// To regularly report metrics to an io.Writer, use the WriteLoop helper method.
// To send to a InfluxStatsD server, use the SendLoop helper method.
type Influxstatsd struct {
mtx sync.RWMutex
prefix string
rates *ratemap.RateMap
counters *lv.Space
gauges map[string]*gaugeNode
timings *lv.Space
histograms *lv.Space
logger log.Logger
lvs lv.LabelValues
}

// New returns a Influxstatsd object that may be used to create metrics. Prefix is
// applied to all created metrics. Callers must ensure that regular calls to
// WriteTo are performed, either manually or with one of the helper methods.
func New(prefix string, logger log.Logger, lvs ...string) *Influxstatsd {
if len(lvs)%2 != 0 {
panic("odd number of LabelValues; programmer error!")
}
return &Influxstatsd{
prefix: prefix,
rates: ratemap.New(),
counters: lv.NewSpace(),
gauges: map[string]*gaugeNode{}, // https://github.com/go-kit/kit/pull/588
timings: lv.NewSpace(),
histograms: lv.NewSpace(),
logger: logger,
lvs: lvs,
}
}

// NewCounter returns a counter, sending observations to this Influxstatsd object.
func (d *Influxstatsd) NewCounter(name string, sampleRate float64) *Counter {
d.rates.Set(name, sampleRate)
return &Counter{
name: name,
obs: d.counters.Observe,
}
}

// NewGauge returns a gauge, sending observations to this Influxstatsd object.
func (d *Influxstatsd) NewGauge(name string) *Gauge {
d.mtx.Lock()
n, ok := d.gauges[name]
if !ok {
n = &gaugeNode{gauge: &Gauge{g: generic.NewGauge(name), influx: d}}
d.gauges[name] = n
}
d.mtx.Unlock()
return n.gauge
}

// NewTiming returns a histogram whose observations are interpreted as
// millisecond durations, and are forwarded to this Influxstatsd object.
func (d *Influxstatsd) NewTiming(name string, sampleRate float64) *Timing {
d.rates.Set(name, sampleRate)
return &Timing{
name: name,
obs: d.timings.Observe,
}
}

// NewHistogram returns a histogram whose observations are of an unspecified
// unit, and are forwarded to this Influxstatsd object.
func (d *Influxstatsd) NewHistogram(name string, sampleRate float64) *Histogram {
d.rates.Set(name, sampleRate)
return &Histogram{
name: name,
obs: d.histograms.Observe,
}
}

// WriteLoop is a helper method that invokes WriteTo to the passed writer every
// time the passed channel fires. This method blocks until the channel is
// closed, so clients probably want to run it in its own goroutine. For typical
// usage, create a time.Ticker and pass its C channel to this method.
func (d *Influxstatsd) WriteLoop(c <-chan time.Time, w io.Writer) {
for range c {
if _, err := d.WriteTo(w); err != nil {
d.logger.Log("during", "WriteTo", "err", err)
}
}
}

// SendLoop is a helper method that wraps WriteLoop, passing a managed
// connection to the network and address. Like WriteLoop, this method blocks
// until the channel is closed, so clients probably want to start it in its own
// goroutine. For typical usage, create a time.Ticker and pass its C channel to
// this method.
func (d *Influxstatsd) SendLoop(c <-chan time.Time, network, address string) {
d.WriteLoop(c, conn.NewDefaultManager(network, address, d.logger))
}

// WriteTo flushes the buffered content of the metrics to the writer, in
// InfluxStatsD format. WriteTo abides best-effort semantics, so observations are
// lost if there is a problem with the write. Clients should be sure to call
// WriteTo regularly, ideally through the WriteLoop or SendLoop helper methods.
func (d *Influxstatsd) WriteTo(w io.Writer) (count int64, err error) {
var n int

d.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
n, err = fmt.Fprintf(w, "%s%s%s:%f|c%s\n", d.prefix, name, d.tagValues(lvs), sum(values), sampling(d.rates.Get(name)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Influxstatsd vs. Dogstatsd:

n, err = fmt.Fprintf(w, "%s%s%s:%f|c%s\n", d.prefix, name, d.tagValues(lvs), sum(values), sampling(d.rates.Get(name))) // Influx
n, err = fmt.Fprintf(w, "%s%s:%f|c%s%s\n", d.prefix, name, sum(values), sampling(d.rates.Get(name)), d.tagValues(lvs)  // Dog

if err != nil {
return false
}
count += int64(n)
return true
})
if err != nil {
return count, err
}

d.mtx.RLock()
for _, root := range d.gauges {
root.walk(func(name string, lvs lv.LabelValues, value float64) bool {
n, err = fmt.Fprintf(w, "%s%s%s:%f|g\n", d.prefix, name, d.tagValues(lvs), value)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Influxstatsd vs. Dogstatsd:

n, err = fmt.Fprintf(w, "%s%s%s:%f|g\n", d.prefix, name, d.tagValues(lvs), value) // Influx
n, err = fmt.Fprintf(w, "%s%s:%f|g%s\n", d.prefix, name, value, d.tagValues(lvs)) // Dog

if err != nil {
return false
}
count += int64(n)
return true
})
}
d.mtx.RUnlock()

d.timings.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
sampleRate := d.rates.Get(name)
for _, value := range values {
n, err = fmt.Fprintf(w, "%s%s%s:%f|ms%s\n", d.prefix, name, d.tagValues(lvs), value, sampling(sampleRate))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Influxstatsd vs. Dogstatsd:

n, err = fmt.Fprintf(w, "%s%s%s:%f|ms%s\n", d.prefix, name, d.tagValues(lvs), value, sampling(sampleRate)) // Influx
n, err = fmt.Fprintf(w, "%s%s:%f|ms%s%s\n", d.prefix, name, value, sampling(sampleRate), d.tagValues(lvs)) // Dog

if err != nil {
return false
}
count += int64(n)
}
return true
})
if err != nil {
return count, err
}

d.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
sampleRate := d.rates.Get(name)
for _, value := range values {
n, err = fmt.Fprintf(w, "%s%s%s:%f|h%s\n", d.prefix, name, d.tagValues(lvs), value, sampling(sampleRate))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Influxstatsd vs. Dogstatsd:

n, err = fmt.Fprintf(w, "%s%s%s:%f|h%s\n", d.prefix, name, d.tagValues(lvs), value, sampling(sampleRate)) // Influx
n, err = fmt.Fprintf(w, "%s%s:%f|h%s%s\n", d.prefix, name, value, sampling(sampleRate), d.tagValues(lvs)) // Dog

if err != nil {
return false
}
count += int64(n)
}
return true
})
if err != nil {
return count, err
}

return count, err
}

func sum(a []float64) float64 {
var v float64
for _, f := range a {
v += f
}
return v
}

func last(a []float64) float64 {
return a[len(a)-1]
}

func sampling(r float64) string {
var sv string
if r < 1.0 {
sv = fmt.Sprintf("|@%f", r)
}
return sv
}

func (d *Influxstatsd) tagValues(labelValues []string) string {
if len(labelValues) == 0 && len(d.lvs) == 0 {
return ""
}
if len(labelValues)%2 != 0 {
panic("tagValues received a labelValues with an odd number of strings")
}
pairs := make([]string, 0, (len(d.lvs)+len(labelValues))/2)
for i := 0; i < len(d.lvs); i += 2 {
pairs = append(pairs, d.lvs[i]+"="+d.lvs[i+1])
}
for i := 0; i < len(labelValues); i += 2 {
pairs = append(pairs, labelValues[i]+"="+labelValues[i+1])
}
return "," + strings.Join(pairs, ",")
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Influxstatsd requires key=value pairs separated from the metric line by ,

Dogstatsd requires key:value pairs separated from the metric line by |#


type observeFunc func(name string, lvs lv.LabelValues, value float64)

// Counter is a InfluxStatsD counter. Observations are forwarded to a Influxstatsd
// object, and aggregated (summed) per timeseries.
type Counter struct {
name string
lvs lv.LabelValues
obs observeFunc
}

// With implements metrics.Counter.
func (c *Counter) With(labelValues ...string) metrics.Counter {
return &Counter{
name: c.name,
lvs: c.lvs.With(labelValues...),
obs: c.obs,
}
}

// Add implements metrics.Counter.
func (c *Counter) Add(delta float64) {
c.obs(c.name, c.lvs, delta)
}

// Gauge is a InfluxStatsD gauge. Observations are forwarded to a Influxstatsd
// object, and aggregated (the last observation selected) per timeseries.
type Gauge struct {
g *generic.Gauge
influx *Influxstatsd
set int32
}

// With implements metrics.Gauge.
func (g *Gauge) With(labelValues ...string) metrics.Gauge {
g.influx.mtx.RLock()
node := g.influx.gauges[g.g.Name]
g.influx.mtx.RUnlock()

ga := &Gauge{g: g.g.With(labelValues...).(*generic.Gauge), influx: g.influx}
return node.addGauge(ga, ga.g.LabelValues())
}

// Set implements metrics.Gauge.
func (g *Gauge) Set(value float64) {
g.g.Set(value)
g.touch()
}

// Add implements metrics.Gauge.
func (g *Gauge) Add(delta float64) {
g.g.Add(delta)
g.touch()
}

// Timing is a InfluxStatsD timing, or metrics.Histogram. Observations are
// forwarded to a Influxstatsd object, and collected (but not aggregated) per
// timeseries.
type Timing struct {
name string
lvs lv.LabelValues
obs observeFunc
}

// With implements metrics.Timing.
func (t *Timing) With(labelValues ...string) metrics.Histogram {
return &Timing{
name: t.name,
lvs: t.lvs.With(labelValues...),
obs: t.obs,
}
}

// Observe implements metrics.Histogram. Value is interpreted as milliseconds.
func (t *Timing) Observe(value float64) {
t.obs(t.name, t.lvs, value)
}

// Histogram is a InfluxStatsD histrogram. Observations are forwarded to a
// Influxstatsd object, and collected (but not aggregated) per timeseries.
type Histogram struct {
name string
lvs lv.LabelValues
obs observeFunc
}

// With implements metrics.Histogram.
func (h *Histogram) With(labelValues ...string) metrics.Histogram {
return &Histogram{
name: h.name,
lvs: h.lvs.With(labelValues...),
obs: h.obs,
}
}

// Observe implements metrics.Histogram.
func (h *Histogram) Observe(value float64) {
h.obs(h.name, h.lvs, value)
}

type pair struct{ label, value string }

type gaugeNode struct {
mtx sync.RWMutex
gauge *Gauge
children map[pair]*gaugeNode
}

func (n *gaugeNode) addGauge(g *Gauge, lvs lv.LabelValues) *Gauge {
n.mtx.Lock()
defer n.mtx.Unlock()
if len(lvs) == 0 {
if n.gauge == nil {
n.gauge = g
}
return n.gauge
}
if len(lvs) < 2 {
panic("too few LabelValues; programmer error!")
}
head, tail := pair{lvs[0], lvs[1]}, lvs[2:]
if n.children == nil {
n.children = map[pair]*gaugeNode{}
}
child, ok := n.children[head]
if !ok {
child = &gaugeNode{}
n.children[head] = child
}
return child.addGauge(g, tail)
}

func (n *gaugeNode) walk(fn func(string, lv.LabelValues, float64) bool) bool {
n.mtx.RLock()
defer n.mtx.RUnlock()
if n.gauge != nil {
value, ok := n.gauge.read()
if ok && !fn(n.gauge.g.Name, n.gauge.g.LabelValues(), value) {
return false
}
}
for _, child := range n.children {
if !child.walk(fn) {
return false
}
}
return true
}

func (g *Gauge) touch() {
atomic.StoreInt32(&(g.set), 1)
}

func (g *Gauge) read() (float64, bool) {
set := atomic.SwapInt32(&(g.set), 0)
return g.g.Value(), set != 0
}
Loading