From 24eddfe238bc7e5191390fe68725d7e04a8249ca Mon Sep 17 00:00:00 2001 From: Alejandro Pedraza Date: Tue, 30 May 2017 12:06:53 -0500 Subject: [PATCH 1/3] Issue #529: Partition API requests to CloudWatch into separate concurrent batches to circumvent the 20 data per request limit that they have --- metrics/cloudwatch/cloudwatch.go | 67 +++++++++++++++++++++------ metrics/cloudwatch/cloudwatch_test.go | 36 ++++++++++++++ 2 files changed, 90 insertions(+), 13 deletions(-) diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go index 728634bc1..855dee2ae 100644 --- a/metrics/cloudwatch/cloudwatch.go +++ b/metrics/cloudwatch/cloudwatch.go @@ -20,13 +20,14 @@ import ( // // To regularly report metrics to CloudWatch, use the WriteLoop helper method. type CloudWatch struct { - mtx sync.RWMutex - namespace string - svc cloudwatchiface.CloudWatchAPI - counters map[string]*counter - gauges map[string]*gauge - histograms map[string]*histogram - logger log.Logger + mtx sync.RWMutex + namespace string + numConcurrentRequests int + svc cloudwatchiface.CloudWatchAPI + counters map[string]*counter + gauges map[string]*gauge + histograms map[string]*histogram + logger log.Logger } // New returns a CloudWatch object that may be used to create metrics. @@ -35,7 +36,8 @@ type CloudWatch struct { // manually or with one of the helper methods. func New(namespace string, svc cloudwatchiface.CloudWatchAPI, logger log.Logger) *CloudWatch { return &CloudWatch{ - namespace: namespace, + namespace: namespace, + numConcurrentRequests: 10, svc: svc, counters: map[string]*counter{}, gauges: map[string]*gauge{}, @@ -44,6 +46,15 @@ func New(namespace string, svc cloudwatchiface.CloudWatchAPI, logger log.Logger) } } +// SetConcurrency overrides the default number (10) of concurrent requests sent to CloudWatch. +// CloudWatch allows a maximum of 20 metrics to be sent per request, so when Send is called, +// we partition the metrics and concurrently call their API. This parameter sets maximum number +// of concurrent requests. +func (cw *CloudWatch) SetConcurrency(numConcurrentRequests int) *CloudWatch { + cw.numConcurrentRequests = numConcurrentRequests + return cw +} + // NewCounter returns a counter. Observations are aggregated and emitted once // per write invocation. func (cw *CloudWatch) NewCounter(name string) metrics.Counter { @@ -133,11 +144,41 @@ func (cw *CloudWatch) Send() error { } } - _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ - Namespace: aws.String(cw.namespace), - MetricData: datums, - }) - return err + var tokens = make(chan struct{}, cw.numConcurrentRequests) + var errors = make(chan error) + var n int + + for len(datums) > 0 { + var batch []*cloudwatch.MetricDatum + lim := min(len(datums), cw.numConcurrentRequests) + batch, datums = datums[:lim], datums[lim:] + n++ + go func(batch []*cloudwatch.MetricDatum) { + tokens <- struct{}{} + _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ + Namespace: aws.String(cw.namespace), + MetricData: batch, + }) + <-tokens + errors <- err + }(batch) + } + + var firstErr error + for ; n > 0; n-- { + if err := <-errors; err != nil && firstErr != nil { + firstErr = err + } + } + + return firstErr +} + +func min(a, b int) int { + if a < b { + return a + } + return b } // counter is a CloudWatch counter metric. diff --git a/metrics/cloudwatch/cloudwatch_test.go b/metrics/cloudwatch/cloudwatch_test.go index c0cf1d0e9..fa6cca383 100644 --- a/metrics/cloudwatch/cloudwatch_test.go +++ b/metrics/cloudwatch/cloudwatch_test.go @@ -10,6 +10,7 @@ import ( "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/teststat" ) @@ -83,6 +84,41 @@ func TestCounter(t *testing.T) { } } +func TestCounterLowSendConcurrency(t *testing.T) { + namespace := "abc" + names := []string{"name1", "name2", "name3", "name4", "name5", "name6"} + label, value := "label", "value" + svc := newMockCloudWatch() + cw := New(namespace, svc, log.NewNopLogger()) + cw = cw.SetConcurrency(2) + + counters := make(map[string]metrics.Counter) + for _, name := range names { + counters[name] = cw.NewCounter(name).With(label, value) + } + + valuef := func(name string) func() float64 { + return func() float64 { + err := cw.Send() + if err != nil { + t.Fatal(err) + } + svc.mtx.RLock() + defer svc.mtx.RUnlock() + return svc.valuesReceived[name] + } + } + + for _, name := range names { + if err := teststat.TestCounter(counters[name], valuef(name)); err != nil { + t.Fatal(err) + } + if err := testDimensions(svc, name, label, value); err != nil { + t.Fatal(err) + } + } +} + func TestGauge(t *testing.T) { namespace, name := "abc", "def" label, value := "label", "value" From 9e2127ec6eedbf4c717094f3523e7f6e04cb7ca7 Mon Sep 17 00:00:00 2001 From: Alejandro Pedraza Date: Mon, 5 Jun 2017 21:13:52 -0500 Subject: [PATCH 2/3] Issue #529: Extracted FillCounter() from TestCounter() to reuse counter-filling logic in cloudwatch_test.go --- metrics/teststat/teststat.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/metrics/teststat/teststat.go b/metrics/teststat/teststat.go index 5a1885a66..74019f5b2 100644 --- a/metrics/teststat/teststat.go +++ b/metrics/teststat/teststat.go @@ -14,6 +14,16 @@ import ( // TestCounter puts some deltas through the counter, and then calls the value // func to check that the counter has the correct final value. func TestCounter(counter metrics.Counter, value func() float64) error { + want := FillCounter(counter) + if have := value(); want != have { + return fmt.Errorf("want %f, have %f", want, have) + } + + return nil +} + +// FillCounter puts some deltas through the counter and returns the total value. +func FillCounter(counter metrics.Counter) float64 { a := rand.Perm(100) n := rand.Intn(len(a)) @@ -23,12 +33,7 @@ func TestCounter(counter metrics.Counter, value func() float64) error { counter.Add(f) want += f } - - if have := value(); want != have { - return fmt.Errorf("want %f, have %f", want, have) - } - - return nil + return want } // TestGauge puts some values through the gauge, and then calls the value func From 196891add006e4322415e1da081f9519e1c39881 Mon Sep 17 00:00:00 2001 From: Alejandro Pedraza Date: Mon, 5 Jun 2017 21:17:13 -0500 Subject: [PATCH 3/3] Issue #529: replaced SetConcurrency() with new parameter in New(). Moved semaphore into the cw struct and use defer when using it. Fixed data partitioning logic and separate batch creation from goroutines launching. Improved tests. --- metrics/cloudwatch/cloudwatch.go | 47 +++++++++++++++------------ metrics/cloudwatch/cloudwatch_test.go | 46 +++++++++++++------------- 2 files changed, 49 insertions(+), 44 deletions(-) diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go index 855dee2ae..e267e0302 100644 --- a/metrics/cloudwatch/cloudwatch.go +++ b/metrics/cloudwatch/cloudwatch.go @@ -14,6 +14,10 @@ import ( "github.com/go-kit/kit/metrics/generic" ) +const ( + maxConcurrentRequests = 20 +) + // CloudWatch receives metrics observations and forwards them to CloudWatch. // Create a CloudWatch object, use it to create metrics, and pass those metrics as // dependencies to the components that will use them. @@ -21,6 +25,7 @@ import ( // To regularly report metrics to CloudWatch, use the WriteLoop helper method. type CloudWatch struct { mtx sync.RWMutex + sem chan struct{} namespace string numConcurrentRequests int svc cloudwatchiface.CloudWatchAPI @@ -32,12 +37,19 @@ type CloudWatch struct { // New returns a CloudWatch object that may be used to create metrics. // Namespace is applied to all created metrics and maps to the CloudWatch namespace. +// NumConcurrent sets the number of simultaneous requests to Amazon. +// A good default value is 10 and the maximum is 20. // Callers must ensure that regular calls to Send are performed, either // manually or with one of the helper methods. -func New(namespace string, svc cloudwatchiface.CloudWatchAPI, logger log.Logger) *CloudWatch { +func New(namespace string, svc cloudwatchiface.CloudWatchAPI, numConcurrent int, logger log.Logger) *CloudWatch { + if numConcurrent > maxConcurrentRequests { + numConcurrent = maxConcurrentRequests + } + return &CloudWatch{ + sem: make(chan struct{}, numConcurrent), namespace: namespace, - numConcurrentRequests: 10, + numConcurrentRequests: numConcurrent, svc: svc, counters: map[string]*counter{}, gauges: map[string]*gauge{}, @@ -46,15 +58,6 @@ func New(namespace string, svc cloudwatchiface.CloudWatchAPI, logger log.Logger) } } -// SetConcurrency overrides the default number (10) of concurrent requests sent to CloudWatch. -// CloudWatch allows a maximum of 20 metrics to be sent per request, so when Send is called, -// we partition the metrics and concurrently call their API. This parameter sets maximum number -// of concurrent requests. -func (cw *CloudWatch) SetConcurrency(numConcurrentRequests int) *CloudWatch { - cw.numConcurrentRequests = numConcurrentRequests - return cw -} - // NewCounter returns a counter. Observations are aggregated and emitted once // per write invocation. func (cw *CloudWatch) NewCounter(name string) metrics.Counter { @@ -144,28 +147,30 @@ func (cw *CloudWatch) Send() error { } } - var tokens = make(chan struct{}, cw.numConcurrentRequests) - var errors = make(chan error) - var n int - + var batches [][]*cloudwatch.MetricDatum for len(datums) > 0 { var batch []*cloudwatch.MetricDatum - lim := min(len(datums), cw.numConcurrentRequests) + lim := min(len(datums), maxConcurrentRequests) batch, datums = datums[:lim], datums[lim:] - n++ + batches = append(batches, batch) + } + + var errors = make(chan error, len(batches)) + for _, batch := range batches { go func(batch []*cloudwatch.MetricDatum) { - tokens <- struct{}{} + cw.sem <- struct{}{} + defer func() { + <-cw.sem + }() _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ Namespace: aws.String(cw.namespace), MetricData: batch, }) - <-tokens errors <- err }(batch) } - var firstErr error - for ; n > 0; n-- { + for i := 0; i < cap(errors); i++ { if err := <-errors; err != nil && firstErr != nil { firstErr = err } diff --git a/metrics/cloudwatch/cloudwatch_test.go b/metrics/cloudwatch/cloudwatch_test.go index fa6cca383..c5f11db64 100644 --- a/metrics/cloudwatch/cloudwatch_test.go +++ b/metrics/cloudwatch/cloudwatch_test.go @@ -3,6 +3,7 @@ package cloudwatch import ( "errors" "fmt" + "strconv" "sync" "testing" @@ -65,7 +66,7 @@ func TestCounter(t *testing.T) { namespace, name := "abc", "def" label, value := "label", "value" svc := newMockCloudWatch() - cw := New(namespace, svc, log.NewNopLogger()) + cw := New(namespace, svc, 10, log.NewNopLogger()) counter := cw.NewCounter(name).With(label, value) valuef := func() float64 { err := cw.Send() @@ -86,34 +87,33 @@ func TestCounter(t *testing.T) { func TestCounterLowSendConcurrency(t *testing.T) { namespace := "abc" - names := []string{"name1", "name2", "name3", "name4", "name5", "name6"} - label, value := "label", "value" + var names, labels, values []string + for i := 1; i <= 45; i++ { + num := strconv.Itoa(i) + names = append(names, "name"+num) + labels = append(labels, "label"+num) + values = append(values, "value"+num) + } svc := newMockCloudWatch() - cw := New(namespace, svc, log.NewNopLogger()) - cw = cw.SetConcurrency(2) + cw := New(namespace, svc, 2, log.NewNopLogger()) counters := make(map[string]metrics.Counter) - for _, name := range names { - counters[name] = cw.NewCounter(name).With(label, value) + var wants []float64 + for i, name := range names { + counters[name] = cw.NewCounter(name).With(labels[i], values[i]) + wants = append(wants, teststat.FillCounter(counters[name])) } - valuef := func(name string) func() float64 { - return func() float64 { - err := cw.Send() - if err != nil { - t.Fatal(err) - } - svc.mtx.RLock() - defer svc.mtx.RUnlock() - return svc.valuesReceived[name] - } + err := cw.Send() + if err != nil { + t.Fatal(err) } - for _, name := range names { - if err := teststat.TestCounter(counters[name], valuef(name)); err != nil { - t.Fatal(err) + for i, name := range names { + if svc.valuesReceived[name] != wants[i] { + t.Fatal("want %f, have %f", wants[i], svc.valuesReceived[name]) } - if err := testDimensions(svc, name, label, value); err != nil { + if err := testDimensions(svc, name, labels[i], values[i]); err != nil { t.Fatal(err) } } @@ -123,7 +123,7 @@ func TestGauge(t *testing.T) { namespace, name := "abc", "def" label, value := "label", "value" svc := newMockCloudWatch() - cw := New(namespace, svc, log.NewNopLogger()) + cw := New(namespace, svc, 10, log.NewNopLogger()) gauge := cw.NewGauge(name).With(label, value) valuef := func() float64 { err := cw.Send() @@ -146,7 +146,7 @@ func TestHistogram(t *testing.T) { namespace, name := "abc", "def" label, value := "label", "value" svc := newMockCloudWatch() - cw := New(namespace, svc, log.NewNopLogger()) + cw := New(namespace, svc, 10, log.NewNopLogger()) histogram := cw.NewHistogram(name, 50).With(label, value) n50 := fmt.Sprintf("%s_50", name) n90 := fmt.Sprintf("%s_90", name)