Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions examples/addsvc/pkg/addendpoint/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package addendpoint

import (
"context"
"time"

"golang.org/x/time/rate"

rl "github.com/juju/ratelimit"
stdopentracing "github.com/opentracing/opentracing-go"
"github.com/sony/gobreaker"

Expand Down Expand Up @@ -31,7 +33,7 @@ func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram,
var sumEndpoint endpoint.Endpoint
{
sumEndpoint = MakeSumEndpoint(svc)
sumEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(1, 1))(sumEndpoint)
sumEndpoint = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 1))(sumEndpoint)
sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(sumEndpoint)
sumEndpoint = opentracing.TraceServer(trace, "Sum")(sumEndpoint)
sumEndpoint = LoggingMiddleware(log.With(logger, "method", "Sum"))(sumEndpoint)
Expand All @@ -40,7 +42,7 @@ func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram,
var concatEndpoint endpoint.Endpoint
{
concatEndpoint = MakeConcatEndpoint(svc)
concatEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(100, 100))(concatEndpoint)
concatEndpoint = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))(concatEndpoint)
concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(concatEndpoint)
concatEndpoint = opentracing.TraceServer(trace, "Concat")(concatEndpoint)
concatEndpoint = LoggingMiddleware(log.With(logger, "method", "Concat"))(concatEndpoint)
Expand Down
4 changes: 2 additions & 2 deletions examples/addsvc/pkg/addtransport/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (

"google.golang.org/grpc"

jujuratelimit "github.com/juju/ratelimit"
stdopentracing "github.com/opentracing/opentracing-go"
"github.com/sony/gobreaker"
oldcontext "golang.org/x/net/context"
"golang.org/x/time/rate"

"github.com/go-kit/kit/circuitbreaker"
"github.com/go-kit/kit/endpoint"
Expand Down Expand Up @@ -76,7 +76,7 @@ func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger l
// construct per-endpoint circuitbreaker middlewares to demonstrate how
// that's done, although they could easily be combined into a single breaker
// for the entire remote instance, too.
limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))

// Each individual endpoint is an http/transport.Client (which implements
// endpoint.Endpoint) that gets wrapped with various middlewares. If you
Expand Down
5 changes: 3 additions & 2 deletions examples/addsvc/pkg/addtransport/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
"strings"
"time"

jujuratelimit "github.com/juju/ratelimit"
"golang.org/x/time/rate"

stdopentracing "github.com/opentracing/opentracing-go"
"github.com/sony/gobreaker"

Expand Down Expand Up @@ -68,7 +69,7 @@ func NewHTTPClient(instance string, tracer stdopentracing.Tracer, logger log.Log
// construct per-endpoint circuitbreaker middlewares to demonstrate how
// that's done, although they could easily be combined into a single breaker
// for the entire remote instance, too.
limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))

// Each individual endpoint is an http/transport.Client (which implements
// endpoint.Endpoint) that gets wrapped with various middlewares. If you
Expand Down
5 changes: 3 additions & 2 deletions examples/addsvc/pkg/addtransport/thrift.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import (
"context"
"time"

jujuratelimit "github.com/juju/ratelimit"
"golang.org/x/time/rate"

"github.com/sony/gobreaker"

"github.com/go-kit/kit/circuitbreaker"
Expand Down Expand Up @@ -58,7 +59,7 @@ func NewThriftClient(client *addthrift.AddServiceClient) addservice.Service {
// construct per-endpoint circuitbreaker middlewares to demonstrate how
// that's done, although they could easily be combined into a single breaker
// for the entire remote instance, too.
limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))

// Each individual endpoint is an http/transport.Client (which implements
// endpoint.Endpoint) that gets wrapped with various middlewares. If you
Expand Down
5 changes: 3 additions & 2 deletions examples/stringsvc3/proxying.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
"strings"
"time"

jujuratelimit "github.com/juju/ratelimit"
"golang.org/x/time/rate"

