-
Notifications
You must be signed in to change notification settings - Fork 510
Rolling window limit support #193
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
84e3323
8f517f4
b1aadd1
f2c6324
ab6f360
e516c20
4a4c9cc
b69f04d
d72f951
e5704b6
b07edff
48446cf
927b23e
f0d3c8b
5e2676b
ffff44a
33f39f9
846ba8e
e4e25cc
244e801
510abf5
307c27d
122cbf5
953958e
baf012b
1face4c
5cc166b
8ce4708
d2d32b4
bc25eb7
10404f4
3ec9cca
73df311
a400046
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| package algorithm | ||
|
|
||
| import ( | ||
| "github.com/coocood/freecache" | ||
| pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3" | ||
| "github.com/envoyproxy/ratelimit/src/config" | ||
| "github.com/envoyproxy/ratelimit/src/utils" | ||
| logger "github.com/sirupsen/logrus" | ||
| ) | ||
|
|
||
| type WindowImpl struct { | ||
| algorithm RatelimitAlgorithm | ||
| cacheKeyGenerator utils.CacheKeyGenerator | ||
| localCache *freecache.Cache | ||
| timeSource utils.TimeSource | ||
| } | ||
|
|
||
| func (w *WindowImpl) GetResponseDescriptorStatus(key string, limit *config.RateLimit, results int64, | ||
| isOverLimitWithLocalCache bool, hitsAddend int64) *pb.RateLimitResponse_DescriptorStatus { | ||
| if key == "" { | ||
| return &pb.RateLimitResponse_DescriptorStatus{ | ||
| Code: pb.RateLimitResponse_OK, | ||
| CurrentLimit: nil, | ||
| LimitRemaining: 0, | ||
| } | ||
| } | ||
|
|
||
| if isOverLimitWithLocalCache { | ||
| PopulateStats(limit, 0, uint64(hitsAddend), uint64(hitsAddend)) | ||
|
|
||
| return &pb.RateLimitResponse_DescriptorStatus{ | ||
| Code: pb.RateLimitResponse_OVER_LIMIT, | ||
| CurrentLimit: limit.Limit, | ||
| LimitRemaining: 0, | ||
| DurationUntilReset: w.algorithm.CalculateSimpleReset(limit, w.timeSource), | ||
| } | ||
| } | ||
|
|
||
| isOverLimit, limitRemaining, durationUntilReset := w.algorithm.IsOverLimit(limit, int64(results), hitsAddend) | ||
|
|
||
| if !isOverLimit { | ||
| duration := w.algorithm.CalculateReset(isOverLimit, limit, w.timeSource) | ||
| return &pb.RateLimitResponse_DescriptorStatus{ | ||
| Code: pb.RateLimitResponse_OK, | ||
| CurrentLimit: limit.Limit, | ||
| LimitRemaining: uint32(limitRemaining), | ||
| DurationUntilReset: duration, | ||
| } | ||
| } else { | ||
| if w.localCache != nil { | ||
| durationUntilReset = utils.MaxInt(1, durationUntilReset) | ||
|
|
||
| err := w.localCache.Set([]byte(key), []byte{}, durationUntilReset) | ||
| if err != nil { | ||
| logger.Errorf("Failing to set local cache key: %s", key) | ||
| } | ||
| } | ||
| duration := w.algorithm.CalculateReset(isOverLimit, limit, w.timeSource) | ||
| return &pb.RateLimitResponse_DescriptorStatus{ | ||
| Code: pb.RateLimitResponse_OVER_LIMIT, | ||
| CurrentLimit: limit.Limit, | ||
| LimitRemaining: 0, | ||
| DurationUntilReset: duration, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (w *WindowImpl) IsOverLimitWithLocalCache(key string) bool { | ||
| if w.localCache != nil { | ||
| _, err := w.localCache.Get([]byte(key)) | ||
| if err == nil { | ||
| return true | ||
| } | ||
| } | ||
| return false | ||
| } | ||
|
|
||
| func (w *WindowImpl) GenerateCacheKeys(request *pb.RateLimitRequest, | ||
| limits []*config.RateLimit, hitsAddend int64, timestamp int64) []utils.CacheKey { | ||
| return w.cacheKeyGenerator.GenerateCacheKeys(request, limits, uint32(hitsAddend), timestamp) | ||
| } | ||
|
|
||
| func (w *WindowImpl) GetExpirationSeconds() int64 { | ||
| return w.algorithm.GetExpirationSeconds() | ||
| } | ||
|
|
||
| func (w *WindowImpl) GetResultsAfterIncrease() int64 { | ||
| return w.algorithm.GetResultsAfterIncrease() | ||
| } | ||
|
|
||
| func PopulateStats(limit *config.RateLimit, nearLimit uint64, overLimit uint64, overLimitWithLocalCache uint64) { | ||
| limit.Stats.NearLimit.Add(nearLimit) | ||
| limit.Stats.OverLimit.Add(overLimit) | ||
| limit.Stats.OverLimitWithLocalCache.Add(overLimitWithLocalCache) | ||
| } | ||
|
|
||
| func NewWindow(algorithm RatelimitAlgorithm, cacheKeyPrefix string, localCache *freecache.Cache, timeSource utils.TimeSource) *WindowImpl { | ||
| return &WindowImpl{ | ||
| algorithm: algorithm, | ||
| cacheKeyGenerator: utils.NewCacheKeyGenerator(cacheKeyPrefix), | ||
| localCache: localCache, | ||
| timeSource: timeSource, | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| package algorithm | ||
|
|
||
| import ( | ||
| "math" | ||
|
|
||
| "github.com/golang/protobuf/ptypes/duration" | ||
|
|
||
| "github.com/coocood/freecache" | ||
| "github.com/envoyproxy/ratelimit/src/config" | ||
| "github.com/envoyproxy/ratelimit/src/utils" | ||
| ) | ||
|
|
||
| var _ RatelimitAlgorithm = (*FixedWindowImpl)(nil) | ||
|
|
||
| type FixedWindowImpl struct { | ||
| timeSource utils.TimeSource | ||
| cacheKeyGenerator utils.CacheKeyGenerator | ||
| localCache *freecache.Cache | ||
| nearLimitRatio float32 | ||
| } | ||
|
|
||
| func (fw *FixedWindowImpl) IsOverLimit(limit *config.RateLimit, results int64, hitsAddend int64) (bool, int64, int) { | ||
| limitAfterIncrease := results | ||
| limitBeforeIncrease := limitAfterIncrease - int64(hitsAddend) | ||
| overLimitThreshold := int64(limit.Limit.RequestsPerUnit) | ||
| nearLimitThreshold := int64(math.Floor(float64(float32(overLimitThreshold) * fw.nearLimitRatio))) | ||
|
|
||
| if limitAfterIncrease > overLimitThreshold { | ||
| if limitBeforeIncrease >= overLimitThreshold { | ||
| PopulateStats(limit, 0, uint64(hitsAddend), 0) | ||
| } else { | ||
| PopulateStats(limit, uint64(overLimitThreshold-utils.MaxInt64(nearLimitThreshold, limitBeforeIncrease)), uint64(limitAfterIncrease-overLimitThreshold), 0) | ||
| } | ||
|
|
||
| return true, 0, int(utils.UnitToDivider(limit.Limit.Unit)) | ||
| } else { | ||
| if limitAfterIncrease > nearLimitThreshold { | ||
| if limitBeforeIncrease >= nearLimitThreshold { | ||
| PopulateStats(limit, uint64(hitsAddend), 0, 0) | ||
| } else { | ||
| PopulateStats(limit, uint64(limitAfterIncrease-nearLimitThreshold), 0, 0) | ||
| } | ||
| } | ||
|
|
||
| return false, overLimitThreshold - limitAfterIncrease, int(utils.UnitToDivider(limit.Limit.Unit)) | ||
| } | ||
| } | ||
|
|
||
| func (fw *FixedWindowImpl) GetExpirationSeconds() int64 { | ||
| return 0 | ||
| } | ||
|
|
||
| func (fw *FixedWindowImpl) GetResultsAfterIncrease() int64 { | ||
| return 0 | ||
| } | ||
|
|
||
| func (fw *FixedWindowImpl) CalculateSimpleReset(limit *config.RateLimit, timeSource utils.TimeSource) *duration.Duration { | ||
| return utils.CalculateFixedReset(limit.Limit, timeSource) | ||
| } | ||
|
|
||
| func (fw *FixedWindowImpl) CalculateReset(isOverLimit bool, limit *config.RateLimit, timeSource utils.TimeSource) *duration.Duration { | ||
| return fw.CalculateSimpleReset(limit, timeSource) | ||
| } | ||
|
|
||
| func NewFixedWindowAlgorithm(timeSource utils.TimeSource, localCache *freecache.Cache, nearLimitRatio float32, cacheKeyPrefix string) *FixedWindowImpl { | ||
| return &FixedWindowImpl{ | ||
| timeSource: timeSource, | ||
| cacheKeyGenerator: utils.NewCacheKeyGenerator(cacheKeyPrefix), | ||
| localCache: localCache, | ||
| nearLimitRatio: nearLimitRatio, | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| package algorithm | ||
|
|
||
| import ( | ||
| "github.com/envoyproxy/ratelimit/src/config" | ||
| "github.com/envoyproxy/ratelimit/src/utils" | ||
| "github.com/golang/protobuf/ptypes/duration" | ||
| ) | ||
|
|
||
| type RatelimitAlgorithm interface { | ||
| CalculateSimpleReset(limit *config.RateLimit, timeSource utils.TimeSource) *duration.Duration | ||
| CalculateReset(isOverLimit bool, limit *config.RateLimit, timeSource utils.TimeSource) *duration.Duration | ||
| IsOverLimit(limit *config.RateLimit, results int64, hitsAddend int64) (bool, int64, int) | ||
| GetExpirationSeconds() int64 | ||
| GetResultsAfterIncrease() int64 | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,100 @@ | ||
| package algorithm | ||
|
|
||
| import ( | ||
| "math" | ||
|
|
||
| "github.com/coocood/freecache" | ||
| "github.com/envoyproxy/ratelimit/src/config" | ||
| "github.com/envoyproxy/ratelimit/src/utils" | ||
| "github.com/golang/protobuf/ptypes/duration" | ||
| ) | ||
|
|
||
| var _ RatelimitAlgorithm = (*RollingWindowImpl)(nil) | ||
|
|
||
| type RollingWindowImpl struct { | ||
| timeSource utils.TimeSource | ||
| cacheKeyGenerator utils.CacheKeyGenerator | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same for |
||
| localCache *freecache.Cache | ||
| nearLimitRatio float32 | ||
| arrivedAt int64 | ||
| tat int64 | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: either expand the field names or add explanatory comments. |
||
| newTat int64 | ||
| diff int64 | ||
| } | ||
|
|
||
| func (rw *RollingWindowImpl) IsOverLimit(limit *config.RateLimit, results int64, hitsAddend int64) (bool, int64, int) { | ||
| now := rw.timeSource.UnixNanoNow() | ||
|
|
||
| // Time during computation should be in nanosecond | ||
| rw.arrivedAt = now | ||
| // Tat is set to current request timestamp if not set before | ||
| rw.tat = utils.MaxInt64(results, rw.arrivedAt) | ||
| totalLimit := int64(limit.Limit.RequestsPerUnit) | ||
| period := utils.SecondsToNanoseconds(utils.UnitToDivider(limit.Limit.Unit)) | ||
| quantity := int64(hitsAddend) | ||
|
|
||
| // GCRA computation | ||
| // Emission interval is the cost of each request | ||
| emissionInterval := period / totalLimit | ||
| // New tat define the end of the window | ||
| rw.newTat = rw.tat + emissionInterval*quantity | ||
| // We allow the request if it's inside the window | ||
| allowAt := rw.newTat - period | ||
| rw.diff = rw.arrivedAt - allowAt | ||
|
|
||
| previousAllowAt := rw.tat - period | ||
| previousLimitRemaining := int64(math.Ceil(float64((rw.arrivedAt - previousAllowAt) / emissionInterval))) | ||
| previousLimitRemaining = utils.MaxInt64(previousLimitRemaining, 0) | ||
| nearLimitWindow := int64(math.Ceil(float64(float32(limit.Limit.RequestsPerUnit) * (1.0 - rw.nearLimitRatio)))) | ||
| limitRemaining := int64(math.Ceil(float64(rw.diff / emissionInterval))) | ||
| hitNearLimit := quantity - (utils.MaxInt64(previousLimitRemaining, nearLimitWindow) - nearLimitWindow) | ||
|
|
||
| if rw.diff < 0 { | ||
| PopulateStats(limit, uint64(utils.MinInt64(previousLimitRemaining, nearLimitWindow)), uint64(quantity-previousLimitRemaining), 0) | ||
|
|
||
| return true, 0, int(utils.NanosecondsToSeconds(-rw.diff)) | ||
| } else { | ||
| if hitNearLimit > 0 { | ||
| PopulateStats(limit, uint64(hitNearLimit), 0, 0) | ||
| } | ||
|
|
||
| return false, limitRemaining, 0 | ||
| } | ||
| } | ||
|
|
||
| func (rw *RollingWindowImpl) GetExpirationSeconds() int64 { | ||
| if rw.diff < 0 { | ||
| return utils.NanosecondsToSeconds(rw.tat-rw.arrivedAt) + 1 | ||
| } | ||
| return utils.NanosecondsToSeconds(rw.newTat-rw.arrivedAt) + 1 | ||
| } | ||
|
|
||
| func (rw *RollingWindowImpl) GetResultsAfterIncrease() int64 { | ||
| if rw.diff < 0 { | ||
| return rw.tat | ||
| } | ||
| return rw.newTat | ||
| } | ||
|
|
||
| func (rw *RollingWindowImpl) CalculateSimpleReset(limit *config.RateLimit, timeSource utils.TimeSource) *duration.Duration { | ||
| secondsToReset := utils.UnitToDivider(limit.Limit.Unit) | ||
| secondsToReset -= utils.NanosecondsToSeconds(timeSource.UnixNanoNow()) % secondsToReset | ||
| return &duration.Duration{Seconds: secondsToReset} | ||
| } | ||
|
|
||
| func (rw *RollingWindowImpl) CalculateReset(isOverLimit bool, limit *config.RateLimit, timeSource utils.TimeSource) *duration.Duration { | ||
| if !isOverLimit { | ||
| return utils.NanosecondsToDuration(rw.newTat - rw.arrivedAt) | ||
| } else { | ||
| return utils.NanosecondsToDuration(int64(math.Ceil(float64(rw.tat - rw.arrivedAt)))) | ||
| } | ||
| } | ||
|
|
||
| func NewRollingWindowAlgorithm(timeSource utils.TimeSource, localCache *freecache.Cache, nearLimitRatio float32, cacheKeyPrefix string) *RollingWindowImpl { | ||
| return &RollingWindowImpl{ | ||
| timeSource: timeSource, | ||
| cacheKeyGenerator: utils.NewCacheKeyGenerator(cacheKeyPrefix), | ||
| localCache: localCache, | ||
| nearLimitRatio: nearLimitRatio, | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i would suggest to add a compile time check to both window implementations, to ensure that window type implements RatelimitAlgorithm interface. It's a useful technique in Go, eg:
var _ RatelimitAlgorithm = (*FixedWindowImpl)(nil)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is done