From 3752c99490207322e85f5a6712c51a101c223a9f Mon Sep 17 00:00:00 2001 From: Nelz Date: Mon, 9 Oct 2017 15:22:56 -0700 Subject: [PATCH 1/2] Remove dependency on juju --- ratelimit/token_bucket.go | 40 ---------------------------------- ratelimit/token_bucket_test.go | 17 --------------- 2 files changed, 57 deletions(-) diff --git a/ratelimit/token_bucket.go b/ratelimit/token_bucket.go index b71e50bb1..e8a6de6f6 100644 --- a/ratelimit/token_bucket.go +++ b/ratelimit/token_bucket.go @@ -3,9 +3,6 @@ package ratelimit import ( "context" "errors" - "time" - - "github.com/juju/ratelimit" "github.com/go-kit/kit/endpoint" ) @@ -14,22 +11,6 @@ import ( // triggered and the request is rejected. var ErrLimited = errors.New("rate limit exceeded") -// NewTokenBucketLimiter returns an endpoint.Middleware that acts as a rate -// limiter based on a token-bucket algorithm. Requests that would exceed the -// maximum request rate are simply rejected with an error. -func NewTokenBucketLimiter(tb *ratelimit.Bucket) endpoint.Middleware { - return NewErroringLimiter(NewAllower(tb)) -} - -// NewTokenBucketThrottler returns an endpoint.Middleware that acts as a -// request throttler based on a token-bucket algorithm. Requests that would -// exceed the maximum request rate are delayed. -// The parameterized function "_" is kept for backwards-compatiblity of -// the API, but it is no longer used for anything. You may pass it nil. -func NewTokenBucketThrottler(tb *ratelimit.Bucket, _ func(time.Duration)) endpoint.Middleware { - return NewDelayingLimiter(NewWaiter(tb)) -} - // Allower dictates whether or not a request is acceptable to run. // The Limiter from "golang.org/x/time/rate" already implements this interface, // one is able to use that in NewErroringLimiter without any modifications. @@ -81,13 +62,6 @@ func (f AllowerFunc) Allow() bool { return f() } -// NewAllower turns an existing ratelimit.Bucket into an API-compatible form -func NewAllower(tb *ratelimit.Bucket) Allower { - return AllowerFunc(func() bool { - return (tb.TakeAvailable(1) != 0) - }) -} - // WaiterFunc is an adapter that lets a function operate as if // it implements Waiter type WaiterFunc func(ctx context.Context) error @@ -96,17 +70,3 @@ type WaiterFunc func(ctx context.Context) error func (f WaiterFunc) Wait(ctx context.Context) error { return f(ctx) } - -// NewWaiter turns an existing ratelimit.Bucket into an API-compatible form -func NewWaiter(tb *ratelimit.Bucket) Waiter { - return WaiterFunc(func(ctx context.Context) error { - dur := tb.Take(1) - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(dur): - // happy path - } - return nil - }) -} diff --git a/ratelimit/token_bucket_test.go b/ratelimit/token_bucket_test.go index d444fe992..3845c9ee3 100644 --- a/ratelimit/token_bucket_test.go +++ b/ratelimit/token_bucket_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - jujuratelimit "github.com/juju/ratelimit" "golang.org/x/time/rate" "github.com/go-kit/kit/endpoint" @@ -15,22 +14,6 @@ import ( var nopEndpoint = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil } -func TestTokenBucketLimiter(t *testing.T) { - tb := jujuratelimit.NewBucket(time.Minute, 1) - testSuccessThenFailure( - t, - ratelimit.NewTokenBucketLimiter(tb)(nopEndpoint), - ratelimit.ErrLimited.Error()) -} - -func TestTokenBucketThrottler(t *testing.T) { - tb := jujuratelimit.NewBucket(time.Minute, 1) - testSuccessThenFailure( - t, - ratelimit.NewTokenBucketThrottler(tb, nil)(nopEndpoint), - "context deadline exceeded") -} - func TestXRateErroring(t *testing.T) { limit := rate.NewLimiter(rate.Every(time.Minute), 1) testSuccessThenFailure( From 43c41836466be9209b63a32d69a4e41cffad8f58 Mon Sep 17 00:00:00 2001 From: Nelz Date: Mon, 9 Oct 2017 17:00:43 -0700 Subject: [PATCH 2/2] Downstream usages of ratelimit package --- examples/addsvc/pkg/addendpoint/set.go | 8 +++++--- examples/addsvc/pkg/addtransport/grpc.go | 4 ++-- examples/addsvc/pkg/addtransport/http.go | 5 +++-- examples/addsvc/pkg/addtransport/thrift.go | 5 +++-- examples/stringsvc3/proxying.go | 5 +++-- 5 files changed, 16 insertions(+), 11 deletions(-) diff --git a/examples/addsvc/pkg/addendpoint/set.go b/examples/addsvc/pkg/addendpoint/set.go index 3a65b083b..e4acaff47 100644 --- a/examples/addsvc/pkg/addendpoint/set.go +++ b/examples/addsvc/pkg/addendpoint/set.go @@ -2,8 +2,10 @@ package addendpoint import ( "context" + "time" + + "golang.org/x/time/rate" - rl "github.com/juju/ratelimit" stdopentracing "github.com/opentracing/opentracing-go" "github.com/sony/gobreaker" @@ -31,7 +33,7 @@ func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, var sumEndpoint endpoint.Endpoint { sumEndpoint = MakeSumEndpoint(svc) - sumEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(1, 1))(sumEndpoint) + sumEndpoint = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 1))(sumEndpoint) sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(sumEndpoint) sumEndpoint = opentracing.TraceServer(trace, "Sum")(sumEndpoint) sumEndpoint = LoggingMiddleware(log.With(logger, "method", "Sum"))(sumEndpoint) @@ -40,7 +42,7 @@ func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, var concatEndpoint endpoint.Endpoint { concatEndpoint = MakeConcatEndpoint(svc) - concatEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(100, 100))(concatEndpoint) + concatEndpoint = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))(concatEndpoint) concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(concatEndpoint) concatEndpoint = opentracing.TraceServer(trace, "Concat")(concatEndpoint) concatEndpoint = LoggingMiddleware(log.With(logger, "method", "Concat"))(concatEndpoint) diff --git a/examples/addsvc/pkg/addtransport/grpc.go b/examples/addsvc/pkg/addtransport/grpc.go index ec05baa82..6ec58d7f4 100644 --- a/examples/addsvc/pkg/addtransport/grpc.go +++ b/examples/addsvc/pkg/addtransport/grpc.go @@ -7,10 +7,10 @@ import ( "google.golang.org/grpc" - jujuratelimit "github.com/juju/ratelimit" stdopentracing "github.com/opentracing/opentracing-go" "github.com/sony/gobreaker" oldcontext "golang.org/x/net/context" + "golang.org/x/time/rate" "github.com/go-kit/kit/circuitbreaker" "github.com/go-kit/kit/endpoint" @@ -76,7 +76,7 @@ func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger l // construct per-endpoint circuitbreaker middlewares to demonstrate how // that's done, although they could easily be combined into a single breaker // for the entire remote instance, too. - limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) + limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100)) // Each individual endpoint is an http/transport.Client (which implements // endpoint.Endpoint) that gets wrapped with various middlewares. If you diff --git a/examples/addsvc/pkg/addtransport/http.go b/examples/addsvc/pkg/addtransport/http.go index ecdee9288..3819c6d87 100644 --- a/examples/addsvc/pkg/addtransport/http.go +++ b/examples/addsvc/pkg/addtransport/http.go @@ -11,7 +11,8 @@ import ( "strings" "time" - jujuratelimit "github.com/juju/ratelimit" + "golang.org/x/time/rate" + stdopentracing "github.com/opentracing/opentracing-go" "github.com/sony/gobreaker" @@ -68,7 +69,7 @@ func NewHTTPClient(instance string, tracer stdopentracing.Tracer, logger log.Log // construct per-endpoint circuitbreaker middlewares to demonstrate how // that's done, although they could easily be combined into a single breaker // for the entire remote instance, too. - limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) + limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100)) // Each individual endpoint is an http/transport.Client (which implements // endpoint.Endpoint) that gets wrapped with various middlewares. If you diff --git a/examples/addsvc/pkg/addtransport/thrift.go b/examples/addsvc/pkg/addtransport/thrift.go index c6797ecbd..485840fe0 100644 --- a/examples/addsvc/pkg/addtransport/thrift.go +++ b/examples/addsvc/pkg/addtransport/thrift.go @@ -4,7 +4,8 @@ import ( "context" "time" - jujuratelimit "github.com/juju/ratelimit" + "golang.org/x/time/rate" + "github.com/sony/gobreaker" "github.com/go-kit/kit/circuitbreaker" @@ -58,7 +59,7 @@ func NewThriftClient(client *addthrift.AddServiceClient) addservice.Service { // construct per-endpoint circuitbreaker middlewares to demonstrate how // that's done, although they could easily be combined into a single breaker // for the entire remote instance, too. - limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) + limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100)) // Each individual endpoint is an http/transport.Client (which implements // endpoint.Endpoint) that gets wrapped with various middlewares. If you diff --git a/examples/stringsvc3/proxying.go b/examples/stringsvc3/proxying.go index 8b1013f31..0f6780776 100644 --- a/examples/stringsvc3/proxying.go +++ b/examples/stringsvc3/proxying.go @@ -8,7 +8,8 @@ import ( "strings" "time" - jujuratelimit "github.com/juju/ratelimit" + "golang.org/x/time/rate" + "github.com/sony/gobreaker" "github.com/go-kit/kit/circuitbreaker" @@ -47,7 +48,7 @@ func proxyingMiddleware(ctx context.Context, instances string, logger log.Logger var e endpoint.Endpoint e = makeUppercaseProxy(ctx, instance) e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e) - e = ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(qps), int64(qps)))(e) + e = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), qps))(e) endpointer = append(endpointer, e) }