Issue #529: Partition API requests to CloudWatch into separate concur… #540
peterbourgon merged 3 commits into go-kit:master from alpeb:529
Conversation
@cam-stitt Looking forward to your comments.
metrics/cloudwatch/cloudwatch.go
Outdated
	MetricData: datums,
})
return err
var tokens = make(chan struct{}, cw.numConcurrentRequests)
I'm not sure what the value of tokens is. I don't really see it being used.
It limits the number of concurrent requests. When the channel is full, tokens <- struct{}{} will block until a goroutine finishes and releases a token through <-tokens.
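For context, a minimal self-contained sketch of the channel-as-semaphore pattern being described (names and values are illustrative, not taken from the PR):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const numConcurrentRequests = 3
	// A buffered channel acts as a counting semaphore: at most
	// numConcurrentRequests goroutines can hold a token at once.
	tokens := make(chan struct{}, numConcurrentRequests)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			tokens <- struct{}{}        // acquire a token; blocks while the channel is full
			defer func() { <-tokens }() // release the token when the work is done
			fmt.Println("worker", id, "running")
		}(i)
	}
	wg.Wait()
}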
var errors = make(chan error)
var n int

for len(datums) > 0 {
My personal preference would be to loop in jumps of numConcurrentRequests, slicing datums up each time. Rough example: https://play.golang.org/p/slrBko_QdI
If you step through my code, you'll see it's pretty much equivalent to what you're proposing.
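For reference, a rough sketch of the chunk-by-slicing idea under discussion (the playground example isn't reproduced here, so this is an assumed illustration; ints stand in for *cloudwatch.MetricDatum to keep it dependency-free):

package main

import "fmt"

// splitIntoBatches chops a slice into batches of at most size elements.
func splitIntoBatches(items []int, size int) [][]int {
	var batches [][]int
	for len(items) > 0 {
		lim := size
		if len(items) < lim {
			lim = len(items)
		}
		batches = append(batches, items[:lim])
		items = items[lim:]
	}
	return batches
}

func main() {
	fmt.Println(splitIntoBatches([]int{1, 2, 3, 4, 5, 6, 7}, 3)) // [[1 2 3] [4 5 6] [7]]
}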
}(batch)
}

var firstErr error
By returning only the first error, there may be some context lost as to what the overall problem was. Maybe we can create an Error type that actually contains multiple errors?
Sure. Would you agree on limiting the number of errors, or shall we return everything?
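As an illustration of the kind of multi-error type being suggested (a sketch only, not the code that was merged):

package main

import (
	"fmt"
	"strings"
)

// multiError collects several errors behind a single error value.
type multiError []error

func (m multiError) Error() string {
	msgs := make([]string, 0, len(m))
	for _, err := range m {
		msgs = append(msgs, err.Error())
	}
	return strings.Join(msgs, "; ")
}

func main() {
	var errs multiError
	errs = append(errs, fmt.Errorf("batch 1 failed"), fmt.Errorf("batch 3 failed"))
	var err error = errs
	fmt.Println(err) // batch 1 failed; batch 3 failed
}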
Thanks a lot for this. It's a great start. I've put a few of my thoughts on there, but would love to get @peterbourgon's opinion.
lim := min(len(datums), cw.numConcurrentRequests)
batch, datums = datums[:lim], datums[lim:]
n++
go func(batch []*cloudwatch.MetricDatum) {
I wonder if it's better to not make each PutMetricData in a goroutine, and just fire them and return the error that comes up. This means that the first request to error will stop any future requests from occurring, which might be nicer behaviour.
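A sketch of that sequential alternative, for comparison (hypothetical, not what the PR ended up doing; it assumes the CloudWatch struct and the aws / cloudwatch imports already present in the file):

// sendSequentially sends one batch at a time and stops at the first failure,
// so a failing request prevents any further requests from being made.
func (cw *CloudWatch) sendSequentially(batches [][]*cloudwatch.MetricDatum) error {
	for _, batch := range batches {
		_, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
			Namespace:  aws.String(cw.namespace),
			MetricData: batch,
		})
		if err != nil {
			return err
		}
	}
	return nil
}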
metrics/cloudwatch/cloudwatch.go
Outdated
func (cw *CloudWatch) SetConcurrency(numConcurrentRequests int) *CloudWatch {
	cw.numConcurrentRequests = numConcurrentRequests
	return cw
}
This should be an option in the constructor, rather than a method on the struct. I don't see any reason to make it modifiable during runtime. (It's also racy as implemented.)
Right, I wasn't sure how to handle optional parameters in Go. Would this do?
func New(namespace string, svc cloudwatchiface.CloudWatchAPI, logger log.Logger, numConcurrentRequests ...int) *CloudWatch {
	numConcurrentRequestsValue := 10
	if len(numConcurrentRequests) > 0 {
		numConcurrentRequestsValue = numConcurrentRequests[0]
	}
	return &CloudWatch{
		namespace:             namespace,
		numConcurrentRequests: numConcurrentRequestsValue,
		svc:                   svc,
		counters:              map[string]*counter{},
		gauges:                map[string]*gauge{},
		histograms:            map[string]*histogram{},
		logger:                logger,
	}
}
No, that's not the way. If I were you I'd just make it a non-optional parameter, and document a reasonable default in the function comment.
// New returns a blah blah. The client will emit at most
// numConcurrent simultaneous requests to Amazon.
// A good default value is 10.
func New(namespace string, svc CloudWatchAPI, numConcurrent int, logger log.Logger) {
...
}
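For completeness, another common way to handle optional parameters in Go is functional options; below is a sketch of that pattern against a simplified struct (illustration only, not what was suggested or merged here):

// Simplified struct for the sake of the sketch.
type CloudWatch struct {
	namespace             string
	numConcurrentRequests int
}

// Option mutates a CloudWatch during construction.
type Option func(*CloudWatch)

// WithConcurrentRequests overrides the default request concurrency.
func WithConcurrentRequests(n int) Option {
	return func(cw *CloudWatch) { cw.numConcurrentRequests = n }
}

func New(namespace string, options ...Option) *CloudWatch {
	cw := &CloudWatch{namespace: namespace, numConcurrentRequests: 10}
	for _, opt := range options {
		opt(cw)
	}
	return cw
}

Callers then write New("myns") for the default, or New("myns", WithConcurrentRequests(5)) to override it.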
metrics/cloudwatch/cloudwatch.go
Outdated
	MetricData: datums,
})
return err
var tokens = make(chan struct{}, cw.numConcurrentRequests)
Similarly, this should be global per CloudWatch struct (so a member of the struct), not per-metric (scoped to NewCounter), right?
Sorry, I don't understand what you mean. This is inside Send(), why are you referring to NewCounter?
Sorry, my mistake, but my point stands: putting the semaphore here means there are numConcurrentRequests simultaneous requests per Send invocation, so if the user invokes Send 10 times then you get 10x numConcurrentRequests going out. So in reality I think we want the semaphore to be in the struct.
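A sketch of what a struct-scoped semaphore looks like (field and method names are assumed for illustration; it relies on the aws-sdk-go imports already in the file):

// The semaphore lives on the struct, so every Send call shares the same
// concurrency budget instead of getting its own.
type CloudWatch struct {
	namespace             string
	svc                   cloudwatchiface.CloudWatchAPI
	numConcurrentRequests int
	semaphore             chan struct{} // buffered to numConcurrentRequests in New
}

func (cw *CloudWatch) putBatch(batch []*cloudwatch.MetricDatum) error {
	cw.semaphore <- struct{}{}        // acquire; shared across all Send invocations
	defer func() { <-cw.semaphore }() // release when done
	_, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
		Namespace:  aws.String(cw.namespace),
		MetricData: batch,
	})
	return err
}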
for len(datums) > 0 {
	var batch []*cloudwatch.MetricDatum
	lim := min(len(datums), cw.numConcurrentRequests)
	batch, datums = datums[:lim], datums[lim:]
If I'm reading this right, you're sizing the batch as the minimum of (number of datums remaining) and (number of allowable concurrent requests)? That doesn't seem right—batch size and concurrent batches have nothing to do with each other, do they?
You're right, I should have instead gotten the min between len(datums) and 20 (the maximum number of datums allowed per request).
})
<-tokens
errors <- err
}(batch)
Since you construct the batch within the loop scope, there's no reason to pass it as a parameter to this anonymous function.
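To spell out the point about loop scope, a small standalone sketch (strings stand in for the metric datums):

package main

import "fmt"

func main() {
	words := []string{"a", "b", "c"}
	done := make(chan struct{}, len(words))
	for len(words) > 0 {
		// w is declared inside the loop body, so every iteration gets a fresh
		// variable; the closure can capture it directly and does not need to
		// receive it as a parameter. Passing a parameter is only needed when
		// closing over a variable that is reused across iterations.
		var w string
		w, words = words[0], words[1:]
		go func() {
			fmt.Println(w)
			done <- struct{}{}
		}()
	}
	for i := 0; i < cap(done); i++ {
		<-done
	}
}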
MetricData: batch,
})
<-tokens
errors <- err
Is there a reason you decrement the semaphore before returning the error along the errors chan? If you didn't need to do that, it would be nicer to express this as
tokens <- struct{}{}
defer func() { <-tokens }()
_, err := cw...
errors <- err

if err := <-errors; err != nil && firstErr != nil {
	firstErr = err
}
}
It's a little fragile, for a couple reasons:
- n is only loosely coupled to the number of goroutines you launched, easy to get out of sync w/ maintenance
- If you forget to do precisely as many reads from errors as goroutines you launched, you'll leak goroutines
Suggest precomputing batches, constructing errors as make(chan error, len(batches)), and only then launching the goroutines to process the batches. Finally, changing this loop to e.g.
for i := 0; i < cap(errors); i++ {
if err := <-errors; err != nil ... { ... }
}
Understood. I'll follow suit.
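Putting the suggestions together, here is a sketch of what the reworked Send could look like (a sketch under the assumptions above: a struct-scoped semaphore field, 20-datum batches, precomputed batches, and a buffered errors channel; field names and details are assumed, so this is not necessarily the code that was merged):

func (cw *CloudWatch) Send() error {
	var datums []*cloudwatch.MetricDatum
	// ... populate datums from counters, gauges, and histograms as in the existing Send ...

	// Precompute batches of at most 20 datums, the CloudWatch per-request limit.
	const maxDatumsPerRequest = 20
	var batches [][]*cloudwatch.MetricDatum
	for len(datums) > 0 {
		lim := len(datums)
		if lim > maxDatumsPerRequest {
			lim = maxDatumsPerRequest
		}
		batches = append(batches, datums[:lim])
		datums = datums[lim:]
	}

	// Buffer the errors channel to the number of batches so every goroutine
	// can send its result without blocking.
	errors := make(chan error, len(batches))
	for _, batch := range batches {
		go func(batch []*cloudwatch.MetricDatum) {
			cw.semaphore <- struct{}{}        // struct-scoped semaphore caps concurrency
			defer func() { <-cw.semaphore }() // release when done
			_, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
				Namespace:  aws.String(cw.namespace),
				MetricData: batch,
			})
			errors <- err
		}(batch)
	}

	// Read exactly as many results as batches launched, keeping the first error.
	var firstErr error
	for i := 0; i < cap(errors); i++ {
		if err := <-errors; err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}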
Thanks for the detailed review! I'll respond to comments inline, and then I'll submit a new patch.

I think if you rebase you might get this fixed for you?
…rent batches to circumvent the 20 data per request limit that they have
…er-filling logic in cloudwatch_test.go
…ved semaphore into the cw struct and use defer when using it. Fixed data partitioning logic and separate batch creation from goroutines launching. Improved tests.
Yep, after rebasing checks are passing 🎉

@cam-stitt I just want to get a +1 from you before merging. All good from your POV?

Yeah, looks good. Thanks @alpeb for jumping on this.

Thanks all!