Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion metrics/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ A gauge for the number of goroutines currently running, exported via StatsD.

```go
import (
"context"
"net"
"os"
"runtime"
Expand All @@ -81,7 +82,7 @@ func main() {
statsd := statsd.New("foo_svc.", log.NewNopLogger())
report := time.NewTicker(5 * time.Second)
defer report.Stop()
go statsd.SendLoop(report.C, "tcp", "statsd.internal:8125")
go statsd.SendLoop(context.Background(), report.C, "tcp", "statsd.internal:8125")
goroutines := statsd.NewGauge("goroutine_count")
go exportGoroutines(goroutines)
// ...
Expand Down
18 changes: 12 additions & 6 deletions metrics/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package cloudwatch

import (
"context"
"fmt"
"os"
"strconv"
"sync"
"time"

Expand All @@ -14,7 +16,6 @@ import (
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/generic"
"github.com/go-kit/kit/metrics/internal/lv"
"strconv"
)

const (
Expand Down Expand Up @@ -136,13 +137,18 @@ func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram {
}

// WriteLoop is a helper method that invokes Send every time the passed
// channel fires. This method blocks until the channel is closed, so clients
// channel fires. This method blocks until ctx is canceled, so clients
// probably want to run it in its own goroutine. For typical usage, create a
// time.Ticker and pass its C channel to this method.
func (cw *CloudWatch) WriteLoop(c <-chan time.Time) {
for range c {
if err := cw.Send(); err != nil {
cw.logger.Log("during", "Send", "err", err)
func (cw *CloudWatch) WriteLoop(ctx context.Context, c <-chan time.Time) {
for {
select {
case <-c:
if err := cw.Send(); err != nil {
cw.logger.Log("during", "Send", "err", err)
}
case <-ctx.Done():
return
}
}
}
Expand Down
16 changes: 11 additions & 5 deletions metrics/cloudwatch2/cloudwatch2.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package cloudwatch2

import (
"context"
"math"
"sync"
"time"
Expand Down Expand Up @@ -107,13 +108,18 @@ func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram {
}

// WriteLoop is a helper method that invokes Send every time the passed
// channel fires. This method blocks until the channel is closed, so clients
// channel fires. This method blocks until ctx is canceled, so clients
// probably want to run it in its own goroutine. For typical usage, create a
// time.Ticker and pass its C channel to this method.
func (cw *CloudWatch) WriteLoop(c <-chan time.Time) {
for range c {
if err := cw.Send(); err != nil {
cw.logger.Log("during", "Send", "err", err)
func (cw *CloudWatch) WriteLoop(ctx context.Context, c <-chan time.Time) {
for {
select {
case <-c:
if err := cw.Send(); err != nil {
cw.logger.Log("during", "Send", "err", err)
}
case <-ctx.Done():
return
}
}
}
Expand Down
24 changes: 15 additions & 9 deletions metrics/dogstatsd/dogstatsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package dogstatsd

import (
"context"
"fmt"
"io"
"strings"
Expand Down Expand Up @@ -109,24 +110,29 @@ func (d *Dogstatsd) NewHistogram(name string, sampleRate float64) *Histogram {
}

// WriteLoop is a helper method that invokes WriteTo to the passed writer every
// time the passed channel fires. This method blocks until the channel is
// closed, so clients probably want to run it in its own goroutine. For typical
// time the passed channel fires. This method blocks until ctx is canceled,
// so clients probably want to run it in its own goroutine. For typical
// usage, create a time.Ticker and pass its C channel to this method.
func (d *Dogstatsd) WriteLoop(c <-chan time.Time, w io.Writer) {
for range c {
if _, err := d.WriteTo(w); err != nil {
d.logger.Log("during", "WriteTo", "err", err)
func (d *Dogstatsd) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) {
for {
select {
case <-c:
if _, err := d.WriteTo(w); err != nil {
d.logger.Log("during", "WriteTo", "err", err)
}
case <-ctx.Done():
return
}
}
}

// SendLoop is a helper method that wraps WriteLoop, passing a managed
// connection to the network and address. Like WriteLoop, this method blocks
// until the channel is closed, so clients probably want to start it in its own
// until ctx is canceled, so clients probably want to start it in its own
// goroutine. For typical usage, create a time.Ticker and pass its C channel to
// this method.
func (d *Dogstatsd) SendLoop(c <-chan time.Time, network, address string) {
d.WriteLoop(c, conn.NewDefaultManager(network, address, d.logger))
func (d *Dogstatsd) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) {
d.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, d.logger))
}

// WriteTo flushes the buffered content of the metrics to the writer, in
Expand Down
24 changes: 15 additions & 9 deletions metrics/graphite/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package graphite

import (
"context"
"fmt"
"io"
"sync"
Expand Down Expand Up @@ -83,24 +84,29 @@ func (g *Graphite) NewHistogram(name string, buckets int) *Histogram {
}

// WriteLoop is a helper method that invokes WriteTo to the passed writer every
// time the passed channel fires. This method blocks until the channel is
// closed, so clients probably want to run it in its own goroutine. For typical
// time the passed channel fires. This method blocks until ctx is canceled,
// so clients probably want to run it in its own goroutine. For typical
// usage, create a time.Ticker and pass its C channel to this method.
func (g *Graphite) WriteLoop(c <-chan time.Time, w io.Writer) {
for range c {
if _, err := g.WriteTo(w); err != nil {
g.logger.Log("during", "WriteTo", "err", err)
func (g *Graphite) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) {
for {
select {
case <-c:
if _, err := g.WriteTo(w); err != nil {
g.logger.Log("during", "WriteTo", "err", err)
}
case <-ctx.Done():
return
}
}
}

// SendLoop is a helper method that wraps WriteLoop, passing a managed
// connection to the network and address. Like WriteLoop, this method blocks
// until the channel is closed, so clients probably want to start it in its own
// until ctx is canceled, so clients probably want to start it in its own
// goroutine. For typical usage, create a time.Ticker and pass its C channel to
// this method.
func (g *Graphite) SendLoop(c <-chan time.Time, network, address string) {
g.WriteLoop(c, conn.NewDefaultManager(network, address, g.logger))
func (g *Graphite) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) {
g.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, g.logger))
}

// WriteTo flushes the buffered content of the metrics to the writer, in
Expand Down
14 changes: 10 additions & 4 deletions metrics/influx/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package influx

import (
"context"
"time"

influxdb "github.com/influxdata/influxdb1-client/v2"
Expand Down Expand Up @@ -88,10 +89,15 @@ type BatchPointsWriter interface {
// time the passed channel fires. This method blocks until the channel is
// closed, so clients probably want to run it in its own goroutine. For typical
// usage, create a time.Ticker and pass its C channel to this method.
func (in *Influx) WriteLoop(c <-chan time.Time, w BatchPointsWriter) {
for range c {
if err := in.WriteTo(w); err != nil {
in.logger.Log("during", "WriteTo", "err", err)
func (in *Influx) WriteLoop(ctx context.Context, c <-chan time.Time, w BatchPointsWriter) {
for {
select {
case <-c:
if err := in.WriteTo(w); err != nil {
in.logger.Log("during", "WriteTo", "err", err)
}
case <-ctx.Done():
return
}
}
}
Expand Down
24 changes: 15 additions & 9 deletions metrics/influxstatsd/influxstatsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package influxstatsd

import (
"context"
"fmt"
"io"
"strings"
Expand Down Expand Up @@ -109,24 +110,29 @@ func (d *Influxstatsd) NewHistogram(name string, sampleRate float64) *Histogram
}

// WriteLoop is a helper method that invokes WriteTo to the passed writer every
// time the passed channel fires. This method blocks until the channel is
// closed, so clients probably want to run it in its own goroutine. For typical
// time the passed channel fires. This method blocks until ctx is canceled,
// so clients probably want to run it in its own goroutine. For typical
// usage, create a time.Ticker and pass its C channel to this method.
func (d *Influxstatsd) WriteLoop(c <-chan time.Time, w io.Writer) {
for range c {
if _, err := d.WriteTo(w); err != nil {
d.logger.Log("during", "WriteTo", "err", err)
func (d *Influxstatsd) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) {
for {
select {
case <-c:
if _, err := d.WriteTo(w); err != nil {
d.logger.Log("during", "WriteTo", "err", err)
}
case <-ctx.Done():
return
}
}
}

// SendLoop is a helper method that wraps WriteLoop, passing a managed
// connection to the network and address. Like WriteLoop, this method blocks
// until the channel is closed, so clients probably want to start it in its own
// until ctx is canceled, so clients probably want to start it in its own
// goroutine. For typical usage, create a time.Ticker and pass its C channel to
// this method.
func (d *Influxstatsd) SendLoop(c <-chan time.Time, network, address string) {
d.WriteLoop(c, conn.NewDefaultManager(network, address, d.logger))
func (d *Influxstatsd) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) {
d.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, d.logger))
}

// WriteTo flushes the buffered content of the metrics to the writer, in
Expand Down
2 changes: 1 addition & 1 deletion metrics/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// case "statsd":
// s := statsd.New(...)
// t := time.NewTicker(5*time.Second)
// go s.SendLoop(t.C, "tcp", "statsd.local:8125")
// go s.SendLoop(ctx, t.C, "tcp", "statsd.local:8125")
// latency = s.NewHistogram(...)
// requests = s.NewCounter(...)
// default:
Expand Down
24 changes: 15 additions & 9 deletions metrics/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package statsd

import (
"context"
"fmt"
"io"
"time"
Expand Down Expand Up @@ -89,24 +90,29 @@ func (s *Statsd) NewTiming(name string, sampleRate float64) *Timing {
}

// WriteLoop is a helper method that invokes WriteTo to the passed writer every
// time the passed channel fires. This method blocks until the channel is
// closed, so clients probably want to run it in its own goroutine. For typical
// time the passed channel fires. This method blocks until ctx is canceled,
// so clients probably want to run it in its own goroutine. For typical
// usage, create a time.Ticker and pass its C channel to this method.
func (s *Statsd) WriteLoop(c <-chan time.Time, w io.Writer) {
for range c {
if _, err := s.WriteTo(w); err != nil {
s.logger.Log("during", "WriteTo", "err", err)
func (s *Statsd) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) {
for {
select {
case <-c:
if _, err := s.WriteTo(w); err != nil {
s.logger.Log("during", "WriteTo", "err", err)
}
case <-ctx.Done():
return
}
}
}

// SendLoop is a helper method that wraps WriteLoop, passing a managed
// connection to the network and address. Like WriteLoop, this method blocks
// until the channel is closed, so clients probably want to start it in its own
// until ctx is canceled, so clients probably want to start it in its own
// goroutine. For typical usage, create a time.Ticker and pass its C channel to
// this method.
func (s *Statsd) SendLoop(c <-chan time.Time, network, address string) {
s.WriteLoop(c, conn.NewDefaultManager(network, address, s.logger))
func (s *Statsd) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) {
s.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, s.logger))
}

// WriteTo flushes the buffered content of the metrics to the writer, in
Expand Down