Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ module github.com/envoyproxy/ratelimit
go 1.14

require (
github.com/alicebob/miniredis v2.5.0+incompatible
github.com/alicebob/miniredis/v2 v2.11.4
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/cespare/xxhash v1.1.0 // indirect
github.com/coocood/freecache v1.1.0
github.com/envoyproxy/go-control-plane v0.9.7
github.com/fsnotify/fsnotify v1.4.7 // indirect
github.com/go-redis/redis v6.15.9+incompatible
github.com/golang/mock v1.4.1
github.com/golang/protobuf v1.4.2
github.com/gorilla/mux v1.7.4-0.20191121170500-49c01487a141
Expand All @@ -23,6 +25,7 @@ require (
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e // indirect
golang.org/x/text v0.3.3-0.20191122225017-cbf43d21aaeb // indirect
google.golang.org/appengine v1.4.0
google.golang.org/grpc v1.27.0
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/yaml.v2 v2.3.0
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 h1:45bxf7AZMwWcqkLzDAQugVEwedisr5nRJ1r+7LYnv0U=
github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI=
github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk=
github.com/alicebob/miniredis/v2 v2.11.4 h1:GsuyeunTx7EllZBU3/6Ji3dhMQZDpC9rLf1luJ+6M5M=
github.com/alicebob/miniredis/v2 v2.11.4/go.mod h1:VL3UDEfAH59bSa7MuHMuFToxkqyHh69s/WUbYlOAuyg=
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b h1:L/QXpzIa3pOvUGt1D1lA5KjYhPBAN/3iWdP7xeFS9F0=
Expand All @@ -30,6 +32,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrp
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
Expand Down Expand Up @@ -126,6 +130,7 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
Expand Down
293 changes: 9 additions & 284 deletions src/memcached/cache_impl.go
Original file line number Diff line number Diff line change
@@ -1,302 +1,27 @@
// The memcached limiter uses GetMulti() to check keys in parallel and then does
// increments asynchronously in the backend, since the memcache interface doesn't
// support multi-increment and it seems worthwhile to minimize the number of
// concurrent or sequential RPCs in the critical path.
//
// Another difference from redis is that memcache doesn't create a key implicitly by
// incrementing a missing entry. Instead, when increment fails an explicit "add" needs
// to be called. The process of increment becomes a bit of a dance since we try to
// limit the number of RPCs. First we call increment, then add if the increment
// failed, then increment again if the add failed (which could happen if there was
// a race to call "add").
//
// Note that max memcache key length is 250 characters. Attempting to get or increment
// a longer key will return memcache.ErrMalformedKey

package memcached

import (
"context"
"github.com/envoyproxy/ratelimit/src/stats"
"math/rand"
"strconv"
"sync"
"time"

"github.com/coocood/freecache"
gostats "github.com/lyft/gostats"

"github.com/bradfitz/gomemcache/memcache"

logger "github.com/sirupsen/logrus"

pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3"

"github.com/envoyproxy/ratelimit/src/config"
"github.com/envoyproxy/ratelimit/src/limiter"
"github.com/envoyproxy/ratelimit/src/server"
"github.com/envoyproxy/ratelimit/src/settings"
"github.com/envoyproxy/ratelimit/src/srv"
"github.com/envoyproxy/ratelimit/src/stats"
"github.com/envoyproxy/ratelimit/src/utils"
)

// rateLimitMemcacheImpl implements limiter.RateLimitCache on top of a
// memcached client. Reads are batched with GetMulti in DoLimit; counter
// increments happen asynchronously (see increaseAsync) to keep writes off
// the critical path.
type rateLimitMemcacheImpl struct {
	client                     Client           // memcached client (possibly stats-wrapped)
	timeSource                 utils.TimeSource // clock abstraction, injectable for tests
	jitterRand                 *rand.Rand       // source of TTL jitter
	expirationJitterMaxSeconds int64            // 0 disables jitter on key TTLs
	localCache                 *freecache.Cache // optional in-process over-limit cache; may be nil
	waitGroup                  sync.WaitGroup   // tracks in-flight async increments; Flush waits on it
	nearLimitRatio             float32
	baseRateLimiter            *limiter.BaseRateLimiter
}

// AutoFlushForIntegrationTests makes DoLimit wait for its async increments
// before returning, so integration-test assertions are deterministic.
var AutoFlushForIntegrationTests bool = false

// Compile-time check that rateLimitMemcacheImpl satisfies limiter.RateLimitCache.
var _ limiter.RateLimitCache = (*rateLimitMemcacheImpl)(nil)

// DoLimit evaluates every descriptor in the request against its configured
// limit and returns one status per descriptor. Current counts are fetched in
// a single GetMulti; the corresponding increments are queued via runAsync so
// the memcached writes stay off the response path.
func (this *rateLimitMemcacheImpl) DoLimit(
	ctx context.Context,
	request *pb.RateLimitRequest,
	limits []*config.RateLimit) []*pb.RateLimitResponse_DescriptorStatus {

	logger.Debugf("starting cache lookup")

	// request.HitsAddend could be 0 (default value) if not specified by the caller in the Ratelimit request.
	hitsAddend := utils.Max(1, request.HitsAddend)

	// First build a list of all cache keys that we are actually going to hit.
	cacheKeys := this.baseRateLimiter.GenerateCacheKeys(request, limits, hitsAddend)

	isOverLimitWithLocalCache := make([]bool, len(request.Descriptors))

	keysToGet := make([]string, 0, len(request.Descriptors))

	for i, cacheKey := range cacheKeys {
		// An empty key means no limit applies to this descriptor.
		if cacheKey.Key == "" {
			continue
		}

		// Check if key is over the limit in local cache.
		if this.baseRateLimiter.IsOverLimitWithLocalCache(cacheKey.Key) {
			isOverLimitWithLocalCache[i] = true
			logger.Debugf("cache key is over the limit: %s", cacheKey.Key)
			continue
		}

		logger.Debugf("looking up cache key: %s", cacheKey.Key)
		keysToGet = append(keysToGet, cacheKey.Key)
	}

	// Now fetch from memcache.
	responseDescriptorStatuses := make([]*pb.RateLimitResponse_DescriptorStatus,
		len(request.Descriptors))

	var memcacheValues map[string]*memcache.Item
	var err error

	if len(keysToGet) > 0 {
		memcacheValues, err = this.client.GetMulti(keysToGet)
		if err != nil {
			// A fetch failure is treated as "no current counts": every key
			// below falls back to a zero count rather than failing the request.
			logger.Errorf("Error multi-getting memcache keys (%s): %s", keysToGet, err)
		}
	}

	for i, cacheKey := range cacheKeys {

		rawMemcacheValue, ok := memcacheValues[cacheKey.Key]
		var limitBeforeIncrease uint32
		if ok {
			decoded, err := strconv.ParseInt(string(rawMemcacheValue.Value), 10, 32)
			if err != nil {
				// Corrupt entry: log and keep the zero count so one bad key
				// cannot fail the whole request.
				logger.Errorf("Unexpected non-numeric value in memcached: %v", rawMemcacheValue)
			} else {
				limitBeforeIncrease = uint32(decoded)
			}

		}

		limitAfterIncrease := limitBeforeIncrease + hitsAddend

		limitInfo := limiter.NewRateLimitInfo(limits[i], limitBeforeIncrease, limitAfterIncrease, 0, 0)

		responseDescriptorStatuses[i] = this.baseRateLimiter.GetResponseDescriptorStatus(cacheKey.Key,
			limitInfo, isOverLimitWithLocalCache[i], hitsAddend)
	}

	// Queue the increments in the background; Flush() waits on this group.
	this.waitGroup.Add(1)
	runAsync(func() { this.increaseAsync(cacheKeys, isOverLimitWithLocalCache, limits, uint64(hitsAddend)) })
	if AutoFlushForIntegrationTests {
		this.Flush()
	}

	return responseDescriptorStatuses
}

// increaseAsync bumps the memcached counter for every key that was actually
// evaluated, skipping empty keys and keys already rejected by the local
// cache. It runs on a background goroutine and pairs with the
// waitGroup.Add(1) performed in DoLimit.
//
// Memcached has no increment-or-create, so this follows the dance described
// at the top of the file: Increment; on a miss, Add; if the Add loses a race
// to another writer, Increment again.
func (this *rateLimitMemcacheImpl) increaseAsync(cacheKeys []limiter.CacheKey, isOverLimitWithLocalCache []bool,
	limits []*config.RateLimit, hitsAddend uint64) {
	defer this.waitGroup.Done()
	for i, cacheKey := range cacheKeys {
		if cacheKey.Key == "" || isOverLimitWithLocalCache[i] {
			continue
		}

		_, err := this.client.Increment(cacheKey.Key, hitsAddend)
		if err == memcache.ErrCacheMiss {
			// Key does not exist yet: create it with a TTL derived from the
			// limit's unit, plus optional jitter to spread out expirations.
			expirationSeconds := utils.UnitToDivider(limits[i].Limit.Unit)
			if this.expirationJitterMaxSeconds > 0 {
				expirationSeconds += this.jitterRand.Int63n(this.expirationJitterMaxSeconds)
			}

			// Need to add instead of increment.
			err = this.client.Add(&memcache.Item{
				Key:        cacheKey.Key,
				Value:      []byte(strconv.FormatUint(hitsAddend, 10)),
				Expiration: int32(expirationSeconds),
			})
			if err == memcache.ErrNotStored {
				// There was a race condition to do this add. We should be able to increment
				// now instead.
				_, err := this.client.Increment(cacheKey.Key, hitsAddend)
				if err != nil {
					logger.Errorf("Failed to increment key %s after failing to add: %s", cacheKey.Key, err)
					continue
				}
			} else if err != nil {
				logger.Errorf("Failed to add key %s: %s", cacheKey.Key, err)
				continue
			}
		} else if err != nil {
			logger.Errorf("Failed to increment key %s: %s", cacheKey.Key, err)
			continue
		}
	}
}

// Flush blocks until all increments queued by earlier DoLimit calls have
// completed. Primarily used by tests (see AutoFlushForIntegrationTests).
func (this *rateLimitMemcacheImpl) Flush() {
	this.waitGroup.Wait()
}

// refreshServersPeriodically re-resolves the SRV record srv every d and
// installs the result on serverList, until finish is closed.
//
// NOTE(review): serverList is received by value, so refreshServers only
// mutates a copy (memcache.ServerList also embeds a mutex, which should not
// be copied). The periodic refresh therefore never reaches the selector the
// client actually uses; the parameter should be *memcache.ServerList —
// confirm and fix together with the caller in newMemcachedFromSrv.
func refreshServersPeriodically(serverList memcache.ServerList, srv string, d time.Duration, finish <-chan struct{}) {
	t := time.NewTicker(d)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			// Log the failure (including the cause, previously dropped) but
			// keep ticking: a transient DNS error should not end refreshing.
			err := refreshServers(serverList, srv)
			if err != nil {
				logger.Warnf("failed to refresh memcache hosts: %s", err)
			} else {
				logger.Debug("refreshed memcache hosts")
			}
		case <-finish:
			return
		}
	}
}

