From 2f381ee7473fc5c028397e248af57a0fdc9e95f2 Mon Sep 17 00:00:00 2001 From: Junchao Lyu Date: Mon, 6 Jan 2020 08:20:56 -0800 Subject: [PATCH 01/12] Plugin statstore into runner --- src/server/server_impl.go | 8 ++-- src/service_cmd/main.go | 2 +- src/service_cmd/runner/runner.go | 9 +++- test/integration/integration_test.go | 68 ++++++++++++++++++++++------ 4 files changed, 67 insertions(+), 20 deletions(-) diff --git a/src/server/server_impl.go b/src/server/server_impl.go index a377cb625..a0803f2a9 100644 --- a/src/server/server_impl.go +++ b/src/server/server_impl.go @@ -99,11 +99,11 @@ func (server *server) Runtime() loader.IFace { return server.runtime } -func NewServer(name string, opts ...settings.Option) Server { - return newServer(name, opts...) +func NewServer(name string, store stats.Store, opts ...settings.Option) Server { + return newServer(name, store, opts...) } -func newServer(name string, opts ...settings.Option) *server { +func newServer(name string, store stats.Store, opts ...settings.Option) *server { s := settings.NewSettings() for _, opt := range opts { @@ -119,7 +119,7 @@ func newServer(name string, opts ...settings.Option) *server { ret.debugPort = s.DebugPort // setup stats - ret.store = stats.NewDefaultStore() + ret.store = store ret.scope = ret.store.Scope(name) ret.store.AddStatGenerator(stats.NewRuntimeStats(ret.scope.Scope("go"))) diff --git a/src/service_cmd/main.go b/src/service_cmd/main.go index 6b0faba63..c9b53775a 100644 --- a/src/service_cmd/main.go +++ b/src/service_cmd/main.go @@ -3,5 +3,5 @@ package main import "github.com/lyft/ratelimit/src/service_cmd/runner" func main() { - runner.Run() + runner.Run(nil) } diff --git a/src/service_cmd/runner/runner.go b/src/service_cmd/runner/runner.go index 3c39f696e..a9587af95 100644 --- a/src/service_cmd/runner/runner.go +++ b/src/service_cmd/runner/runner.go @@ -6,6 +6,8 @@ import ( "net/http" "time" + stats "github.com/lyft/gostats" + "github.com/coocood/freecache" pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v2" @@ -19,7 +21,7 @@ import ( logger "github.com/sirupsen/logrus" ) -func Run() { +func Run(store stats.Store) { s := settings.NewSettings() logLevel, err := logger.ParseLevel(s.LogLevel) @@ -28,8 +30,11 @@ func Run() { } else { logger.SetLevel(logLevel) } + if store == nil { + store = stats.NewDefaultStore() + } - srv := server.NewServer("ratelimit", settings.GrpcUnaryInterceptor(nil)) + srv := server.NewServer("ratelimit", store, settings.GrpcUnaryInterceptor(nil)) var perSecondPool redis.Pool if s.RedisPerSecond { diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index faa8e41c3..35acbce14 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + stats "github.com/lyft/gostats" + pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v2" pb_legacy "github.com/lyft/ratelimit/proto/ratelimit" "github.com/lyft/ratelimit/src/service_cmd/runner" @@ -44,15 +46,15 @@ func newDescriptorStatusLegacy( func TestBasicConfig(t *testing.T) { t.Run("WithoutPerSecondRedis", testBasicConfig("8083", "false", "0")) t.Run("WithPerSecondRedis", testBasicConfig("8085", "true", "0")) - t.Run("WithoutPerSecondRedisWithLocalCache", testBasicConfig("8083", "false", "1000")) - t.Run("WithPerSecondRedisWithLocalCache", testBasicConfig("8085", "true", "1000")) + t.Run("WithoutPerSecondRedisWithLocalCache", testBasicConfig("18083", "false", "1000")) + t.Run("WithPerSecondRedisWithLocalCache", testBasicConfig("18085", "true", "1000")) } func TestBasicTLSConfig(t *testing.T) { t.Run("WithoutPerSecondRedisTLS", testBasicConfigAuthTLS("8087", "false", "0")) t.Run("WithPerSecondRedisTLS", testBasicConfigAuthTLS("8089", "true", "0")) - t.Run("WithoutPerSecondRedisTLSWithLocalCache", testBasicConfigAuthTLS("8087", "false", "1000")) - t.Run("WithPerSecondRedisTLSWithLocalCache", testBasicConfigAuthTLS("8089", "true", "1000")) + t.Run("WithoutPerSecondRedisTLSWithLocalCache", testBasicConfigAuthTLS("18087", "false", "1000")) + t.Run("WithPerSecondRedisTLSWithLocalCache", testBasicConfigAuthTLS("18089", "true", "1000")) } func testBasicConfigAuthTLS(grpcPort, perSecond string, local_cache_size string) func(*testing.T) { @@ -89,13 +91,14 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu os.Setenv("RUNTIME_SUBDIRECTORY", "ratelimit") os.Setenv("REDIS_PERSECOND_SOCKET_TYPE", "tcp") os.Setenv("REDIS_SOCKET_TYPE", "tcp") - os.Setenv("LOCAL_CACHE_SIZE", local_cache_size) + os.Setenv("LOCAL_CACHE_SIZE_IN_BYTES", local_cache_size) local_cache_size_val, _ := strconv.Atoi(local_cache_size) enable_local_cache := local_cache_size_val > 0 + store := stats.NewDefaultStore() go func() { - runner.Run() + runner.Run(store) }() // HACK: Wait for the server to come up. Make a hook that we can wait on. @@ -142,6 +145,15 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu if i >= 20 { status = pb.RateLimitResponse_OVER_LIMIT limitRemaining = 0 + overLimitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.over_limit", getCacheKey("key2", enable_local_cache))) + assert.Equal(i-19, int(overLimitCounter.Value())) + + overLimitWithLocalCacheCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.over_limit_with_local_cache", getCacheKey("key2", enable_local_cache))) + if enable_local_cache { + assert.Equal(i-20, int(overLimitWithLocalCacheCounter.Value())) + } else { + assert.Equal(0, int(overLimitWithLocalCacheCounter.Value())) + } } assert.Equal( @@ -170,6 +182,14 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu if i >= 10 { status = pb.RateLimitResponse_OVER_LIMIT limitRemaining2 = 0 + overLimitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.over_limit", getCacheKey("key3", enable_local_cache))) + assert.Equal(i-9, int(overLimitCounter.Value())) + overLimitWithLocalCacheCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.over_limit_with_local_cache", getCacheKey("key3", enable_local_cache))) + if enable_local_cache { + assert.Equal(i-10, int(overLimitWithLocalCacheCounter.Value())) + } else { + assert.Equal(0, int(overLimitWithLocalCacheCounter.Value())) + } } assert.Equal( @@ -181,18 +201,19 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu response) assert.NoError(err) } + store = nil } } func TestBasicConfigLegacy(t *testing.T) { - t.Run("testBasicConfigLegacy", testBasicConfigLegacy("0")) - t.Run("testBasicConfigLegacyWithLocalCache", testBasicConfigLegacy("1000")) + t.Run("testBasicConfigLegacy", testBasicConfigLegacy("8086", "0")) + t.Run("testBasicConfigLegacyWithLocalCache", testBasicConfigLegacy("18086", "1000")) } -func testBasicConfigLegacy(local_cache_size string) func(*testing.T) { +func testBasicConfigLegacy(grpcPort, local_cache_size string) func(*testing.T) { return func(t *testing.T) { os.Setenv("PORT", "8082") - os.Setenv("GRPC_PORT", "8083") + os.Setenv("GRPC_PORT", grpcPort) os.Setenv("DEBUG_PORT", "8084") os.Setenv("RUNTIME_ROOT", "runtime/current") os.Setenv("RUNTIME_SUBDIRECTORY", "ratelimit") @@ -201,19 +222,22 @@ func testBasicConfigLegacy(local_cache_size string) func(*testing.T) { os.Setenv("REDIS_URL", "localhost:6379") os.Setenv("REDIS_TLS", "false") os.Setenv("REDIS_PERSECOND_TLS", "false") - os.Setenv("LOCAL_CACHE_SIZE", local_cache_size) + os.Setenv("LOCAL_CACHE_SIZE_IN_BYTES", local_cache_size) + local_cache_size_val, _ := strconv.Atoi(local_cache_size) enable_local_cache := local_cache_size_val > 0 + store := stats.NewDefaultStore() + go func() { - runner.Run() + runner.Run(store) }() // HACK: Wait for the server to come up. Make a hook that we can wait on. time.Sleep(100 * time.Millisecond) assert := assert.New(t) - conn, err := grpc.Dial("localhost:8083", grpc.WithInsecure()) + conn, err := grpc.Dial(fmt.Sprintf("localhost:%s", grpcPort), grpc.WithInsecure()) assert.NoError(err) defer conn.Close() c := pb_legacy.NewRateLimitServiceClient(conn) @@ -253,6 +277,16 @@ func testBasicConfigLegacy(local_cache_size string) func(*testing.T) { if i >= 20 { status = pb_legacy.RateLimitResponse_OVER_LIMIT limitRemaining = 0 + + overLimitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.over_limit", getCacheKey("key2", enable_local_cache))) + assert.Equal(i-19, int(overLimitCounter.Value())) + + overLimitWithLocalCacheCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.over_limit_with_local_cache", getCacheKey("key2", enable_local_cache))) + if enable_local_cache { + assert.Equal(i-20, int(overLimitWithLocalCacheCounter.Value())) + } else { + assert.Equal(0, int(overLimitWithLocalCacheCounter.Value())) + } } assert.Equal( @@ -281,6 +315,14 @@ func testBasicConfigLegacy(local_cache_size string) func(*testing.T) { if i >= 10 { status = pb_legacy.RateLimitResponse_OVER_LIMIT limitRemaining2 = 0 + overLimitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another_legacy.%s.over_limit", getCacheKey("key3", enable_local_cache))) + assert.Equal(i-9, int(overLimitCounter.Value())) + overLimitWithLocalCacheCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another_legacy.%s.over_limit_with_local_cache", getCacheKey("key3", enable_local_cache))) + if enable_local_cache { + assert.Equal(i-10, int(overLimitWithLocalCacheCounter.Value())) + } else { + assert.Equal(0, int(overLimitWithLocalCacheCounter.Value())) + } } assert.Equal( From 2b75b2cee53a3a7f1d894a85741110b52dcb6658 Mon Sep 17 00:00:00 2001 From: Junchao Lyu Date: Mon, 6 Jan 2020 10:02:00 -0800 Subject: [PATCH 02/12] update the port --- test/integration/integration_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 35acbce14..06015b3b5 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -50,12 +50,13 @@ func TestBasicConfig(t *testing.T) { t.Run("WithPerSecondRedisWithLocalCache", testBasicConfig("18085", "true", "1000")) } +/* func TestBasicTLSConfig(t *testing.T) { t.Run("WithoutPerSecondRedisTLS", testBasicConfigAuthTLS("8087", "false", "0")) t.Run("WithPerSecondRedisTLS", testBasicConfigAuthTLS("8089", "true", "0")) t.Run("WithoutPerSecondRedisTLSWithLocalCache", testBasicConfigAuthTLS("18087", "false", "1000")) t.Run("WithPerSecondRedisTLSWithLocalCache", testBasicConfigAuthTLS("18089", "true", "1000")) -} +}*/ func testBasicConfigAuthTLS(grpcPort, perSecond string, local_cache_size string) func(*testing.T) { os.Setenv("REDIS_PERSECOND_URL", "localhost:16382") @@ -206,8 +207,8 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu } func TestBasicConfigLegacy(t *testing.T) { - t.Run("testBasicConfigLegacy", testBasicConfigLegacy("8086", "0")) - t.Run("testBasicConfigLegacyWithLocalCache", testBasicConfigLegacy("18086", "1000")) + t.Run("testBasicConfigLegacy", testBasicConfigLegacy("8093", "0")) + t.Run("testBasicConfigLegacyWithLocalCache", testBasicConfigLegacy("18093", "1000")) } func testBasicConfigLegacy(grpcPort, local_cache_size string) func(*testing.T) { From 86ab803402eed10dcf59e32b011c522cb2ef670d Mon Sep 17 00:00:00 2001 From: Junchao Lyu Date: Tue, 7 Jan 2020 14:03:22 -0800 Subject: [PATCH 03/12] verify totalhits stats --- test/integration/integration_test.go | 61 ++++++++++------------------ 1 file changed, 21 insertions(+), 40 deletions(-) diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 06015b3b5..55ff1e551 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -50,13 +50,12 @@ func TestBasicConfig(t *testing.T) { t.Run("WithPerSecondRedisWithLocalCache", testBasicConfig("18085", "true", "1000")) } -/* func TestBasicTLSConfig(t *testing.T) { t.Run("WithoutPerSecondRedisTLS", testBasicConfigAuthTLS("8087", "false", "0")) t.Run("WithPerSecondRedisTLS", testBasicConfigAuthTLS("8089", "true", "0")) t.Run("WithoutPerSecondRedisTLSWithLocalCache", testBasicConfigAuthTLS("18087", "false", "1000")) t.Run("WithPerSecondRedisTLSWithLocalCache", testBasicConfigAuthTLS("18089", "true", "1000")) -}*/ +} func testBasicConfigAuthTLS(grpcPort, perSecond string, local_cache_size string) func(*testing.T) { os.Setenv("REDIS_PERSECOND_URL", "localhost:16382") @@ -92,7 +91,7 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu os.Setenv("RUNTIME_SUBDIRECTORY", "ratelimit") os.Setenv("REDIS_PERSECOND_SOCKET_TYPE", "tcp") os.Setenv("REDIS_SOCKET_TYPE", "tcp") - os.Setenv("LOCAL_CACHE_SIZE_IN_BYTES", local_cache_size) + os.Setenv("LOCAL_CACHE_SIZE", local_cache_size) local_cache_size_val, _ := strconv.Atoi(local_cache_size) enable_local_cache := local_cache_size_val > 0 @@ -131,6 +130,8 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu newDescriptorStatus(pb.RateLimitResponse_OK, 50, pb.RateLimitResponse_RateLimit_SECOND, 49)}}, response) assert.NoError(err) + key1HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.basic.%s.total_hits", getCacheKey("key1", enable_local_cache))) + assert.Equal(1, int(key1HitCounter.Value())) // Now come up with a random key, and go over limit for a minute limit which should always work. r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -146,15 +147,6 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu if i >= 20 { status = pb.RateLimitResponse_OVER_LIMIT limitRemaining = 0 - overLimitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.over_limit", getCacheKey("key2", enable_local_cache))) - assert.Equal(i-19, int(overLimitCounter.Value())) - - overLimitWithLocalCacheCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.over_limit_with_local_cache", getCacheKey("key2", enable_local_cache))) - if enable_local_cache { - assert.Equal(i-20, int(overLimitWithLocalCacheCounter.Value())) - } else { - assert.Equal(0, int(overLimitWithLocalCacheCounter.Value())) - } } assert.Equal( @@ -164,6 +156,8 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu newDescriptorStatus(status, 20, pb.RateLimitResponse_RateLimit_MINUTE, limitRemaining)}}, response) assert.NoError(err) + key2HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.total_hits", getCacheKey("key2", enable_local_cache))) + assert.Equal(i + 1, int(key2HitCounter.Value())) } // Limit now against 2 keys in the same domain. @@ -183,14 +177,6 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu if i >= 10 { status = pb.RateLimitResponse_OVER_LIMIT limitRemaining2 = 0 - overLimitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.over_limit", getCacheKey("key3", enable_local_cache))) - assert.Equal(i-9, int(overLimitCounter.Value())) - overLimitWithLocalCacheCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.over_limit_with_local_cache", getCacheKey("key3", enable_local_cache))) - if enable_local_cache { - assert.Equal(i-10, int(overLimitWithLocalCacheCounter.Value())) - } else { - assert.Equal(0, int(overLimitWithLocalCacheCounter.Value())) - } } assert.Equal( @@ -201,8 +187,12 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu newDescriptorStatus(status, 10, pb.RateLimitResponse_RateLimit_HOUR, limitRemaining2)}}, response) assert.NoError(err) + key2HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.total_hits", getCacheKey("key2", enable_local_cache))) + assert.Equal(i + 26, int(key2HitCounter.Value())) + + key3HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.total_hits", getCacheKey("key3", enable_local_cache))) + assert.Equal(i + 1, int(key3HitCounter.Value())) } - store = nil } } @@ -223,7 +213,7 @@ func testBasicConfigLegacy(grpcPort, local_cache_size string) func(*testing.T) { os.Setenv("REDIS_URL", "localhost:6379") os.Setenv("REDIS_TLS", "false") os.Setenv("REDIS_PERSECOND_TLS", "false") - os.Setenv("LOCAL_CACHE_SIZE_IN_BYTES", local_cache_size) + os.Setenv("LOCAL_CACHE_SIZE", local_cache_size) local_cache_size_val, _ := strconv.Atoi(local_cache_size) enable_local_cache := local_cache_size_val > 0 @@ -263,6 +253,8 @@ func testBasicConfigLegacy(grpcPort, local_cache_size string) func(*testing.T) { newDescriptorStatusLegacy(pb_legacy.RateLimitResponse_OK, 50, pb_legacy.RateLimit_SECOND, 49)}}, response) assert.NoError(err) + key1HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.basic_legacy.%s.total_hits", getCacheKey("key1", enable_local_cache))) + assert.Equal(1, int(key1HitCounter.Value())) // Now come up with a random key, and go over limit for a minute limit which should always work. r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -278,16 +270,6 @@ func testBasicConfigLegacy(grpcPort, local_cache_size string) func(*testing.T) { if i >= 20 { status = pb_legacy.RateLimitResponse_OVER_LIMIT limitRemaining = 0 - - overLimitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.over_limit", getCacheKey("key2", enable_local_cache))) - assert.Equal(i-19, int(overLimitCounter.Value())) - - overLimitWithLocalCacheCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.over_limit_with_local_cache", getCacheKey("key2", enable_local_cache))) - if enable_local_cache { - assert.Equal(i-20, int(overLimitWithLocalCacheCounter.Value())) - } else { - assert.Equal(0, int(overLimitWithLocalCacheCounter.Value())) - } } assert.Equal( @@ -297,6 +279,8 @@ func testBasicConfigLegacy(grpcPort, local_cache_size string) func(*testing.T) { newDescriptorStatusLegacy(status, 20, pb_legacy.RateLimit_MINUTE, limitRemaining)}}, response) assert.NoError(err) + key2HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.total_hits", getCacheKey("key2", enable_local_cache))) + assert.Equal(i + 1, int(key2HitCounter.Value())) } // Limit now against 2 keys in the same domain. @@ -316,14 +300,6 @@ func testBasicConfigLegacy(grpcPort, local_cache_size string) func(*testing.T) { if i >= 10 { status = pb_legacy.RateLimitResponse_OVER_LIMIT limitRemaining2 = 0 - overLimitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another_legacy.%s.over_limit", getCacheKey("key3", enable_local_cache))) - assert.Equal(i-9, int(overLimitCounter.Value())) - overLimitWithLocalCacheCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another_legacy.%s.over_limit_with_local_cache", getCacheKey("key3", enable_local_cache))) - if enable_local_cache { - assert.Equal(i-10, int(overLimitWithLocalCacheCounter.Value())) - } else { - assert.Equal(0, int(overLimitWithLocalCacheCounter.Value())) - } } assert.Equal( @@ -334,6 +310,11 @@ func testBasicConfigLegacy(grpcPort, local_cache_size string) func(*testing.T) { newDescriptorStatusLegacy(status, 10, pb_legacy.RateLimit_HOUR, limitRemaining2)}}, response) assert.NoError(err) + key2HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another_legacy.%s.total_hits", getCacheKey("key2", enable_local_cache))) + assert.Equal(i + 1, int(key2HitCounter.Value())) + + key3HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another_legacy.%s.total_hits", getCacheKey("key3", enable_local_cache))) + assert.Equal(i + 1, int(key3HitCounter.Value())) } } } From d3dc1b6c2c00a58b31250ee04f7aec452d1e8169 Mon Sep 17 00:00:00 2001 From: Junchao Lyu Date: Tue, 7 Jan 2020 14:07:36 -0800 Subject: [PATCH 04/12] add comments --- test/integration/integration_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 55ff1e551..37146cd3f 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -130,6 +130,8 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu newDescriptorStatus(pb.RateLimitResponse_OK, 50, pb.RateLimitResponse_RateLimit_SECOND, 49)}}, response) assert.NoError(err) + + // store.NewCounter returns the existing counter. key1HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.basic.%s.total_hits", getCacheKey("key1", enable_local_cache))) assert.Equal(1, int(key1HitCounter.Value())) From 61b695d6c5455222959bb04ac4397c78b574bc62 Mon Sep 17 00:00:00 2001 From: Junchao Lyu Date: Tue, 7 Jan 2020 14:11:57 -0800 Subject: [PATCH 05/12] fix format --- test/integration/integration_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 37146cd3f..417493273 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -159,7 +159,7 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu response) assert.NoError(err) key2HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.total_hits", getCacheKey("key2", enable_local_cache))) - assert.Equal(i + 1, int(key2HitCounter.Value())) + assert.Equal(i+1, int(key2HitCounter.Value())) } // Limit now against 2 keys in the same domain. @@ -190,10 +190,10 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu response) assert.NoError(err) key2HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.total_hits", getCacheKey("key2", enable_local_cache))) - assert.Equal(i + 26, int(key2HitCounter.Value())) + assert.Equal(i+26, int(key2HitCounter.Value())) key3HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.total_hits", getCacheKey("key3", enable_local_cache))) - assert.Equal(i + 1, int(key3HitCounter.Value())) + assert.Equal(i+1, int(key3HitCounter.Value())) } } } @@ -282,7 +282,7 @@ func testBasicConfigLegacy(grpcPort, local_cache_size string) func(*testing.T) { response) assert.NoError(err) key2HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.total_hits", getCacheKey("key2", enable_local_cache))) - assert.Equal(i + 1, int(key2HitCounter.Value())) + assert.Equal(i+1, int(key2HitCounter.Value())) } // Limit now against 2 keys in the same domain. @@ -313,10 +313,10 @@ func testBasicConfigLegacy(grpcPort, local_cache_size string) func(*testing.T) { response) assert.NoError(err) key2HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another_legacy.%s.total_hits", getCacheKey("key2", enable_local_cache))) - assert.Equal(i + 1, int(key2HitCounter.Value())) + assert.Equal(i+1, int(key2HitCounter.Value())) key3HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another_legacy.%s.total_hits", getCacheKey("key3", enable_local_cache))) - assert.Equal(i + 1, int(key3HitCounter.Value())) + assert.Equal(i+1, int(key3HitCounter.Value())) } } } From 8bc2b0a642a2bea5e0d52458eb867875053fd5ce Mon Sep 17 00:00:00 2001 From: Junchao Lyu Date: Tue, 7 Jan 2020 14:20:06 -0800 Subject: [PATCH 06/12] change to sleep 1s --- test/integration/integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 417493273..aef62f65d 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -227,7 +227,7 @@ func testBasicConfigLegacy(grpcPort, local_cache_size string) func(*testing.T) { }() // HACK: Wait for the server to come up. Make a hook that we can wait on. - time.Sleep(100 * time.Millisecond) + time.Sleep(1 * time.Second) assert := assert.New(t) conn, err := grpc.Dial(fmt.Sprintf("localhost:%s", grpcPort), grpc.WithInsecure()) From 916207e2d9cfd50605f7c44852a54dec317c3b01 Mon Sep 17 00:00:00 2001 From: Junchao Lyu Date: Tue, 7 Jan 2020 14:31:36 -0800 Subject: [PATCH 07/12] update the port --- test/integration/integration_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index aef62f65d..e1787eb60 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -199,8 +199,8 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu } func TestBasicConfigLegacy(t *testing.T) { - t.Run("testBasicConfigLegacy", testBasicConfigLegacy("8093", "0")) - t.Run("testBasicConfigLegacyWithLocalCache", testBasicConfigLegacy("18093", "1000")) + t.Run("testBasicConfigLegacy", testBasicConfigLegacy("8094", "0")) + t.Run("testBasicConfigLegacyWithLocalCache", testBasicConfigLegacy("18094", "1000")) } func testBasicConfigLegacy(grpcPort, local_cache_size string) func(*testing.T) { From a45fde17123d4889d3327f838a652d4b8cbfff3d Mon Sep 17 00:00:00 2001 From: Junchao Lyu Date: Tue, 7 Jan 2020 14:45:00 -0800 Subject: [PATCH 08/12] update the port --- test/integration/integration_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index e1787eb60..7167961f3 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -199,8 +199,8 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu } func TestBasicConfigLegacy(t *testing.T) { - t.Run("testBasicConfigLegacy", testBasicConfigLegacy("8094", "0")) - t.Run("testBasicConfigLegacyWithLocalCache", testBasicConfigLegacy("18094", "1000")) + t.Run("testBasicConfigLegacy", testBasicConfigLegacy("8086", "0")) + t.Run("testBasicConfigLegacyWithLocalCache", testBasicConfigLegacy("18086", "1000")) } func testBasicConfigLegacy(grpcPort, local_cache_size string) func(*testing.T) { @@ -216,6 +216,8 @@ func testBasicConfigLegacy(grpcPort, local_cache_size string) func(*testing.T) { os.Setenv("REDIS_TLS", "false") os.Setenv("REDIS_PERSECOND_TLS", "false") os.Setenv("LOCAL_CACHE_SIZE", local_cache_size) + os.Setenv("REDIS_PERSECOND_SOCKET_TYPE", "tcp") + os.Setenv("REDIS_SOCKET_TYPE", "tcp") local_cache_size_val, _ := strconv.Atoi(local_cache_size) enable_local_cache := local_cache_size_val > 0 From 9ee5637ca43335313d3def522d04bc4401f3f9f2 Mon Sep 17 00:00:00 2001 From: Junchao Lyu Date: Tue, 7 Jan 2020 15:05:37 -0800 Subject: [PATCH 09/12] undo the changes on the testBasicConfigLegacy --- test/integration/integration_test.go | 30 ++++++++-------------------- 1 file changed, 8 insertions(+), 22 deletions(-) diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 7167961f3..bb51f73d1 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -199,14 +199,14 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu } func TestBasicConfigLegacy(t *testing.T) { - t.Run("testBasicConfigLegacy", testBasicConfigLegacy("8086", "0")) - t.Run("testBasicConfigLegacyWithLocalCache", testBasicConfigLegacy("18086", "1000")) + t.Run("testBasicConfigLegacy", testBasicConfigLegacy("0")) + t.Run("testBasicConfigLegacyWithLocalCache", testBasicConfigLegacy("1000")) } -func testBasicConfigLegacy(grpcPort, local_cache_size string) func(*testing.T) { +func testBasicConfigLegacy(local_cache_size string) func(*testing.T) { return func(t *testing.T) { os.Setenv("PORT", "8082") - os.Setenv("GRPC_PORT", grpcPort) + os.Setenv("GRPC_PORT", "8083") os.Setenv("DEBUG_PORT", "8084") os.Setenv("RUNTIME_ROOT", "runtime/current") os.Setenv("RUNTIME_SUBDIRECTORY", "ratelimit") @@ -216,23 +216,18 @@ func testBasicConfigLegacy(grpcPort, local_cache_size string) func(*testing.T) { os.Setenv("REDIS_TLS", "false") os.Setenv("REDIS_PERSECOND_TLS", "false") os.Setenv("LOCAL_CACHE_SIZE", local_cache_size) - os.Setenv("REDIS_PERSECOND_SOCKET_TYPE", "tcp") - os.Setenv("REDIS_SOCKET_TYPE", "tcp") - local_cache_size_val, _ := strconv.Atoi(local_cache_size) enable_local_cache := local_cache_size_val > 0 - store := stats.NewDefaultStore() - go func() { - runner.Run(store) + runner.Run(nil) }() // HACK: Wait for the server to come up. Make a hook that we can wait on. - time.Sleep(1 * time.Second) + time.Sleep(100 * time.Millisecond) assert := assert.New(t) - conn, err := grpc.Dial(fmt.Sprintf("localhost:%s", grpcPort), grpc.WithInsecure()) + conn, err := grpc.Dial("localhost:8083", grpc.WithInsecure()) assert.NoError(err) defer conn.Close() c := pb_legacy.NewRateLimitServiceClient(conn) @@ -257,8 +252,6 @@ func testBasicConfigLegacy(grpcPort, local_cache_size string) func(*testing.T) { newDescriptorStatusLegacy(pb_legacy.RateLimitResponse_OK, 50, pb_legacy.RateLimit_SECOND, 49)}}, response) assert.NoError(err) - key1HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.basic_legacy.%s.total_hits", getCacheKey("key1", enable_local_cache))) - assert.Equal(1, int(key1HitCounter.Value())) // Now come up with a random key, and go over limit for a minute limit which should always work. r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -283,8 +276,6 @@ func testBasicConfigLegacy(grpcPort, local_cache_size string) func(*testing.T) { newDescriptorStatusLegacy(status, 20, pb_legacy.RateLimit_MINUTE, limitRemaining)}}, response) assert.NoError(err) - key2HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.total_hits", getCacheKey("key2", enable_local_cache))) - assert.Equal(i+1, int(key2HitCounter.Value())) } // Limit now against 2 keys in the same domain. @@ -314,11 +305,6 @@ func testBasicConfigLegacy(grpcPort, local_cache_size string) func(*testing.T) { newDescriptorStatusLegacy(status, 10, pb_legacy.RateLimit_HOUR, limitRemaining2)}}, response) assert.NoError(err) - key2HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another_legacy.%s.total_hits", getCacheKey("key2", enable_local_cache))) - assert.Equal(i+1, int(key2HitCounter.Value())) - - key3HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another_legacy.%s.total_hits", getCacheKey("key3", enable_local_cache))) - assert.Equal(i+1, int(key3HitCounter.Value())) } } -} +} \ No newline at end of file From 2c5f42aaaa74d5818829ff1473a915edda3e456c Mon Sep 17 00:00:00 2001 From: Junchao Lyu Date: Tue, 7 Jan 2020 15:07:25 -0800 Subject: [PATCH 10/12] add newline --- test/integration/integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index bb51f73d1..4c7fe7b00 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -307,4 +307,4 @@ func testBasicConfigLegacy(local_cache_size string) func(*testing.T) { assert.NoError(err) } } -} \ No newline at end of file +} From f4a63999b7b33e20efe5e6166fc7cc05a63ce4bd Mon Sep 17 00:00:00 2001 From: Junchao Lyu Date: Thu, 9 Jan 2020 20:01:11 -0800 Subject: [PATCH 11/12] change runner to be the class --- src/service_cmd/main.go | 3 ++- src/service_cmd/runner/runner.go | 19 ++++++++++++++----- test/integration/integration_test.go | 19 ++++++++++--------- 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/src/service_cmd/main.go b/src/service_cmd/main.go index c9b53775a..5df1ab032 100644 --- a/src/service_cmd/main.go +++ b/src/service_cmd/main.go @@ -3,5 +3,6 @@ package main import "github.com/lyft/ratelimit/src/service_cmd/runner" func main() { - runner.Run(nil) + runner := runner.NewRunner() + runner.Run() } diff --git a/src/service_cmd/runner/runner.go b/src/service_cmd/runner/runner.go index a9587af95..fef847f9c 100644 --- a/src/service_cmd/runner/runner.go +++ b/src/service_cmd/runner/runner.go @@ -21,7 +21,19 @@ import ( logger "github.com/sirupsen/logrus" ) -func Run(store stats.Store) { +type Runner struct { + statsStore stats.Store +} + +func NewRunner() Runner { + return Runner{stats.NewDefaultStore()} +} + +func (runner *Runner) GetStatsStore() stats.Store{ + return runner.statsStore +} + +func (runner *Runner) Run() { s := settings.NewSettings() logLevel, err := logger.ParseLevel(s.LogLevel) @@ -30,11 +42,8 @@ func Run(store stats.Store) { } else { logger.SetLevel(logLevel) } - if store == nil { - store = stats.NewDefaultStore() - } - srv := server.NewServer("ratelimit", store, settings.GrpcUnaryInterceptor(nil)) + srv := server.NewServer("ratelimit", runner.statsStore, settings.GrpcUnaryInterceptor(nil)) var perSecondPool redis.Pool if s.RedisPerSecond { diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 4c7fe7b00..1965e3466 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -10,8 +10,6 @@ import ( "testing" "time" - stats "github.com/lyft/gostats" - pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v2" pb_legacy "github.com/lyft/ratelimit/proto/ratelimit" "github.com/lyft/ratelimit/src/service_cmd/runner" @@ -43,6 +41,8 @@ func newDescriptorStatusLegacy( } } +// TODO: Once adding the ability of stopping the server in the runner (https://github.com/lyft/ratelimit/issues/119), +// stop the server at the end of each test, thus we can reuse the grpc port among these integration tests. func TestBasicConfig(t *testing.T) { t.Run("WithoutPerSecondRedis", testBasicConfig("8083", "false", "0")) t.Run("WithPerSecondRedis", testBasicConfig("8085", "true", "0")) @@ -95,10 +95,10 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu local_cache_size_val, _ := strconv.Atoi(local_cache_size) enable_local_cache := local_cache_size_val > 0 - store := stats.NewDefaultStore() + runner := runner.NewRunner() go func() { - runner.Run(store) + runner.Run() }() // HACK: Wait for the server to come up. Make a hook that we can wait on. @@ -132,7 +132,7 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu assert.NoError(err) // store.NewCounter returns the existing counter. - key1HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.basic.%s.total_hits", getCacheKey("key1", enable_local_cache))) + key1HitCounter := runner.GetStatsStore().NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.basic.%s.total_hits", getCacheKey("key1", enable_local_cache))) assert.Equal(1, int(key1HitCounter.Value())) // Now come up with a random key, and go over limit for a minute limit which should always work. @@ -158,7 +158,7 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu newDescriptorStatus(status, 20, pb.RateLimitResponse_RateLimit_MINUTE, limitRemaining)}}, response) assert.NoError(err) - key2HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.total_hits", getCacheKey("key2", enable_local_cache))) + key2HitCounter := runner.GetStatsStore().NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.total_hits", getCacheKey("key2", enable_local_cache))) assert.Equal(i+1, int(key2HitCounter.Value())) } @@ -189,10 +189,10 @@ func testBasicBaseConfig(grpcPort, perSecond string, local_cache_size string) fu newDescriptorStatus(status, 10, pb.RateLimitResponse_RateLimit_HOUR, limitRemaining2)}}, response) assert.NoError(err) - key2HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.total_hits", getCacheKey("key2", enable_local_cache))) + key2HitCounter := runner.GetStatsStore().NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.total_hits", getCacheKey("key2", enable_local_cache))) assert.Equal(i+26, int(key2HitCounter.Value())) - key3HitCounter := store.NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.total_hits", getCacheKey("key3", enable_local_cache))) + key3HitCounter := runner.GetStatsStore().NewCounter(fmt.Sprintf("ratelimit.service.rate_limit.another.%s.total_hits", getCacheKey("key3", enable_local_cache))) assert.Equal(i+1, int(key3HitCounter.Value())) } } @@ -219,8 +219,9 @@ func testBasicConfigLegacy(local_cache_size string) func(*testing.T) { local_cache_size_val, _ := strconv.Atoi(local_cache_size) enable_local_cache := local_cache_size_val > 0 + runner := runner.NewRunner() go func() { - runner.Run(nil) + runner.Run() }() // HACK: Wait for the server to come up. Make a hook that we can wait on. From 483400a88f2b0a3de05eaebcbd6ddb9994c231d0 Mon Sep 17 00:00:00 2001 From: Junchao Lyu Date: Thu, 9 Jan 2020 20:21:36 -0800 Subject: [PATCH 12/12] fix format --- src/service_cmd/runner/runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/service_cmd/runner/runner.go b/src/service_cmd/runner/runner.go index fef847f9c..2c0d8f83e 100644 --- a/src/service_cmd/runner/runner.go +++ b/src/service_cmd/runner/runner.go @@ -29,7 +29,7 @@ func NewRunner() Runner { return Runner{stats.NewDefaultStore()} } -func (runner *Runner) GetStatsStore() stats.Store{ +func (runner *Runner) GetStatsStore() stats.Store { return runner.statsStore }