Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 115 additions & 40 deletions metrics/dogstatsd/dogstatsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ import (
"fmt"
"io"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/generic"
"github.com/go-kit/kit/metrics/internal/lv"
"github.com/go-kit/kit/metrics/internal/ratemap"
"github.com/go-kit/kit/util/conn"
Expand All @@ -34,64 +37,73 @@ import (
// To regularly report metrics to an io.Writer, use the WriteLoop helper method.
// To send to a DogStatsD server, use the SendLoop helper method.
type Dogstatsd struct {
mtx sync.RWMutex
prefix string
rates *ratemap.RateMap
counters *lv.Space
gauges *lv.Space
gauges map[string]*gaugeNode
timings *lv.Space
histograms *lv.Space
logger log.Logger
lvs lv.LabelValues
}

// New returns a Dogstatsd object that may be used to create metrics. Prefix is
// applied to all created metrics. Callers must ensure that regular calls to
// WriteTo are performed, either manually or with one of the helper methods.
func New(prefix string, logger log.Logger) *Dogstatsd {
func New(prefix string, logger log.Logger, lvs ...string) *Dogstatsd {
if len(lvs)%2 != 0 {
panic("odd number of LabelValues; programmer error!")
}
return &Dogstatsd{
prefix: prefix,
rates: ratemap.New(),
counters: lv.NewSpace(),
gauges: lv.NewSpace(),
gauges: map[string]*gaugeNode{},
timings: lv.NewSpace(),
histograms: lv.NewSpace(),
logger: logger,
lvs: lvs,
}
}

// NewCounter returns a counter, sending observations to this Dogstatsd object.
func (d *Dogstatsd) NewCounter(name string, sampleRate float64) *Counter {
d.rates.Set(d.prefix+name, sampleRate)
d.rates.Set(name, sampleRate)
return &Counter{
name: d.prefix + name,
name: name,
obs: d.counters.Observe,
}
}

// NewGauge returns a gauge, sending observations to this Dogstatsd object.
func (d *Dogstatsd) NewGauge(name string) *Gauge {
return &Gauge{
name: d.prefix + name,
obs: d.gauges.Observe,
add: d.gauges.Add,
d.mtx.Lock()
n, ok := d.gauges[name]
if !ok {
n = &gaugeNode{gauge: &Gauge{g: generic.NewGauge(name), ddog: d}}
d.gauges[name] = n
}
d.mtx.Unlock()
return n.gauge
}

// NewTiming returns a histogram whose observations are interpreted as
// millisecond durations, and are forwarded to this Dogstatsd object.
func (d *Dogstatsd) NewTiming(name string, sampleRate float64) *Timing {
d.rates.Set(d.prefix+name, sampleRate)
d.rates.Set(name, sampleRate)
return &Timing{
name: d.prefix + name,
name: name,
obs: d.timings.Observe,
}
}

// NewHistogram returns a histogram whose observations are of an unspecified
// unit, and are forwarded to this Dogstatsd object.
func (d *Dogstatsd) NewHistogram(name string, sampleRate float64) *Histogram {
d.rates.Set(d.prefix+name, sampleRate)
d.rates.Set(name, sampleRate)
return &Histogram{
name: d.prefix + name,
name: name,
obs: d.histograms.Observe,
}
}
Expand Down Expand Up @@ -125,7 +137,7 @@ func (d *Dogstatsd) WriteTo(w io.Writer) (count int64, err error) {
var n int

d.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
n, err = fmt.Fprintf(w, "%s:%f|c%s%s\n", name, sum(values), sampling(d.rates.Get(name)), tagValues(lvs))
n, err = fmt.Fprintf(w, "%s%s:%f|c%s%s\n", d.prefix, name, sum(values), sampling(d.rates.Get(name)), d.tagValues(lvs))
if err != nil {
return false
}
Expand All @@ -136,22 +148,23 @@ func (d *Dogstatsd) WriteTo(w io.Writer) (count int64, err error) {
return count, err
}

d.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
n, err = fmt.Fprintf(w, "%s:%f|g%s\n", name, last(values), tagValues(lvs))
if err != nil {
return false
}
count += int64(n)
return true
})
if err != nil {
return count, err
d.mtx.RLock()
for _, root := range d.gauges {
root.walk(func(name string, lvs lv.LabelValues, value float64) bool {
n, err = fmt.Fprintf(w, "%s%s:%f|g%s\n", d.prefix, name, value, d.tagValues(lvs))
if err != nil {
return false
}
count += int64(n)
return true
})
}
d.mtx.RUnlock()

d.timings.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
sampleRate := d.rates.Get(name)
for _, value := range values {
n, err = fmt.Fprintf(w, "%s:%f|ms%s%s\n", name, value, sampling(sampleRate), tagValues(lvs))
n, err = fmt.Fprintf(w, "%s%s:%f|ms%s%s\n", d.prefix, name, value, sampling(sampleRate), d.tagValues(lvs))
if err != nil {
return false
}
Expand All @@ -166,7 +179,7 @@ func (d *Dogstatsd) WriteTo(w io.Writer) (count int64, err error) {
d.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
sampleRate := d.rates.Get(name)
for _, value := range values {
n, err = fmt.Fprintf(w, "%s:%f|h%s%s\n", name, value, sampling(sampleRate), tagValues(lvs))
n, err = fmt.Fprintf(w, "%s%s:%f|h%s%s\n", d.prefix, name, value, sampling(sampleRate), d.tagValues(lvs))
if err != nil {
return false
}
Expand Down Expand Up @@ -201,14 +214,17 @@ func sampling(r float64) string {
return sv
}

func tagValues(labelValues []string) string {
func (d *Dogstatsd) tagValues(labelValues []string) string {
if len(labelValues) == 0 {
return ""
}
if len(labelValues)%2 != 0 {
panic("tagValues received a labelValues with an odd number of strings")
}
pairs := make([]string, 0, len(labelValues)/2)
pairs := make([]string, 0, (len(d.lvs)+len(labelValues))/2)
for i := 0; i < len(d.lvs); i += 2 {
pairs = append(pairs, d.lvs[i]+":"+d.lvs[i+1])
}
for i := 0; i < len(labelValues); i += 2 {
pairs = append(pairs, labelValues[i]+":"+labelValues[i+1])
}
Expand Down Expand Up @@ -242,30 +258,31 @@ func (c *Counter) Add(delta float64) {
// Gauge is a DogStatsD gauge. Observations are forwarded to a Dogstatsd
// object, and aggregated (the last observation selected) per timeseries.
type Gauge struct {
name string
lvs lv.LabelValues
obs observeFunc
add observeFunc
g *generic.Gauge
ddog *Dogstatsd
set int32
}

// With implements metrics.Gauge.
func (g *Gauge) With(labelValues ...string) metrics.Gauge {
return &Gauge{
name: g.name,
lvs: g.lvs.With(labelValues...),
obs: g.obs,
add: g.add,
}
g.ddog.mtx.RLock()
node := g.ddog.gauges[g.g.Name]
g.ddog.mtx.RUnlock()

ga := &Gauge{g: g.g.With(labelValues...).(*generic.Gauge), ddog: g.ddog}
return node.addGauge(ga, ga.g.LabelValues())
}

// Set implements metrics.Gauge.
func (g *Gauge) Set(value float64) {
g.obs(g.name, g.lvs, value)
g.g.Set(value)
g.touch()
}

// Add implements metrics.Gauge.
func (g *Gauge) Add(delta float64) {
g.add(g.name, g.lvs, delta)
g.g.Add(delta)
g.touch()
}

// Timing is a DogStatsD timing, or metrics.Histogram. Observations are
Expand Down Expand Up @@ -312,3 +329,61 @@ func (h *Histogram) With(labelValues ...string) metrics.Histogram {
func (h *Histogram) Observe(value float64) {
h.obs(h.name, h.lvs, value)
}

type pair struct{ label, value string }

type gaugeNode struct {
mtx sync.RWMutex
gauge *Gauge
children map[pair]*gaugeNode
}

func (n *gaugeNode) addGauge(g *Gauge, lvs lv.LabelValues) *Gauge {
n.mtx.Lock()
defer n.mtx.Unlock()
if len(lvs) == 0 {
if n.gauge == nil {
n.gauge = g
}
return n.gauge
}
if len(lvs) < 2 {
panic("too few LabelValues; programmer error!")
}
head, tail := pair{lvs[0], lvs[1]}, lvs[2:]
if n.children == nil {
n.children = map[pair]*gaugeNode{}
}
child, ok := n.children[head]
if !ok {
child = &gaugeNode{}
n.children[head] = child
}
return child.addGauge(g, tail)
}

func (n *gaugeNode) walk(fn func(string, lv.LabelValues, float64) bool) bool {
n.mtx.RLock()
defer n.mtx.RUnlock()
if n.gauge != nil {
value, ok := n.gauge.read()
if ok && !fn(n.gauge.g.Name, n.gauge.g.LabelValues(), value) {
return false
}
}
for _, child := range n.children {
if !child.walk(fn) {
return false
}
}
return true
}

func (g *Gauge) touch() {
atomic.StoreInt32(&(g.set), 1)
}

func (g *Gauge) read() (float64, bool) {
set := atomic.SwapInt32(&(g.set), 0)
return g.g.Value(), set != 0
}
4 changes: 2 additions & 2 deletions metrics/dogstatsd/dogstatsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func TestCounterSampled(t *testing.T) {
func TestGauge(t *testing.T) {
prefix, name := "ghi.", "jkl"
label, value := "xyz", "abc"
regex := `^` + prefix + name + `:([0-9\.]+)\|g\|#` + label + `:` + value + `$`
d := New(prefix, log.NewNopLogger())
regex := `^` + prefix + name + `:([0-9\.]+)\|g\|#hostname:foohost,` + label + `:` + value + `$`
d := New(prefix, log.NewNopLogger(), "hostname", "foohost")
gauge := d.NewGauge(name).With(label, value)
valuef := teststat.LastLine(d, regex)
if err := teststat.TestGauge(gauge, valuef); err != nil {
Expand Down