// refreshServers resolves the SRV record srv_ and installs the resulting
// host:port list on serverList, returning any resolution or set error.
//
// NOTE(review): serverList arrives by value, so SetServers updates only this
// local copy (and copies the embedded mutex); callers never observe the new
// servers. The signature likely needs *memcache.ServerList — verify callers.
func refreshServers(serverList memcache.ServerList, srv_ string) error {
	servers, err := srv.ServerStringsFromSrv(srv_)
	if err != nil {
		return err
	}
	return serverList.SetServers(servers...)
}

// newMemcachedFromSrv builds a memcache client whose server set is resolved
// from the DNS SRV record srv_. If d > 0 the record is re-resolved every d;
// resolution failure at startup panics, since the limiter cannot run
// without any memcached hosts.
func newMemcachedFromSrv(srv_ string, d time.Duration) Client {
	serverList := new(memcache.ServerList)

	// Populate the selector through the pointer. The previous code called
	// refreshServers(*serverList, srv_), which passes ServerList by value, so
	// SetServers mutated a throwaway copy and the client built below started
	// with an empty server list.
	servers, err := srv.ServerStringsFromSrv(srv_)
	if err == nil {
		err = serverList.SetServers(servers...)
	}
	if err != nil {
		errorText := "Unable to fetch servers from SRV"
		logger.Errorf(errorText)
		panic(MemcacheError(errorText))
	}

	if d > 0 {
		logger.Infof("refreshing memcache hosts every: %v milliseconds", d.Milliseconds())
		finish := make(chan struct{})
		// NOTE(review): refreshServersPeriodically also takes ServerList by
		// value, so the periodic refresh still updates only a copy — its
		// signature needs the same pointer fix.
		go refreshServersPeriodically(*serverList, srv_, d, finish)
	} else {
		logger.Debugf("not periodically refreshing memcached hosts")
	}

	return memcache.NewFromSelector(serverList)
}

