From 0a76b0bd8ce39d9fc14bd37e22d87e0156a81c21 Mon Sep 17 00:00:00 2001 From: David Weitzman Date: Fri, 4 Sep 2020 15:25:21 -0700 Subject: [PATCH 1/4] Implement BACKEND_TYPE=memcache as an alternative k/v store to redis MEMCACHE_HOST_PORT=host:port must be set with BACKEND_TYPE=memcache To minimize roundtrips when getting multiple keys, the memcache implementation does a GetMulti to fetch the existing rate limit usage and does increments asynchronously in background goroutines, since the memcache API doesn't offer multi-increment. Resolves https://github.com/envoyproxy/ratelimit/issues/140 Signed-off-by: David Weitzman --- Dockerfile.integration | 2 +- Makefile | 1 + docker-compose.yml | 10 + go.mod | 1 + go.sum | 2 + src/memcached/cache_impl.go | 300 +++++++++++++++ src/memcached/client.go | 14 + src/service_cmd/runner/runner.go | 32 +- src/settings/settings.go | 2 + test/integration/integration_test.go | 35 +- test/memcached/cache_impl_test.go | 525 +++++++++++++++++++++++++++ test/mocks/memcached/client.go | 78 ++++ test/mocks/mocks.go | 1 + 13 files changed, 982 insertions(+), 21 deletions(-) create mode 100644 src/memcached/cache_impl.go create mode 100644 src/memcached/client.go create mode 100644 test/memcached/cache_impl_test.go create mode 100644 test/mocks/memcached/client.go diff --git a/Dockerfile.integration b/Dockerfile.integration index efff81438..55eb04b4d 100644 --- a/Dockerfile.integration +++ b/Dockerfile.integration @@ -1,7 +1,7 @@ # Running this docker image runs the integration tests. FROM golang:1.14 -RUN apt-get update -y && apt-get install sudo stunnel4 redis -y && rm -rf /var/lib/apt/lists/* +RUN apt-get update -y && apt-get install sudo stunnel4 redis memcached -y && rm -rf /var/lib/apt/lists/* WORKDIR /workdir diff --git a/Makefile b/Makefile index b66a67a79..df9559064 100644 --- a/Makefile +++ b/Makefile @@ -79,6 +79,7 @@ tests_with_redis: bootstrap_redis_tls tests_unit redis-server --port 6382 --requirepass password123 & redis-server --port 6384 --requirepass password123 & redis-server --port 6385 --requirepass password123 & + memcached -u root --port 6386 -m 64 & go test -race -tags=integration $(MODULE)/... .PHONY: docker_tests diff --git a/docker-compose.yml b/docker-compose.yml index 51360d361..c8cab92d1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,6 +10,15 @@ services: networks: - ratelimit-network + memcached: + image: memcached:alpine + expose: + - 11211 + ports: + - 11211:11211 + networks: + - ratelimit-network + # minimal container that builds the ratelimit service binary and exits. 
ratelimit-build: image: golang:1.14-alpine @@ -50,6 +59,7 @@ services: - REDIS_URL=redis:6379 - RUNTIME_ROOT=/data - RUNTIME_SUBDIRECTORY=ratelimit + - MEMCACHE_HOST_PORT=memcached:11211 networks: ratelimit-network: diff --git a/go.mod b/go.mod index 52db99472..63086e441 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.14 require ( github.com/alicebob/miniredis/v2 v2.11.4 + github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b github.com/cespare/xxhash v1.1.0 // indirect github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354 // indirect github.com/coocood/freecache v1.1.0 diff --git a/go.sum b/go.sum index a077d1811..92115f270 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 h1:45bxf7AZMw github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= github.com/alicebob/miniredis/v2 v2.11.4 h1:GsuyeunTx7EllZBU3/6Ji3dhMQZDpC9rLf1luJ+6M5M= github.com/alicebob/miniredis/v2 v2.11.4/go.mod h1:VL3UDEfAH59bSa7MuHMuFToxkqyHh69s/WUbYlOAuyg= +github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b h1:L/QXpzIa3pOvUGt1D1lA5KjYhPBAN/3iWdP7xeFS9F0= +github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= diff --git a/src/memcached/cache_impl.go b/src/memcached/cache_impl.go new file mode 100644 index 000000000..1c47c7cd2 --- /dev/null +++ b/src/memcached/cache_impl.go @@ -0,0 +1,300 @@ +// The memcached limiter uses GetMulti() to check keys in parallel and then does +// increments asynchronously in the backend, since the memcache interface doesn't +// support multi-increment and it seems worthwhile to minimize the number of +// concurrent or sequential RPCs in the critical path. +// +// Another difference from redis is that memcache doesn't create a key implicitly by +// incrementing a missing entry. Instead, when increment fails an explicit "add" needs +// to be called. The process of increment becomes a bit of a dance since we try to +// limit the number of RPCs. First we call increment, then add if the increment +// failed, then increment again if the add failed (which could happen if there was +// a race to call "add"). + +package memcached + +import ( + "context" + "math" + "math/rand" + "strconv" + "sync" + + "github.com/coocood/freecache" + stats "github.com/lyft/gostats" + + "github.com/bradfitz/gomemcache/memcache" + + logger "github.com/sirupsen/logrus" + + pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3" + + "github.com/envoyproxy/ratelimit/src/assert" + "github.com/envoyproxy/ratelimit/src/config" + "github.com/envoyproxy/ratelimit/src/limiter" + "github.com/envoyproxy/ratelimit/src/settings" +) + +type rateLimitCache interface { + // Same as in limiter.RateLimitCache + DoLimit( + ctx context.Context, + request *pb.RateLimitRequest, + limits []*config.RateLimit) []*pb.RateLimitResponse_DescriptorStatus + + // Waits for any lingering goroutines that are incrementing memcache values. + // This is used for unit tests, since the memcache increments happen + // asynchronously in the background. 
+ Flush() +} + +type rateLimitMemcacheImpl struct { + client Client + timeSource limiter.TimeSource + jitterRand *rand.Rand + expirationJitterMaxSeconds int64 + cacheKeyGenerator limiter.CacheKeyGenerator + localCache *freecache.Cache + wg sync.WaitGroup +} + +var _ limiter.RateLimitCache = (*rateLimitMemcacheImpl)(nil) + +func max(a uint32, b uint32) uint32 { + if a > b { + return a + } + return b +} + +func (this *rateLimitMemcacheImpl) DoLimit( + ctx context.Context, + request *pb.RateLimitRequest, + limits []*config.RateLimit) []*pb.RateLimitResponse_DescriptorStatus { + + logger.Debugf("starting cache lookup") + + // request.HitsAddend could be 0 (default value) if not specified by the caller in the Ratelimit request. + hitsAddend := max(1, request.HitsAddend) + + // First build a list of all cache keys that we are actually going to hit. generateCacheKey() + // returns an empty string in the key if there is no limit so that we can keep the arrays + // all the same size. + assert.Assert(len(request.Descriptors) == len(limits)) + cacheKeys := make([]limiter.CacheKey, len(request.Descriptors)) + now := this.timeSource.UnixNow() + for i := 0; i < len(request.Descriptors); i++ { + cacheKeys[i] = this.cacheKeyGenerator.GenerateCacheKey(request.Domain, request.Descriptors[i], limits[i], now) + + // Increase statistics for limits hit by their respective requests. + if limits[i] != nil { + limits[i].Stats.TotalHits.Add(uint64(hitsAddend)) + } + } + + isOverLimitWithLocalCache := make([]bool, len(request.Descriptors)) + + keysToGet := make([]string, 0, len(request.Descriptors)) + + for i, cacheKey := range cacheKeys { + if cacheKey.Key == "" { + continue + } + + if this.localCache != nil { + // Get returns the value or not found error. + _, err := this.localCache.Get([]byte(cacheKey.Key)) + if err == nil { + isOverLimitWithLocalCache[i] = true + logger.Debugf("cache key is over the limit: %s", cacheKey.Key) + continue + } + } + + logger.Debugf("looking up cache key: %s", cacheKey.Key) + keysToGet = append(keysToGet, cacheKey.Key) + } + + // Now fetch the pipeline. 
+ responseDescriptorStatuses := make([]*pb.RateLimitResponse_DescriptorStatus, + len(request.Descriptors)) + + var memcacheValues map[string]*memcache.Item + var err error + + if len(keysToGet) > 0 { + memcacheValues, err = this.client.GetMulti(keysToGet) + if err != nil { + logger.Errorf("Error multi-getting memcache keys (%s): %s", keysToGet, err) + } + } + + for i, cacheKey := range cacheKeys { + if cacheKey.Key == "" { + responseDescriptorStatuses[i] = + &pb.RateLimitResponse_DescriptorStatus{ + Code: pb.RateLimitResponse_OK, + CurrentLimit: nil, + LimitRemaining: 0, + } + continue + } + + if isOverLimitWithLocalCache[i] { + responseDescriptorStatuses[i] = + &pb.RateLimitResponse_DescriptorStatus{ + Code: pb.RateLimitResponse_OVER_LIMIT, + CurrentLimit: limits[i].Limit, + LimitRemaining: 0, + } + limits[i].Stats.OverLimit.Add(uint64(hitsAddend)) + limits[i].Stats.OverLimitWithLocalCache.Add(uint64(hitsAddend)) + continue + } + + rawMemcacheValue, ok := memcacheValues[cacheKey.Key] + var limitBeforeIncrease uint32 + if ok { + decoded, err := strconv.ParseInt(string(rawMemcacheValue.Value), 10, 32) + if err != nil { + logger.Errorf("Unexpected non-numeric value in memcached: %v", rawMemcacheValue) + } else { + limitBeforeIncrease = uint32(decoded) + } + + } + + limitAfterIncrease := limitBeforeIncrease + hitsAddend + overLimitThreshold := limits[i].Limit.RequestsPerUnit + // The nearLimitThreshold is the number of requests that can be made before hitting the NearLimitRatio. + // We need to know it in both the OK and OVER_LIMIT scenarios. + nearLimitThreshold := uint32(math.Floor(float64(float32(overLimitThreshold) * config.NearLimitRatio))) + + logger.Debugf("cache key: %s current: %d", cacheKey.Key, limitAfterIncrease) + if limitAfterIncrease > overLimitThreshold { + responseDescriptorStatuses[i] = + &pb.RateLimitResponse_DescriptorStatus{ + Code: pb.RateLimitResponse_OVER_LIMIT, + CurrentLimit: limits[i].Limit, + LimitRemaining: 0, + } + + // Increase over limit statistics. Because we support += behavior for increasing the limit, we need to + // assess if the entire hitsAddend were over the limit. That is, if the limit's value before adding the + // N hits was over the limit, then all the N hits were over limit. + // Otherwise, only the difference between the current limit value and the over limit threshold + // were over limit hits. + if limitBeforeIncrease >= overLimitThreshold { + limits[i].Stats.OverLimit.Add(uint64(hitsAddend)) + } else { + limits[i].Stats.OverLimit.Add(uint64(limitAfterIncrease - overLimitThreshold)) + + // If the limit before increase was below the over limit value, then some of the hits were + // in the near limit range. + limits[i].Stats.NearLimit.Add(uint64(overLimitThreshold - max(nearLimitThreshold, limitBeforeIncrease))) + } + if this.localCache != nil { + // Set the TTL of the local_cache to be the entire duration. + // Since the cache_key gets changed once the time crosses over current time slot, the over-the-limit + // cache keys in local_cache lose effectiveness. + // For example, if we have an hour limit on all mongo connections, the cache key would be + // similar to mongo_1h, mongo_2h, etc. In the hour 1 (0h0m - 0h59m), the cache key is mongo_1h, we start + // to get ratelimited in the 50th minute, the ttl of local_cache will be set as 1 hour(0h50m-1h49m). + // In the time of 1h1m, since the cache key becomes different (mongo_2h), it won't get ratelimited. 
+				err := this.localCache.Set([]byte(cacheKey.Key), []byte{}, int(limiter.UnitToDivider(limits[i].Limit.Unit)))
+				if err != nil {
+					logger.Errorf("Failed to set local cache key: %s", cacheKey.Key)
+				}
+			}
+		} else {
+			responseDescriptorStatuses[i] =
+				&pb.RateLimitResponse_DescriptorStatus{
+					Code:           pb.RateLimitResponse_OK,
+					CurrentLimit:   limits[i].Limit,
+					LimitRemaining: overLimitThreshold - limitAfterIncrease,
+				}
+
+			// The limit is OK but we additionally want to know if we are near the limit.
+			if limitAfterIncrease > nearLimitThreshold {
+				// Here we also need to assess which portion of the hitsAddend were in the near limit range.
+				// If all the hits were over the nearLimitThreshold, then all hits are near limit. Otherwise,
+				// only the difference between the current limit value and the near limit threshold were near
+				// limit hits.
+				if limitBeforeIncrease >= nearLimitThreshold {
+					limits[i].Stats.NearLimit.Add(uint64(hitsAddend))
+				} else {
+					limits[i].Stats.NearLimit.Add(uint64(limitAfterIncrease - nearLimitThreshold))
+				}
+			}
+		}
+	}
+
+	this.wg.Add(1)
+	go this.increaseAsync(cacheKeys, isOverLimitWithLocalCache, limits, uint64(hitsAddend))
+
+	return responseDescriptorStatuses
+}
+
+func (this *rateLimitMemcacheImpl) increaseAsync(cacheKeys []limiter.CacheKey, isOverLimitWithLocalCache []bool, limits []*config.RateLimit, hitsAddend uint64) {
+	defer this.wg.Done()
+	for i, cacheKey := range cacheKeys {
+		if cacheKey.Key == "" || isOverLimitWithLocalCache[i] {
+			continue
+		}
+
+		_, err := this.client.Increment(cacheKey.Key, hitsAddend)
+		if err == memcache.ErrCacheMiss {
+			expirationSeconds := limiter.UnitToDivider(limits[i].Limit.Unit)
+			if this.expirationJitterMaxSeconds > 0 {
+				expirationSeconds += this.jitterRand.Int63n(this.expirationJitterMaxSeconds)
+			}
+
+			// Need to add instead of increment
+			err = this.client.Add(&memcache.Item{
+				Key:        cacheKey.Key,
+				Value:      []byte(strconv.FormatUint(hitsAddend, 10)),
+				Expiration: int32(expirationSeconds),
+			})
+			if err == memcache.ErrNotStored {
+				// There was a race condition to do this add. We should be able to increment
+				// now instead.
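+				// Whichever client won the race to add stored its own hitsAddend as the
+				// initial value, so this retry still layers our hits on top of theirs.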
+ _, err := this.client.Increment(cacheKey.Key, hitsAddend) + if err != nil { + logger.Errorf("Failed to increment key %s after failing to add: %s", cacheKey.Key, err) + continue + } + } else if err != nil { + logger.Errorf("Failed to add key %s: %s", cacheKey.Key, err) + continue + } + } else if err != nil { + logger.Errorf("Failed to increment key %s: %s", cacheKey.Key, err) + continue + } + } +} + +func (this *rateLimitMemcacheImpl) Flush() { + this.wg.Wait() +} + +func NewRateLimitCacheImpl(client Client, timeSource limiter.TimeSource, jitterRand *rand.Rand, expirationJitterMaxSeconds int64, localCache *freecache.Cache, scope stats.Scope) rateLimitCache { + return &rateLimitMemcacheImpl{ + client: client, + timeSource: timeSource, + cacheKeyGenerator: limiter.NewCacheKeyGenerator(), + jitterRand: jitterRand, + expirationJitterMaxSeconds: expirationJitterMaxSeconds, + localCache: localCache, + } +} + +func NewRateLimitCacheImplFromSettings(s settings.Settings, timeSource limiter.TimeSource, jitterRand *rand.Rand, localCache *freecache.Cache, scope stats.Scope) rateLimitCache { + return NewRateLimitCacheImpl( + memcache.New(s.MemcacheHostPort), + timeSource, + jitterRand, + s.ExpirationJitterMaxSeconds, + localCache, + scope, + ) +} diff --git a/src/memcached/client.go b/src/memcached/client.go new file mode 100644 index 000000000..55c0ec318 --- /dev/null +++ b/src/memcached/client.go @@ -0,0 +1,14 @@ +package memcached + +import ( + "github.com/bradfitz/gomemcache/memcache" +) + +var _ Client = (*memcache.Client)(nil) + +// Interface for memcached, used for mocking. +type Client interface { + GetMulti(keys []string) (map[string]*memcache.Item, error) + Increment(key string, delta uint64) (newValue uint64, err error) + Add(item *memcache.Item) error +} diff --git a/src/service_cmd/runner/runner.go b/src/service_cmd/runner/runner.go index ef318bde1..79425a2c9 100644 --- a/src/service_cmd/runner/runner.go +++ b/src/service_cmd/runner/runner.go @@ -15,6 +15,7 @@ import ( "github.com/envoyproxy/ratelimit/src/config" "github.com/envoyproxy/ratelimit/src/limiter" + "github.com/envoyproxy/ratelimit/src/memcached" "github.com/envoyproxy/ratelimit/src/redis" "github.com/envoyproxy/ratelimit/src/server" ratelimit "github.com/envoyproxy/ratelimit/src/service" @@ -34,6 +35,29 @@ func (runner *Runner) GetStatsStore() stats.Store { return runner.statsStore } +func createLimiter(srv server.Server, s settings.Settings, localCache *freecache.Cache) limiter.RateLimitCache { + switch s.BackendType { + case "redis", "": + return redis.NewRateLimiterCacheImplFromSettings( + s, + localCache, + srv, + limiter.NewTimeSourceImpl(), + rand.New(limiter.NewLockedSource(time.Now().Unix())), + s.ExpirationJitterMaxSeconds) + case "memcache": + return memcached.NewRateLimitCacheImplFromSettings( + s, + limiter.NewTimeSourceImpl(), + rand.New(limiter.NewLockedSource(time.Now().Unix())), + localCache, + srv.Scope()) + default: + logger.Fatalf("Invalid setting for BackendType: %s", s.BackendType) + panic("This line should not be reachable") + } +} + func (runner *Runner) Run() { s := settings.NewSettings() @@ -52,13 +76,7 @@ func (runner *Runner) Run() { service := ratelimit.NewService( srv.Runtime(), - redis.NewRateLimiterCacheImplFromSettings( - s, - localCache, - srv, - limiter.NewTimeSourceImpl(), - rand.New(limiter.NewLockedSource(time.Now().Unix())), - s.ExpirationJitterMaxSeconds), + createLimiter(srv, s, localCache), config.NewRateLimitConfigLoaderImpl(), srv.Scope().Scope("service"), s.RuntimeWatchRoot, diff 
--git a/src/settings/settings.go b/src/settings/settings.go index 78e902426..2381645a3 100644 --- a/src/settings/settings.go +++ b/src/settings/settings.go @@ -39,6 +39,8 @@ type Settings struct { RedisPerSecondPipelineLimit int `envconfig:"REDIS_PERSECOND_PIPELINE_LIMIT" default:"0"` ExpirationJitterMaxSeconds int64 `envconfig:"EXPIRATION_JITTER_MAX_SECONDS" default:"300"` LocalCacheSizeInBytes int `envconfig:"LOCAL_CACHE_SIZE_IN_BYTES" default:"0"` + MemcacheHostPort string `envconfig:"MEMCACHE_HOST_PORT" default:""` + BackendType string `envconfig:"BACKEND_TYPE" default:"redis"` } type Option func(*Settings) diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index d56e29873..6559460e0 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -5,8 +5,8 @@ package integration_test import ( "bytes" "fmt" - "io/ioutil" "io" + "io/ioutil" "math/rand" "net/http" "os" @@ -14,8 +14,8 @@ import ( "testing" "time" - pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3" pb_legacy "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v2" + pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3" "github.com/envoyproxy/ratelimit/src/service_cmd/runner" "github.com/envoyproxy/ratelimit/test/common" "github.com/stretchr/testify/assert" @@ -48,10 +48,10 @@ func newDescriptorStatusLegacy( // TODO: Once adding the ability of stopping the server in the runner (https://github.com/envoyproxy/ratelimit/issues/119), // stop the server at the end of each test, thus we can reuse the grpc port among these integration tests. func TestBasicConfig(t *testing.T) { - t.Run("WithoutPerSecondRedis", testBasicConfig("8083", "false", "0")) - t.Run("WithPerSecondRedis", testBasicConfig("8085", "true", "0")) - t.Run("WithoutPerSecondRedisWithLocalCache", testBasicConfig("18083", "false", "1000")) - t.Run("WithPerSecondRedisWithLocalCache", testBasicConfig("18085", "true", "1000")) + t.Run("WithoutPerSecondRedis", testBasicConfig("8083", "false", "0", "")) + t.Run("WithPerSecondRedis", testBasicConfig("8085", "true", "0", "")) + t.Run("WithoutPerSecondRedisWithLocalCache", testBasicConfig("18083", "false", "1000", "")) + t.Run("WithPerSecondRedisWithLocalCache", testBasicConfig("18085", "true", "1000", "")) } func TestBasicTLSConfig(t *testing.T) { @@ -73,6 +73,11 @@ func TestBasicReloadConfig(t *testing.T) { t.Run("ReloadWithoutWatchRoot", testBasicConfigReload("8097", "false", "0", "false")) } +func TestBasicConfigMemcache(t *testing.T) { + t.Run("Memcache", testBasicConfig("8098", "false", "0", "memcache")) + t.Run("MemcacheWithLocalCache", testBasicConfig("18099", "false", "1000", "memcache")) +} + func testBasicConfigAuthTLS(grpcPort, perSecond string, local_cache_size string) func(*testing.T) { os.Setenv("REDIS_PERSECOND_URL", "localhost:16382") os.Setenv("REDIS_URL", "localhost:16381") @@ -80,17 +85,18 @@ func testBasicConfigAuthTLS(grpcPort, perSecond string, local_cache_size string) os.Setenv("REDIS_TLS", "true") os.Setenv("REDIS_PERSECOND_AUTH", "password123") os.Setenv("REDIS_PERSECOND_TLS", "true") - return testBasicBaseConfig(grpcPort, perSecond, local_cache_size) + return testBasicBaseConfig(grpcPort, perSecond, local_cache_size, "") } -func testBasicConfig(grpcPort, perSecond string, local_cache_size string) func(*testing.T) { +func testBasicConfig(grpcPort, perSecond string, local_cache_size string, backend_type string) func(*testing.T) { os.Setenv("REDIS_PERSECOND_URL", "localhost:6380") 
os.Setenv("REDIS_URL", "localhost:6379") + os.Setenv("MEMCACHE_HOST_PORT", "localhost:6386") os.Setenv("REDIS_AUTH", "") os.Setenv("REDIS_TLS", "false") os.Setenv("REDIS_PERSECOND_AUTH", "") os.Setenv("REDIS_PERSECOND_TLS", "false") - return testBasicBaseConfig(grpcPort, perSecond, local_cache_size) + return testBasicBaseConfig(grpcPort, perSecond, local_cache_size, backend_type) } func testBasicConfigAuth(grpcPort, perSecond string, local_cache_size string) func(*testing.T) { @@ -100,7 +106,7 @@ func testBasicConfigAuth(grpcPort, perSecond string, local_cache_size string) fu os.Setenv("REDIS_AUTH", "password123") os.Setenv("REDIS_PERSECOND_TLS", "false") os.Setenv("REDIS_PERSECOND_AUTH", "password123") - return testBasicBaseConfig(grpcPort, perSecond, local_cache_size) + return testBasicBaseConfig(grpcPort, perSecond, local_cache_size, "") } func testBasicConfigWithoutWatchRoot(grpcPort, perSecond string, local_cache_size string) func(*testing.T) { @@ -111,7 +117,7 @@ func testBasicConfigWithoutWatchRoot(grpcPort, perSecond string, local_cache_siz os.Setenv("REDIS_PERSECOND_AUTH", "") os.Setenv("REDIS_PERSECOND_TLS", "false") os.Setenv("RUNTIME_WATCH_ROOT", "false") - return testBasicBaseConfig(grpcPort, perSecond, local_cache_size) + return testBasicBaseConfig(grpcPort, perSecond, local_cache_size, "") } func testBasicConfigReload(grpcPort, perSecond string, local_cache_size, runtimeWatchRoot string) func(*testing.T) { @@ -133,7 +139,7 @@ func getCacheKey(cacheKey string, enableLocalCache bool) string { return cacheKey } -func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) func(*testing.T) { +func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string, backend_type string) func(*testing.T) { return func(t *testing.T) { os.Setenv("REDIS_PERSECOND", perSecond) os.Setenv("PORT", "8082") @@ -145,6 +151,7 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu os.Setenv("REDIS_SOCKET_TYPE", "tcp") os.Setenv("LOCAL_CACHE_SIZE_IN_BYTES", local_cache_size) os.Setenv("USE_STATSD", "false") + os.Setenv("BACKEND_TYPE", backend_type) local_cache_size_val, _ := strconv.Atoi(local_cache_size) enable_local_cache := local_cache_size_val > 0 @@ -369,6 +376,7 @@ func TestBasicConfigLegacy(t *testing.T) { os.Setenv("REDIS_AUTH", "") os.Setenv("REDIS_PERSECOND_TLS", "false") os.Setenv("REDIS_PERSECOND_AUTH", "") + os.Setenv("BACKEND_TYPE", "") runner := runner.NewRunner() go func() { @@ -505,6 +513,7 @@ func testConfigReload(grpcPort, perSecond string, local_cache_size string) func( os.Setenv("REDIS_SOCKET_TYPE", "tcp") os.Setenv("LOCAL_CACHE_SIZE_IN_BYTES", local_cache_size) os.Setenv("USE_STATSD", "false") + os.Setenv("BACKEND_TYPE", "") local_cache_size_val, _ := strconv.Atoi(local_cache_size) enable_local_cache := local_cache_size_val > 0 @@ -595,4 +604,4 @@ func testConfigReload(grpcPort, perSecond string, local_cache_size string) func( panic(err) } } -} \ No newline at end of file +} diff --git a/test/memcached/cache_impl_test.go b/test/memcached/cache_impl_test.go new file mode 100644 index 000000000..d870f3119 --- /dev/null +++ b/test/memcached/cache_impl_test.go @@ -0,0 +1,525 @@ +// Adapted from test/redis/cache_impl_test.go, with most test cases being the same +// basic idea. TestMemcacheAdd() is unique to the memcache tests, since redis can create a new key +// simply by incrementing it but memcached cannot. In memcache new keys need to be explicitly +// added. 
+package memcached_test + +import ( + "math/rand" + "strconv" + "testing" + + "github.com/bradfitz/gomemcache/memcache" + "github.com/coocood/freecache" + + pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3" + "github.com/envoyproxy/ratelimit/src/config" + "github.com/envoyproxy/ratelimit/src/limiter" + "github.com/envoyproxy/ratelimit/src/memcached" + stats "github.com/lyft/gostats" + + "github.com/envoyproxy/ratelimit/test/common" + mock_limiter "github.com/envoyproxy/ratelimit/test/mocks/limiter" + mock_memcached "github.com/envoyproxy/ratelimit/test/mocks/memcached" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +func TestMemcached(t *testing.T) { + assert := assert.New(t) + controller := gomock.NewController(t) + defer controller.Finish() + + timeSource := mock_limiter.NewMockTimeSource(controller) + client := mock_memcached.NewMockClient(controller) + statsStore := stats.NewStore(stats.NewNullSink(), false) + cache := memcached.NewRateLimitCacheImpl(client, timeSource, nil, 0, nil, statsStore) + + timeSource.EXPECT().UnixNow().Return(int64(1234)) + client.EXPECT().GetMulti([]string{"domain_key_value_1234"}).Return( + getMultiResult(map[string]int{"domain_key_value_1234": 4}), nil, + ) + client.EXPECT().Increment("domain_key_value_1234", uint64(1)).Return(uint64(5), nil) + + request := common.NewRateLimitRequest("domain", [][][2]string{{{"key", "value"}}}, 1) + limits := []*config.RateLimit{config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_SECOND, "key_value", statsStore)} + + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{{Code: pb.RateLimitResponse_OK, CurrentLimit: limits[0].Limit, LimitRemaining: 5}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(1), limits[0].Stats.TotalHits.Value()) + assert.Equal(uint64(0), limits[0].Stats.OverLimit.Value()) + assert.Equal(uint64(0), limits[0].Stats.NearLimit.Value()) + + timeSource.EXPECT().UnixNow().Return(int64(1234)) + client.EXPECT().GetMulti([]string{"domain_key2_value2_subkey2_subvalue2_1200"}).Return( + getMultiResult(map[string]int{"domain_key2_value2_subkey2_subvalue2_1200": 10}), nil, + ) + client.EXPECT().Increment("domain_key2_value2_subkey2_subvalue2_1200", uint64(1)).Return(uint64(11), nil) + + request = common.NewRateLimitRequest( + "domain", + [][][2]string{ + {{"key2", "value2"}}, + {{"key2", "value2"}, {"subkey2", "subvalue2"}}, + }, 1) + limits = []*config.RateLimit{ + nil, + config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_MINUTE, "key2_value2_subkey2_subvalue2", statsStore)} + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{{Code: pb.RateLimitResponse_OK, CurrentLimit: nil, LimitRemaining: 0}, + {Code: pb.RateLimitResponse_OVER_LIMIT, CurrentLimit: limits[1].Limit, LimitRemaining: 0}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(1), limits[1].Stats.TotalHits.Value()) + assert.Equal(uint64(1), limits[1].Stats.OverLimit.Value()) + assert.Equal(uint64(0), limits[1].Stats.NearLimit.Value()) + + timeSource.EXPECT().UnixNow().Return(int64(1000000)) + client.EXPECT().GetMulti([]string{ + "domain_key3_value3_997200", + "domain_key3_value3_subkey3_subvalue3_950400", + }).Return( + getMultiResult(map[string]int{ + "domain_key3_value3_997200": 10, + "domain_key3_value3_subkey3_subvalue3_950400": 12}), + nil, + ) + client.EXPECT().Increment("domain_key3_value3_997200", uint64(1)).Return(uint64(11), nil) + client.EXPECT().Increment("domain_key3_value3_subkey3_subvalue3_950400", uint64(1)).Return(uint64(13), nil) + + request 
= common.NewRateLimitRequest(
+		"domain",
+		[][][2]string{
+			{{"key3", "value3"}},
+			{{"key3", "value3"}, {"subkey3", "subvalue3"}},
+		}, 1)
+	limits = []*config.RateLimit{
+		config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_HOUR, "key3_value3", statsStore),
+		config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_DAY, "key3_value3_subkey3_subvalue3", statsStore)}
+	assert.Equal(
+		[]*pb.RateLimitResponse_DescriptorStatus{
+			{Code: pb.RateLimitResponse_OVER_LIMIT, CurrentLimit: limits[0].Limit, LimitRemaining: 0},
+			{Code: pb.RateLimitResponse_OVER_LIMIT, CurrentLimit: limits[1].Limit, LimitRemaining: 0}},
+		cache.DoLimit(nil, request, limits))
+	assert.Equal(uint64(1), limits[0].Stats.TotalHits.Value())
+	assert.Equal(uint64(1), limits[0].Stats.OverLimit.Value())
+	assert.Equal(uint64(0), limits[0].Stats.NearLimit.Value())
+	assert.Equal(uint64(1), limits[1].Stats.TotalHits.Value())
+	assert.Equal(uint64(1), limits[1].Stats.OverLimit.Value())
+	assert.Equal(uint64(0), limits[1].Stats.NearLimit.Value())
+
+	cache.Flush()
+}
+
+func testLocalCacheStats(localCacheStats stats.StatGenerator, statsStore stats.Store, sink *common.TestStatSink,
+	expectedHitCount int, expectedMissCount int, expectedLookUpCount int, expectedExpiredCount int,
+	expectedEntryCount int) func(*testing.T) {
+	return func(t *testing.T) {
+		localCacheStats.GenerateStats()
+		statsStore.Flush()
+
+		// Check whether all local_cache related stats are available.
+		_, ok := sink.Record["averageAccessTime"]
+		assert.Equal(t, true, ok)
+		hitCount, ok := sink.Record["hitCount"]
+		assert.Equal(t, true, ok)
+		missCount, ok := sink.Record["missCount"]
+		assert.Equal(t, true, ok)
+		lookupCount, ok := sink.Record["lookupCount"]
+		assert.Equal(t, true, ok)
+		_, ok = sink.Record["overwriteCount"]
+		assert.Equal(t, true, ok)
+		_, ok = sink.Record["evacuateCount"]
+		assert.Equal(t, true, ok)
+		expiredCount, ok := sink.Record["expiredCount"]
+		assert.Equal(t, true, ok)
+		entryCount, ok := sink.Record["entryCount"]
+		assert.Equal(t, true, ok)
+
+		// Check the correctness of hitCount, missCount, lookupCount, expiredCount and entryCount
+		assert.Equal(t, expectedHitCount, hitCount.(int))
+		assert.Equal(t, expectedMissCount, missCount.(int))
+		assert.Equal(t, expectedLookUpCount, lookupCount.(int))
+		assert.Equal(t, expectedExpiredCount, expiredCount.(int))
+		assert.Equal(t, expectedEntryCount, entryCount.(int))
+
+		sink.Clear()
+	}
+}
+
+func TestOverLimitWithLocalCache(t *testing.T) {
+	assert := assert.New(t)
+	controller := gomock.NewController(t)
+	defer controller.Finish()
+
+	timeSource := mock_limiter.NewMockTimeSource(controller)
+	client := mock_memcached.NewMockClient(controller)
+	localCache := freecache.NewCache(100)
+	sink := &common.TestStatSink{}
+	statsStore := stats.NewStore(sink, true)
+	cache := memcached.NewRateLimitCacheImpl(client, timeSource, nil, 0, localCache, statsStore)
+	localCacheStats := limiter.NewLocalCacheStats(localCache, statsStore.Scope("localcache"))
+
+	// Test Near Limit Stats.
Under Near Limit Ratio + timeSource.EXPECT().UnixNow().Return(int64(1000000)) + client.EXPECT().GetMulti([]string{"domain_key4_value4_997200"}).Return( + getMultiResult(map[string]int{"domain_key4_value4_997200": 10}), nil, + ) + client.EXPECT().Increment("domain_key4_value4_997200", uint64(1)).Return(uint64(5), nil) + + request := common.NewRateLimitRequest("domain", [][][2]string{{{"key4", "value4"}}}, 1) + + limits := []*config.RateLimit{ + config.NewRateLimit(15, pb.RateLimitResponse_RateLimit_HOUR, "key4_value4", statsStore)} + + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{ + {Code: pb.RateLimitResponse_OK, CurrentLimit: limits[0].Limit, LimitRemaining: 4}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(1), limits[0].Stats.TotalHits.Value()) + assert.Equal(uint64(0), limits[0].Stats.OverLimit.Value()) + assert.Equal(uint64(0), limits[0].Stats.OverLimitWithLocalCache.Value()) + assert.Equal(uint64(0), limits[0].Stats.NearLimit.Value()) + + // Check the local cache stats. + testLocalCacheStats(localCacheStats, statsStore, sink, 0, 1, 1, 0, 0) + + // Test Near Limit Stats. At Near Limit Ratio, still OK + timeSource.EXPECT().UnixNow().Return(int64(1000000)) + client.EXPECT().GetMulti([]string{"domain_key4_value4_997200"}).Return( + getMultiResult(map[string]int{"domain_key4_value4_997200": 12}), nil, + ) + client.EXPECT().Increment("domain_key4_value4_997200", uint64(1)).Return(uint64(13), nil) + + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{ + {Code: pb.RateLimitResponse_OK, CurrentLimit: limits[0].Limit, LimitRemaining: 2}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(2), limits[0].Stats.TotalHits.Value()) + assert.Equal(uint64(0), limits[0].Stats.OverLimit.Value()) + assert.Equal(uint64(0), limits[0].Stats.OverLimitWithLocalCache.Value()) + assert.Equal(uint64(1), limits[0].Stats.NearLimit.Value()) + + // Check the local cache stats. + testLocalCacheStats(localCacheStats, statsStore, sink, 0, 2, 2, 0, 0) + + // Test Over limit stats + timeSource.EXPECT().UnixNow().Return(int64(1000000)) + client.EXPECT().GetMulti([]string{"domain_key4_value4_997200"}).Return( + getMultiResult(map[string]int{"domain_key4_value4_997200": 15}), nil, + ) + client.EXPECT().Increment("domain_key4_value4_997200", uint64(1)).Return(uint64(16), nil) + + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{ + {Code: pb.RateLimitResponse_OVER_LIMIT, CurrentLimit: limits[0].Limit, LimitRemaining: 0}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(3), limits[0].Stats.TotalHits.Value()) + assert.Equal(uint64(1), limits[0].Stats.OverLimit.Value()) + assert.Equal(uint64(0), limits[0].Stats.OverLimitWithLocalCache.Value()) + assert.Equal(uint64(1), limits[0].Stats.NearLimit.Value()) + + // Check the local cache stats. 
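+	// (The numeric arguments are the expected hitCount, missCount, lookupCount,
+	// expiredCount and entryCount, in that order; see testLocalCacheStats above.)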
+ testLocalCacheStats(localCacheStats, statsStore, sink, 0, 2, 3, 0, 1) + + // Test Over limit stats with local cache + timeSource.EXPECT().UnixNow().Return(int64(1000000)) + client.EXPECT().GetMulti([]string{"domain_key4_value4_997200"}).Times(0) + client.EXPECT().Increment("domain_key4_value4_997200", uint64(1)).Times(0) + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{ + {Code: pb.RateLimitResponse_OVER_LIMIT, CurrentLimit: limits[0].Limit, LimitRemaining: 0}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(4), limits[0].Stats.TotalHits.Value()) + assert.Equal(uint64(2), limits[0].Stats.OverLimit.Value()) + assert.Equal(uint64(1), limits[0].Stats.OverLimitWithLocalCache.Value()) + assert.Equal(uint64(1), limits[0].Stats.NearLimit.Value()) + + // Check the local cache stats. + testLocalCacheStats(localCacheStats, statsStore, sink, 1, 3, 4, 0, 1) + + cache.Flush() +} + +func TestNearLimit(t *testing.T) { + assert := assert.New(t) + controller := gomock.NewController(t) + defer controller.Finish() + + timeSource := mock_limiter.NewMockTimeSource(controller) + client := mock_memcached.NewMockClient(controller) + statsStore := stats.NewStore(stats.NewNullSink(), false) + cache := memcached.NewRateLimitCacheImpl(client, timeSource, nil, 0, nil, statsStore) + + // Test Near Limit Stats. Under Near Limit Ratio + timeSource.EXPECT().UnixNow().Return(int64(1000000)) + client.EXPECT().GetMulti([]string{"domain_key4_value4_997200"}).Return( + getMultiResult(map[string]int{"domain_key4_value4_997200": 10}), nil, + ) + client.EXPECT().Increment("domain_key4_value4_997200", uint64(1)).Return(uint64(11), nil) + + request := common.NewRateLimitRequest("domain", [][][2]string{{{"key4", "value4"}}}, 1) + + limits := []*config.RateLimit{ + config.NewRateLimit(15, pb.RateLimitResponse_RateLimit_HOUR, "key4_value4", statsStore)} + + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{ + {Code: pb.RateLimitResponse_OK, CurrentLimit: limits[0].Limit, LimitRemaining: 4}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(1), limits[0].Stats.TotalHits.Value()) + assert.Equal(uint64(0), limits[0].Stats.OverLimit.Value()) + assert.Equal(uint64(0), limits[0].Stats.NearLimit.Value()) + + // Test Near Limit Stats. At Near Limit Ratio, still OK + timeSource.EXPECT().UnixNow().Return(int64(1000000)) + client.EXPECT().GetMulti([]string{"domain_key4_value4_997200"}).Return( + getMultiResult(map[string]int{"domain_key4_value4_997200": 12}), nil, + ) + client.EXPECT().Increment("domain_key4_value4_997200", uint64(1)).Return(uint64(13), nil) + + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{ + {Code: pb.RateLimitResponse_OK, CurrentLimit: limits[0].Limit, LimitRemaining: 2}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(2), limits[0].Stats.TotalHits.Value()) + assert.Equal(uint64(0), limits[0].Stats.OverLimit.Value()) + assert.Equal(uint64(1), limits[0].Stats.NearLimit.Value()) + + // Test Near Limit Stats. We went OVER_LIMIT, but the near_limit counter only increases + // when we are near limit, not after we have passed the limit. 
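+	// (Worked numbers, assuming the default near limit ratio of 0.8 these tests rely
+	// on: with a limit of 15 the near limit threshold is floor(15 * 0.8) = 12, and the
+	// counter goes 15 -> 16, so the whole hit is over limit and near_limit stays at 1.)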
+ timeSource.EXPECT().UnixNow().Return(int64(1000000)) + client.EXPECT().GetMulti([]string{"domain_key4_value4_997200"}).Return( + getMultiResult(map[string]int{"domain_key4_value4_997200": 15}), nil, + ) + client.EXPECT().Increment("domain_key4_value4_997200", uint64(1)).Return(uint64(16), nil) + + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{ + {Code: pb.RateLimitResponse_OVER_LIMIT, CurrentLimit: limits[0].Limit, LimitRemaining: 0}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(3), limits[0].Stats.TotalHits.Value()) + assert.Equal(uint64(1), limits[0].Stats.OverLimit.Value()) + assert.Equal(uint64(1), limits[0].Stats.NearLimit.Value()) + + // Now test hitsAddend that is greater than 1 + // All of it under limit, under near limit + timeSource.EXPECT().UnixNow().Return(int64(1234)) + client.EXPECT().GetMulti([]string{"domain_key5_value5_1234"}).Return( + getMultiResult(map[string]int{"domain_key5_value5_1234": 2}), nil, + ) + client.EXPECT().Increment("domain_key5_value5_1234", uint64(3)).Return(uint64(5), nil) + + request = common.NewRateLimitRequest("domain", [][][2]string{{{"key5", "value5"}}}, 3) + limits = []*config.RateLimit{config.NewRateLimit(20, pb.RateLimitResponse_RateLimit_SECOND, "key5_value5", statsStore)} + + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{{Code: pb.RateLimitResponse_OK, CurrentLimit: limits[0].Limit, LimitRemaining: 15}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(3), limits[0].Stats.TotalHits.Value()) + assert.Equal(uint64(0), limits[0].Stats.OverLimit.Value()) + assert.Equal(uint64(0), limits[0].Stats.NearLimit.Value()) + + // All of it under limit, some over near limit + timeSource.EXPECT().UnixNow().Return(int64(1234)) + client.EXPECT().GetMulti([]string{"domain_key6_value6_1234"}).Return( + getMultiResult(map[string]int{"domain_key6_value6_1234": 5}), nil, + ) + client.EXPECT().Increment("domain_key6_value6_1234", uint64(2)).Return(uint64(7), nil) + + request = common.NewRateLimitRequest("domain", [][][2]string{{{"key6", "value6"}}}, 2) + limits = []*config.RateLimit{config.NewRateLimit(8, pb.RateLimitResponse_RateLimit_SECOND, "key6_value6", statsStore)} + + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{{Code: pb.RateLimitResponse_OK, CurrentLimit: limits[0].Limit, LimitRemaining: 1}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(2), limits[0].Stats.TotalHits.Value()) + assert.Equal(uint64(0), limits[0].Stats.OverLimit.Value()) + assert.Equal(uint64(1), limits[0].Stats.NearLimit.Value()) + + // All of it under limit, all of it over near limit + timeSource.EXPECT().UnixNow().Return(int64(1234)) + client.EXPECT().GetMulti([]string{"domain_key7_value7_1234"}).Return( + getMultiResult(map[string]int{"domain_key7_value7_1234": 16}), nil, + ) + client.EXPECT().Increment("domain_key7_value7_1234", uint64(3)).Return(uint64(19), nil) + + request = common.NewRateLimitRequest("domain", [][][2]string{{{"key7", "value7"}}}, 3) + limits = []*config.RateLimit{config.NewRateLimit(20, pb.RateLimitResponse_RateLimit_SECOND, "key7_value7", statsStore)} + + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{{Code: pb.RateLimitResponse_OK, CurrentLimit: limits[0].Limit, LimitRemaining: 1}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(3), limits[0].Stats.TotalHits.Value()) + assert.Equal(uint64(0), limits[0].Stats.OverLimit.Value()) + assert.Equal(uint64(3), limits[0].Stats.NearLimit.Value()) + + // Some of it over limit, all of it over near limit + 
timeSource.EXPECT().UnixNow().Return(int64(1234)) + client.EXPECT().GetMulti([]string{"domain_key8_value8_1234"}).Return( + getMultiResult(map[string]int{"domain_key8_value8_1234": 19}), nil, + ) + client.EXPECT().Increment("domain_key8_value8_1234", uint64(3)).Return(uint64(22), nil) + + request = common.NewRateLimitRequest("domain", [][][2]string{{{"key8", "value8"}}}, 3) + limits = []*config.RateLimit{config.NewRateLimit(20, pb.RateLimitResponse_RateLimit_SECOND, "key8_value8", statsStore)} + + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{{Code: pb.RateLimitResponse_OVER_LIMIT, CurrentLimit: limits[0].Limit, LimitRemaining: 0}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(3), limits[0].Stats.TotalHits.Value()) + assert.Equal(uint64(2), limits[0].Stats.OverLimit.Value()) + assert.Equal(uint64(1), limits[0].Stats.NearLimit.Value()) + + // Some of it in all three places + timeSource.EXPECT().UnixNow().Return(int64(1234)) + client.EXPECT().GetMulti([]string{"domain_key9_value9_1234"}).Return( + getMultiResult(map[string]int{"domain_key9_value9_1234": 15}), nil, + ) + client.EXPECT().Increment("domain_key9_value9_1234", uint64(7)).Return(uint64(22), nil) + + request = common.NewRateLimitRequest("domain", [][][2]string{{{"key9", "value9"}}}, 7) + limits = []*config.RateLimit{config.NewRateLimit(20, pb.RateLimitResponse_RateLimit_SECOND, "key9_value9", statsStore)} + + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{{Code: pb.RateLimitResponse_OVER_LIMIT, CurrentLimit: limits[0].Limit, LimitRemaining: 0}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(7), limits[0].Stats.TotalHits.Value()) + assert.Equal(uint64(2), limits[0].Stats.OverLimit.Value()) + assert.Equal(uint64(4), limits[0].Stats.NearLimit.Value()) + + // all of it over limit + timeSource.EXPECT().UnixNow().Return(int64(1234)) + client.EXPECT().GetMulti([]string{"domain_key10_value10_1234"}).Return( + getMultiResult(map[string]int{"domain_key10_value10_1234": 27}), nil, + ) + client.EXPECT().Increment("domain_key10_value10_1234", uint64(3)).Return(uint64(30), nil) + + request = common.NewRateLimitRequest("domain", [][][2]string{{{"key10", "value10"}}}, 3) + limits = []*config.RateLimit{config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_SECOND, "key10_value10", statsStore)} + + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{{Code: pb.RateLimitResponse_OVER_LIMIT, CurrentLimit: limits[0].Limit, LimitRemaining: 0}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(3), limits[0].Stats.TotalHits.Value()) + assert.Equal(uint64(3), limits[0].Stats.OverLimit.Value()) + assert.Equal(uint64(0), limits[0].Stats.NearLimit.Value()) + + cache.Flush() +} + +func TestMemcacheWithJitter(t *testing.T) { + assert := assert.New(t) + controller := gomock.NewController(t) + defer controller.Finish() + + timeSource := mock_limiter.NewMockTimeSource(controller) + client := mock_memcached.NewMockClient(controller) + jitterSource := mock_limiter.NewMockJitterRandSource(controller) + statsStore := stats.NewStore(stats.NewNullSink(), false) + cache := memcached.NewRateLimitCacheImpl(client, timeSource, rand.New(jitterSource), 3600, nil, statsStore) + + timeSource.EXPECT().UnixNow().Return(int64(1234)) + jitterSource.EXPECT().Int63().Return(int64(100)) + + // Key is not found in memcache + client.EXPECT().GetMulti([]string{"domain_key_value_1234"}).Return(nil, nil) + // First increment attempt will fail + client.EXPECT().Increment("domain_key_value_1234", uint64(1)).Return( + 
uint64(0), memcache.ErrCacheMiss) + // Add succeeds + client.EXPECT().Add( + &memcache.Item{ + Key: "domain_key_value_1234", + Value: []byte(strconv.FormatUint(1, 10)), + // 1 second + 100 seconds of jitter + Expiration: int32(101), + }, + ).Return(nil) + + request := common.NewRateLimitRequest("domain", [][][2]string{{{"key", "value"}}}, 1) + limits := []*config.RateLimit{config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_SECOND, "key_value", statsStore)} + + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{{Code: pb.RateLimitResponse_OK, CurrentLimit: limits[0].Limit, LimitRemaining: 9}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(1), limits[0].Stats.TotalHits.Value()) + assert.Equal(uint64(0), limits[0].Stats.OverLimit.Value()) + assert.Equal(uint64(0), limits[0].Stats.NearLimit.Value()) + + cache.Flush() +} + +func TestMemcacheAdd(t *testing.T) { + assert := assert.New(t) + controller := gomock.NewController(t) + defer controller.Finish() + + timeSource := mock_limiter.NewMockTimeSource(controller) + client := mock_memcached.NewMockClient(controller) + statsStore := stats.NewStore(stats.NewNullSink(), false) + cache := memcached.NewRateLimitCacheImpl(client, timeSource, nil, 0, nil, statsStore) + + // Test a race condition with the initial add + timeSource.EXPECT().UnixNow().Return(int64(1234)) + + client.EXPECT().GetMulti([]string{"domain_key_value_1234"}).Return(nil, nil) + client.EXPECT().Increment("domain_key_value_1234", uint64(1)).Return( + uint64(0), memcache.ErrCacheMiss) + // Add fails, must have been a race condition + client.EXPECT().Add( + &memcache.Item{ + Key: "domain_key_value_1234", + Value: []byte(strconv.FormatUint(1, 10)), + Expiration: int32(1), + }, + ).Return(memcache.ErrNotStored) + // Should work the second time, since some other client added the key. 
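+	// (The mocked return of 2 below is consistent with the racing client having
+	// stored 1 and our retried increment adding 1 more.)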
+ client.EXPECT().Increment("domain_key_value_1234", uint64(1)).Return( + uint64(2), nil) + + request := common.NewRateLimitRequest("domain", [][][2]string{{{"key", "value"}}}, 1) + limits := []*config.RateLimit{config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_SECOND, "key_value", statsStore)} + + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{{Code: pb.RateLimitResponse_OK, CurrentLimit: limits[0].Limit, LimitRemaining: 9}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(1), limits[0].Stats.TotalHits.Value()) + assert.Equal(uint64(0), limits[0].Stats.OverLimit.Value()) + assert.Equal(uint64(0), limits[0].Stats.NearLimit.Value()) + + // A rate limit with 1-minute window + timeSource.EXPECT().UnixNow().Return(int64(1234)) + client.EXPECT().GetMulti([]string{"domain_key2_value2_1200"}).Return(nil, nil) + client.EXPECT().Increment("domain_key2_value2_1200", uint64(1)).Return( + uint64(0), memcache.ErrCacheMiss) + client.EXPECT().Add( + &memcache.Item{ + Key: "domain_key2_value2_1200", + Value: []byte(strconv.FormatUint(1, 10)), + Expiration: int32(60), + }, + ).Return(nil) + + request = common.NewRateLimitRequest("domain", [][][2]string{{{"key2", "value2"}}}, 1) + limits = []*config.RateLimit{config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_MINUTE, "key2_value2", statsStore)} + + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{{Code: pb.RateLimitResponse_OK, CurrentLimit: limits[0].Limit, LimitRemaining: 9}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(1), limits[0].Stats.TotalHits.Value()) + assert.Equal(uint64(0), limits[0].Stats.OverLimit.Value()) + assert.Equal(uint64(0), limits[0].Stats.NearLimit.Value()) + + cache.Flush() +} + +func getMultiResult(vals map[string]int) map[string]*memcache.Item { + result := make(map[string]*memcache.Item, len(vals)) + for k, v := range vals { + result[k] = &memcache.Item{ + Value: []byte(strconv.Itoa(v)), + } + } + return result +} diff --git a/test/mocks/memcached/client.go b/test/mocks/memcached/client.go new file mode 100644 index 000000000..433105bd0 --- /dev/null +++ b/test/mocks/memcached/client.go @@ -0,0 +1,78 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/envoyproxy/ratelimit/src/memcached (interfaces: Client) + +// Package mock_memcached is a generated GoMock package. 
+package mock_memcached + +import ( + memcache "github.com/bradfitz/gomemcache/memcache" + gomock "github.com/golang/mock/gomock" + reflect "reflect" +) + +// MockClient is a mock of Client interface +type MockClient struct { + ctrl *gomock.Controller + recorder *MockClientMockRecorder +} + +// MockClientMockRecorder is the mock recorder for MockClient +type MockClientMockRecorder struct { + mock *MockClient +} + +// NewMockClient creates a new mock instance +func NewMockClient(ctrl *gomock.Controller) *MockClient { + mock := &MockClient{ctrl: ctrl} + mock.recorder = &MockClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockClient) EXPECT() *MockClientMockRecorder { + return m.recorder +} + +// Add mocks base method +func (m *MockClient) Add(arg0 *memcache.Item) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Add", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Add indicates an expected call of Add +func (mr *MockClientMockRecorder) Add(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockClient)(nil).Add), arg0) +} + +// GetMulti mocks base method +func (m *MockClient) GetMulti(arg0 []string) (map[string]*memcache.Item, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMulti", arg0) + ret0, _ := ret[0].(map[string]*memcache.Item) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetMulti indicates an expected call of GetMulti +func (mr *MockClientMockRecorder) GetMulti(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMulti", reflect.TypeOf((*MockClient)(nil).GetMulti), arg0) +} + +// Increment mocks base method +func (m *MockClient) Increment(arg0 string, arg1 uint64) (uint64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Increment", arg0, arg1) + ret0, _ := ret[0].(uint64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Increment indicates an expected call of Increment +func (mr *MockClientMockRecorder) Increment(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Increment", reflect.TypeOf((*MockClient)(nil).Increment), arg0, arg1) +} diff --git a/test/mocks/mocks.go b/test/mocks/mocks.go index 9f8b18cec..12cf68dca 100644 --- a/test/mocks/mocks.go +++ b/test/mocks/mocks.go @@ -5,4 +5,5 @@ package mocks //go:generate go run github.com/golang/mock/mockgen -destination ./config/config.go github.com/envoyproxy/ratelimit/src/config RateLimitConfig,RateLimitConfigLoader //go:generate go run github.com/golang/mock/mockgen -destination ./redis/redis.go github.com/envoyproxy/ratelimit/src/redis Client //go:generate go run github.com/golang/mock/mockgen -destination ./limiter/limiter.go github.com/envoyproxy/ratelimit/src/limiter RateLimitCache,TimeSource,JitterRandSource +//go:generate go run github.com/golang/mock/mockgen -destination ./memcached/client.go github.com/envoyproxy/ratelimit/src/memcached Client //go:generate go run github.com/golang/mock/mockgen -destination ./rls/rls.go github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3 RateLimitServiceServer From f8f7de4444361b1f083c37c27e460209f029c689 Mon Sep 17 00:00:00 2001 From: David Weitzman Date: Fri, 18 Sep 2020 11:30:36 -0700 Subject: [PATCH 2/4] memcache: pull Flush() up to general RateLimitCache interface memcache: Add documentation to README Signed-off-by: David 
Weitzman
---
 README.md                     | 13 +++++++++++++
 src/limiter/cache.go          |  4 ++++
 src/memcached/cache_impl.go   | 17 ++---------------
 src/redis/cache_impl.go       |  3 +++
 test/mocks/limiter/limiter.go | 12 ++++++++++++
 5 files changed, 34 insertions(+), 15 deletions(-)

diff --git a/README.md b/README.md
index e2f45e36f..a300c7897 100644
--- a/README.md
+++ b/README.md
@@ -28,6 +28,7 @@
 - [Pipelining](#pipelining)
 - [One Redis Instance](#one-redis-instance)
 - [Two Redis Instances](#two-redis-instances)
+- [Memcache](#memcache)
 - [Contact](#contact)
 
@@ -484,6 +485,18 @@ To configure two Redis instances use the following environment variables:
 This setup will use the Redis server configured with the `_PERSECOND_` vars for per second limits, and the other
 Redis server for all other limits.
 
+# Memcache
+
+Experimental Memcache support has been added as an alternative to Redis in v1.5.
+
+To configure a Memcache instance use the following environment variables instead of the Redis variables:
+
+1. `MEMCACHE_HOST_PORT=`
+1. `BACKEND_TYPE=memcache`
+
+With memcache mode increments will happen asynchronously, so it's technically possible for
+a client to exceed quota briefly if multiple requests happen at exactly the same time.
+
 # Contact
 
 * [envoy-announce](https://groups.google.com/forum/#!forum/envoy-announce): Low frequency mailing
diff --git a/src/limiter/cache.go b/src/limiter/cache.go
index 9408126ca..932022281 100644
--- a/src/limiter/cache.go
+++ b/src/limiter/cache.go
@@ -35,4 +35,8 @@ type RateLimitCache interface {
 		ctx context.Context,
 		request *pb.RateLimitRequest,
 		limits []*config.RateLimit) []*pb.RateLimitResponse_DescriptorStatus
+
+	// Waits for any unfinished asynchronous work. This may be used by unit tests,
+	// since the memcached cache does increments in a background goroutine.
+	Flush()
 }
diff --git a/src/memcached/cache_impl.go b/src/memcached/cache_impl.go
index 1c47c7cd2..801e0c623 100644
--- a/src/memcached/cache_impl.go
+++ b/src/memcached/cache_impl.go
@@ -34,19 +34,6 @@ import (
 	"github.com/envoyproxy/ratelimit/src/settings"
 )
 
-type rateLimitCache interface {
-	// Same as in limiter.RateLimitCache
-	DoLimit(
-		ctx context.Context,
-		request *pb.RateLimitRequest,
-		limits []*config.RateLimit) []*pb.RateLimitResponse_DescriptorStatus
-
-	// Waits for any lingering goroutines that are incrementing memcache values.
-	// This is used for unit tests, since the memcache increments happen
-	// asynchronously in the background.
- Flush() -} - type rateLimitMemcacheImpl struct { client Client timeSource limiter.TimeSource @@ -277,7 +264,7 @@ func (this *rateLimitMemcacheImpl) Flush() { this.wg.Wait() } -func NewRateLimitCacheImpl(client Client, timeSource limiter.TimeSource, jitterRand *rand.Rand, expirationJitterMaxSeconds int64, localCache *freecache.Cache, scope stats.Scope) rateLimitCache { +func NewRateLimitCacheImpl(client Client, timeSource limiter.TimeSource, jitterRand *rand.Rand, expirationJitterMaxSeconds int64, localCache *freecache.Cache, scope stats.Scope) limiter.RateLimitCache { return &rateLimitMemcacheImpl{ client: client, timeSource: timeSource, @@ -288,7 +275,7 @@ func NewRateLimitCacheImpl(client Client, timeSource limiter.TimeSource, jitterR } } -func NewRateLimitCacheImplFromSettings(s settings.Settings, timeSource limiter.TimeSource, jitterRand *rand.Rand, localCache *freecache.Cache, scope stats.Scope) rateLimitCache { +func NewRateLimitCacheImplFromSettings(s settings.Settings, timeSource limiter.TimeSource, jitterRand *rand.Rand, localCache *freecache.Cache, scope stats.Scope) limiter.RateLimitCache { return NewRateLimitCacheImpl( memcache.New(s.MemcacheHostPort), timeSource, diff --git a/src/redis/cache_impl.go b/src/redis/cache_impl.go index 22528e49a..692ec96ef 100644 --- a/src/redis/cache_impl.go +++ b/src/redis/cache_impl.go @@ -210,6 +210,9 @@ func (this *rateLimitCacheImpl) DoLimit( return responseDescriptorStatuses } +// Flush() is a no-op with redis since quota reads and updates happen synchronously. +func (this *rateLimitCacheImpl) Flush() {} + func NewRateLimitCacheImpl(client Client, perSecondClient Client, timeSource limiter.TimeSource, jitterRand *rand.Rand, expirationJitterMaxSeconds int64, localCache *freecache.Cache) limiter.RateLimitCache { return &rateLimitCacheImpl{ client: client, diff --git a/test/mocks/limiter/limiter.go b/test/mocks/limiter/limiter.go index 7e9f3e5b3..573f0e81f 100644 --- a/test/mocks/limiter/limiter.go +++ b/test/mocks/limiter/limiter.go @@ -49,6 +49,18 @@ func (mr *MockRateLimitCacheMockRecorder) DoLimit(arg0, arg1, arg2 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoLimit", reflect.TypeOf((*MockRateLimitCache)(nil).DoLimit), arg0, arg1, arg2) } +// Flush mocks base method +func (m *MockRateLimitCache) Flush() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Flush") +} + +// Flush indicates an expected call of Flush +func (mr *MockRateLimitCacheMockRecorder) Flush() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Flush", reflect.TypeOf((*MockRateLimitCache)(nil).Flush)) +} + // MockTimeSource is a mock of TimeSource interface type MockTimeSource struct { ctrl *gomock.Controller From 7e68912bcc63c1a634681b360dc345c663589287 Mon Sep 17 00:00:00 2001 From: David Weitzman Date: Fri, 4 Dec 2020 10:18:36 -0800 Subject: [PATCH 3/4] Document memcache max key length Signed-off-by: David Weitzman --- README.md | 3 +++ src/memcached/cache_impl.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/README.md b/README.md index 23f293f2c..b545b1ff2 100644 --- a/README.md +++ b/README.md @@ -567,6 +567,9 @@ To configure a Memcache instance use the following environment variables instead With memcache mode increments will happen asynchronously, so it's technically possible for a client to exceed quota briefly if multiple requests happen at exactly the same time. +Note that Memcache has a max key length of 250 characters, so operations referencing very long +descriptors will fail. 
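+
+As a rough illustration (with a hypothetical descriptor, not taken from any shipped config):
+cache keys concatenate the domain, each descriptor key/value pair and a time-slot suffix,
+e.g. `domain_key_value_1234`, so a single very long descriptor value can push the key past
+250 characters, at which point memcache rejects the operation with `ErrMalformedKey`.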
+
 # Contact
 
 * [envoy-announce](https://groups.google.com/forum/#!forum/envoy-announce): Low frequency mailing
diff --git a/src/memcached/cache_impl.go b/src/memcached/cache_impl.go
index 803487950..4519e367a 100644
--- a/src/memcached/cache_impl.go
+++ b/src/memcached/cache_impl.go
@@ -9,6 +9,9 @@
 // limit the number of RPCs. First we call increment, then add if the increment
 // failed, then increment again if the add failed (which could happen if there was
 // a race to call "add").
+//
+// Note that max memcache key length is 250 characters. Attempting to get or increment
+// a longer key will return memcache.ErrMalformedKey.
 
 package memcached

From fcb9523e2ed79796127b072653ab3250fe585737 Mon Sep 17 00:00:00 2001
From: David Weitzman
Date: Wed, 23 Dec 2020 16:06:38 -0800
Subject: [PATCH 4/4] Address some review comments for memcached support:

- Add memcache error unit test
- Rename wg -> waitGroup
- Use explicit "redis" for backend type in some tests instead of always assuming it's the default

Signed-off-by: David Weitzman
---
 src/memcached/cache_impl.go          | 10 +++---
 test/integration/integration_test.go |  4 +--
 test/memcached/cache_impl_test.go    | 46 ++++++++++++++++++++++++++++
 3 files changed, 53 insertions(+), 7 deletions(-)

diff --git a/src/memcached/cache_impl.go b/src/memcached/cache_impl.go
index 4519e367a..52f8fae47 100644
--- a/src/memcached/cache_impl.go
+++ b/src/memcached/cache_impl.go
@@ -45,7 +45,7 @@ type rateLimitMemcacheImpl struct {
 	expirationJitterMaxSeconds int64
 	cacheKeyGenerator          limiter.CacheKeyGenerator
 	localCache                 *freecache.Cache
-	wg                         sync.WaitGroup
+	waitGroup                  sync.WaitGroup
 	nearLimitRatio             float32
 }
@@ -106,7 +106,7 @@ func (this *rateLimitMemcacheImpl) DoLimit(
 		keysToGet = append(keysToGet, cacheKey.Key)
 	}
 
-	// Now fetch the pipeline.
+	// Now fetch from memcache.
 	responseDescriptorStatuses := make([]*pb.RateLimitResponse_DescriptorStatus,
 		len(request.Descriptors))
@@ -223,14 +223,14 @@
 		}
 	}
 
-	this.wg.Add(1)
+	this.waitGroup.Add(1)
 	go this.increaseAsync(cacheKeys, isOverLimitWithLocalCache, limits, uint64(hitsAddend))
 
 	return responseDescriptorStatuses
 }
 
 func (this *rateLimitMemcacheImpl) increaseAsync(cacheKeys []limiter.CacheKey, isOverLimitWithLocalCache []bool, limits []*config.RateLimit, hitsAddend uint64) {
-	defer this.wg.Done()
+	defer this.waitGroup.Done()
 	for i, cacheKey := range cacheKeys {
 		if cacheKey.Key == "" || isOverLimitWithLocalCache[i] {
 			continue
@@ -269,7 +269,7 @@
 }
 
 func (this *rateLimitMemcacheImpl) Flush() {
-	this.wg.Wait()
+	this.waitGroup.Wait()
 }
 
 func NewRateLimitCacheImpl(client Client, timeSource utils.TimeSource, jitterRand *rand.Rand, expirationJitterMaxSeconds int64, localCache *freecache.Cache, scope stats.Scope, nearLimitRatio float32) limiter.RateLimitCache {
diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go
index 195c864d5..49c85bef5 100644
--- a/test/integration/integration_test.go
+++ b/test/integration/integration_test.go
@@ -48,9 +48,9 @@ func newDescriptorStatusLegacy(
 // stop the server at the end of each test, thus we can reuse the grpc port among these integration tests.
func TestBasicConfig(t *testing.T) { t.Run("WithoutPerSecondRedis", testBasicConfig("8083", "false", "0", "")) - t.Run("WithPerSecondRedis", testBasicConfig("8085", "true", "0", "")) + t.Run("WithPerSecondRedis", testBasicConfig("8085", "true", "0", "redis")) t.Run("WithoutPerSecondRedisWithLocalCache", testBasicConfig("18083", "false", "1000", "")) - t.Run("WithPerSecondRedisWithLocalCache", testBasicConfig("18085", "true", "1000", "")) + t.Run("WithPerSecondRedisWithLocalCache", testBasicConfig("18085", "true", "1000", "redis")) } func TestBasicTLSConfig(t *testing.T) { diff --git a/test/memcached/cache_impl_test.go b/test/memcached/cache_impl_test.go index 65e4ef27e..fad218407 100644 --- a/test/memcached/cache_impl_test.go +++ b/test/memcached/cache_impl_test.go @@ -112,6 +112,52 @@ func TestMemcached(t *testing.T) { cache.Flush() } +func TestMemcachedGetError(t *testing.T) { + assert := assert.New(t) + controller := gomock.NewController(t) + defer controller.Finish() + + timeSource := mock_utils.NewMockTimeSource(controller) + client := mock_memcached.NewMockClient(controller) + statsStore := stats.NewStore(stats.NewNullSink(), false) + cache := memcached.NewRateLimitCacheImpl(client, timeSource, nil, 0, nil, statsStore, 0.8) + + timeSource.EXPECT().UnixNow().Return(int64(1234)).MaxTimes(3) + client.EXPECT().GetMulti([]string{"domain_key_value_1234"}).Return( + nil, memcache.ErrNoServers, + ) + client.EXPECT().Increment("domain_key_value_1234", uint64(1)).Return(uint64(5), nil) + + request := common.NewRateLimitRequest("domain", [][][2]string{{{"key", "value"}}}, 1) + limits := []*config.RateLimit{config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_SECOND, "key_value", statsStore)} + + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{{Code: pb.RateLimitResponse_OK, CurrentLimit: limits[0].Limit, LimitRemaining: 9, DurationUntilReset: utils.CalculateReset(limits[0].Limit, timeSource)}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(1), limits[0].Stats.TotalHits.Value()) + assert.Equal(uint64(0), limits[0].Stats.OverLimit.Value()) + assert.Equal(uint64(0), limits[0].Stats.NearLimit.Value()) + + // No error, but the key is missing + timeSource.EXPECT().UnixNow().Return(int64(1234)).MaxTimes(3) + client.EXPECT().GetMulti([]string{"domain_key_value1_1234"}).Return( + nil, nil, + ) + client.EXPECT().Increment("domain_key_value1_1234", uint64(1)).Return(uint64(5), nil) + + request = common.NewRateLimitRequest("domain", [][][2]string{{{"key", "value1"}}}, 1) + limits = []*config.RateLimit{config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_SECOND, "key_value1", statsStore)} + + assert.Equal( + []*pb.RateLimitResponse_DescriptorStatus{{Code: pb.RateLimitResponse_OK, CurrentLimit: limits[0].Limit, LimitRemaining: 9, DurationUntilReset: utils.CalculateReset(limits[0].Limit, timeSource)}}, + cache.DoLimit(nil, request, limits)) + assert.Equal(uint64(1), limits[0].Stats.TotalHits.Value()) + assert.Equal(uint64(0), limits[0].Stats.OverLimit.Value()) + assert.Equal(uint64(0), limits[0].Stats.NearLimit.Value()) + + cache.Flush() +} + func testLocalCacheStats(localCacheStats stats.StatGenerator, statsStore stats.Store, sink *common.TestStatSink, expectedHitCount int, expectedMissCount int, expectedLookUpCount int, expectedExpiredCount int, expectedEntryCount int) func(*testing.T) {