"github.com/sony/gobreaker"

"github.com/go-kit/kit/circuitbreaker"
Expand Down Expand Up @@ -47,7 +48,7 @@ func proxyingMiddleware(ctx context.Context, instances string, logger log.Logger
var e endpoint.Endpoint
e = makeUppercaseProxy(ctx, instance)
e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)
e = ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(qps), int64(qps)))(e)
e = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), qps))(e)
endpointer = append(endpointer, e)
}

Expand Down
40 changes: 0 additions & 40 deletions ratelimit/token_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package ratelimit
import (
"context"
"errors"
"time"

"github.com/juju/ratelimit"

"github.com/go-kit/kit/endpoint"
)
Expand All @@ -14,22 +11,6 @@ import (
// triggered and the request is rejected.
var ErrLimited = errors.New("rate limit exceeded")

// NewTokenBucketLimiter returns an endpoint.Middleware that acts as a rate
// limiter based on a token-bucket algorithm. Requests that would exceed the
// maximum request rate are simply rejected with an error.
func NewTokenBucketLimiter(tb *ratelimit.Bucket) endpoint.Middleware {
return NewErroringLimiter(NewAllower(tb))
}

// NewTokenBucketThrottler returns an endpoint.Middleware that acts as a
// request throttler based on a token-bucket algorithm. Requests that would
// exceed the maximum request rate are delayed.
// The parameterized function "_" is kept for backwards-compatiblity of
// the API, but it is no longer used for anything. You may pass it nil.
func NewTokenBucketThrottler(tb *ratelimit.Bucket, _ func(time.Duration)) endpoint.Middleware {
return NewDelayingLimiter(NewWaiter(tb))
}

// Allower dictates whether or not a request is acceptable to run.
// The Limiter from "golang.org/x/time/rate" already implements this interface,
// one is able to use that in NewErroringLimiter without any modifications.
Expand Down Expand Up @@ -81,13 +62,6 @@ func (f AllowerFunc) Allow() bool {
return f()
}

// NewAllower turns an existing ratelimit.Bucket into an API-compatible form
func NewAllower(tb *ratelimit.Bucket) Allower {
return AllowerFunc(func() bool {
return (tb.TakeAvailable(1) != 0)
})
}

// WaiterFunc is an adapter that lets a function operate as if
// it implements Waiter
type WaiterFunc func(ctx context.Context) error
Expand All @@ -96,17 +70,3 @@ type WaiterFunc func(ctx context.Context) error
func (f WaiterFunc) Wait(ctx context.Context) error {
return f(ctx)
}

// NewWaiter turns an existing ratelimit.Bucket into an API-compatible form
func NewWaiter(tb *ratelimit.Bucket) Waiter {
return WaiterFunc(func(ctx context.Context) error {
dur := tb.Take(1)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(dur):
// happy path
}
return nil
})
}
17 changes: 0 additions & 17 deletions ratelimit/token_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"
"time"

jujuratelimit "github.com/juju/ratelimit"
"golang.org/x/time/rate"

"github.com/go-kit/kit/endpoint"
Expand All @@ -15,22 +14,6 @@ import (

var nopEndpoint = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }

func TestTokenBucketLimiter(t *testing.T) {
tb := jujuratelimit.NewBucket(time.Minute, 1)
testSuccessThenFailure(
t,
ratelimit.NewTokenBucketLimiter(tb)(nopEndpoint),
ratelimit.ErrLimited.Error())
}

func TestTokenBucketThrottler(t *testing.T) {
tb := jujuratelimit.NewBucket(time.Minute, 1)
testSuccessThenFailure(
t,
ratelimit.NewTokenBucketThrottler(tb, nil)(nopEndpoint),
"context deadline exceeded")
}

func TestXRateErroring(t *testing.T) {
limit := rate.NewLimiter(rate.Every(time.Minute), 1)
testSuccessThenFailure(
Expand Down