// newMemcacheFromSettings constructs the memcache client from settings,
// using either a static host:port list or a DNS SRV record. The two
// discovery mechanisms are mutually exclusive; configuring both panics.
func newMemcacheFromSettings(s settings.Settings) Client {
	if s.MemcacheSrv != "" && len(s.MemcacheHostPort) > 0 {
		// Fixed typo in the panic message: "MEMCADHE_HOST_PORT".
		panic(MemcacheError("Both MEMCACHE_HOST_PORT and MEMCACHE_SRV are set"))
	}
	if s.MemcacheSrv != "" {
		logger.Debugf("Using MEMCACHE_SRV: %v", s.MemcacheSrv)
		return newMemcachedFromSrv(s.MemcacheSrv, s.MemcacheSrvRefresh)
	}
	// Fixed typo and doubled colon in the debug message ("Usng ...::").
	logger.Debugf("Using MEMCACHE_HOST_PORT: %v", s.MemcacheHostPort)
	client := memcache.New(s.MemcacheHostPort...)
	client.MaxIdleConns = s.MemcacheMaxIdleConns
	return client
}

// taskQueue hands work to an already-running worker goroutine, if one is
// idle and blocked receiving on it.
var taskQueue = make(chan func())

// runAsync executes task on a background goroutine. It first tries to hand
// the task to an existing worker; if none is ready to receive, it spawns a
// new worker which runs the task and then keeps draining taskQueue, exiting
// only after a full 10-second period with no work.
func runAsync(task func()) {
	select {
	case taskQueue <- task:
		// An idle worker picked the task up; nothing more to do.
		return
	default:
	}

	go func() {
		// Run the task that triggered this worker, then serve the queue.
		task()

		const tickDuration = 10 * time.Second
		ticker := time.NewTicker(tickDuration)
		defer ticker.Stop()

		processed := 0
		for {
			select {
			case queued := <-taskQueue:
				queued()
				processed++
			case <-ticker.C:
				// Shut down only after an entirely idle tick interval.
				if processed == 0 {
					return
				}
				processed = 0
			}
		}
	}()
}

// NewRateLimitCacheImpl assembles a memcached-backed RateLimitCache.
// expirationJitterMaxSeconds of 0 disables TTL jitter, and localCache may be
// nil to disable the in-process over-limit cache.
func NewRateLimitCacheImpl(client Client, timeSource utils.TimeSource, jitterRand *rand.Rand,
	expirationJitterMaxSeconds int64, localCache *freecache.Cache, statsManager stats.Manager, nearLimitRatio float32, cacheKeyPrefix string) limiter.RateLimitCache {
	return &rateLimitMemcacheImpl{
		client:                     client,
		timeSource:                 timeSource,
		jitterRand:                 jitterRand,
		expirationJitterMaxSeconds: expirationJitterMaxSeconds,
		localCache:                 localCache,
		nearLimitRatio:             nearLimitRatio,
		baseRateLimiter:            limiter.NewBaseRateLimit(timeSource, jitterRand, expirationJitterMaxSeconds, localCache, nearLimitRatio, cacheKeyPrefix, statsManager),
	}
}
storage_factory "github.com/envoyproxy/ratelimit/src/storage/factory"
)

func NewRateLimitCacheImplFromSettings(s settings.Settings, timeSource utils.TimeSource, jitterRand *rand.Rand,
localCache *freecache.Cache, scope gostats.Scope, statsManager stats.Manager) limiter.RateLimitCache {
return NewRateLimitCacheImpl(
CollectStats(newMemcacheFromSettings(s), scope.Scope("memcache")),
func NewRateLimiterCacheImplFromSettings(s settings.Settings, localCache *freecache.Cache, srv server.Server, timeSource utils.TimeSource, jitterRand *rand.Rand, statsManager stats.Manager) limiter.RateLimitCache {
return NewFixedRateLimitCacheImpl(
storage_factory.NewMemcached(srv.Scope().Scope("memcache"), s.MemcacheHostPort, s.MemcacheSrv, s.MemcacheSrvRefresh, s.MemcacheMaxIdleConns),
timeSource,
jitterRand,
s.ExpirationJitterMaxSeconds,
localCache,
statsManager,
s.ExpirationJitterMaxSeconds,
s.NearLimitRatio,
s.CacheKeyPrefix,
statsManager,
)
}
21 changes: 0 additions & 21 deletions src/memcached/client.go

This file was deleted.

Loading