Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@
- [Local Cache](#local-cache)
- [Redis](#redis)
- [Redis type](#redis-type)
- [Connection Timeout](#connection-timeout)
- [Pipelining](#pipelining)
- [Connection Pool Settings](#connection-pool-settings)
- [Pool Size](#pool-size)
- [Connection Timeout](#connection-timeout)
- [Pool On-Empty Behavior](#pool-on-empty-behavior)
- [Pipelining](#pipelining)
- [One Redis Instance](#one-redis-instance)
- [Two Redis Instances](#two-redis-instances)
- [Health Checking for Redis Active Connection](#health-checking-for-redis-active-connection)
Expand Down Expand Up @@ -1274,14 +1277,33 @@ The deployment type can be specified with the `REDIS_TYPE` / `REDIS_PERSECOND_TY
1. "sentinel": A comma separated list with the first string as the master name of the sentinel cluster followed by hostname:port pairs. The list size should be >= 2. The first item is the name of the master and the rest are the sentinels.
1. "cluster": A comma separated list of hostname:port pairs with all the nodes in the cluster.

## Connection Timeout
## Connection Pool Settings

Connection timeout controls the maximum duration for Redis connection establishment, read operations, and write operations.
### Pool Size

1. `REDIS_POOL_SIZE`: the number of connections to keep in the pool. Default: `10`
1. `REDIS_PERSECOND_POOL_SIZE`: pool size for per-second Redis. Default: `10`

### Connection Timeout

Controls the maximum duration for Redis connection establishment, read operations, and write operations.

1. `REDIS_TIMEOUT`: sets the timeout for Redis connection and I/O operations. Default: `10s`
1. `REDIS_PERSECOND_TIMEOUT`: sets the timeout for per-second Redis connection and I/O operations. Default: `10s`

## Pipelining
### Pool On-Empty Behavior

Controls what happens when all connections in the pool are in use and a new request arrives.

1. `REDIS_POOL_ON_EMPTY_BEHAVIOR`: controls what happens when the pool is empty. Default: `CREATE`
- `CREATE`: create a new overflow connection after waiting for `REDIS_POOL_ON_EMPTY_WAIT_DURATION`. This is the [default radix behavior](https://github.com/mediocregopher/radix/blob/v3.8.1/pool.go#L291-L312).
- `ERROR`: return an error after waiting for `REDIS_POOL_ON_EMPTY_WAIT_DURATION`. This enforces a strict pool size limit.
- `WAIT`: block until a connection becomes available. This enforces a strict pool size limit but may cause goroutine buildup.
1. `REDIS_POOL_ON_EMPTY_WAIT_DURATION`: the duration to wait before taking the configured action (`CREATE` or `ERROR`). Default: `1s`
1. `REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR`: same as above for per-second Redis pool. Default: `CREATE`
1. `REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION`: same as above for per-second Redis pool. Default: `1s`

### Pipelining

By default, for each request, ratelimit will pick up a connection from pool, write multiple redis commands in a single write then reads their responses in a single read. This reduces network delay.

Expand Down
6 changes: 4 additions & 2 deletions src/redis/cache_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ func NewRateLimiterCacheImplFromSettings(s settings.Settings, localCache *freeca
var perSecondPool Client
if s.RedisPerSecond {
perSecondPool = NewClientImpl(srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondTls, s.RedisPerSecondAuth, s.RedisPerSecondSocketType,
s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisPerSecondTimeout)
s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisPerSecondTimeout,
s.RedisPerSecondPoolOnEmptyBehavior, s.RedisPerSecondPoolOnEmptyWaitDuration)
closer.Closers = append(closer.Closers, perSecondPool)
}

otherPool := NewClientImpl(srv.Scope().Scope("redis_pool"), s.RedisTls, s.RedisAuth, s.RedisSocketType, s.RedisType, s.RedisUrl, s.RedisPoolSize,
s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisTimeout)
s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisTimeout,
s.RedisPoolOnEmptyBehavior, s.RedisPoolOnEmptyWaitDuration)
closer.Closers = append(closer.Closers, otherPool)

return NewFixedRateLimitCacheImpl(
Expand Down
17 changes: 16 additions & 1 deletion src/redis/driver_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func checkError(err error) {

func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisType, url string, poolSize int,
pipelineWindow time.Duration, pipelineLimit int, tlsConfig *tls.Config, healthCheckActiveConnection bool, srv server.Server,
timeout time.Duration,
timeout time.Duration, poolOnEmptyBehavior string, poolOnEmptyWaitDuration time.Duration,
) Client {
maskedUrl := utils.MaskCredentialsInUrl(url)
logger.Warnf("connecting to redis on %s with pool size %d", maskedUrl, poolSize)
Expand Down Expand Up @@ -112,6 +112,21 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisT
}
logger.Debugf("Implicit pipelining enabled: %v", implicitPipelining)

switch strings.ToUpper(poolOnEmptyBehavior) {
case "WAIT":
opts = append(opts, radix.PoolOnEmptyWait())
logger.Warnf("Redis pool %s: on-empty=WAIT (block until connection available)", maskedUrl)
Comment thread
notdu marked this conversation as resolved.
case "CREATE":
opts = append(opts, radix.PoolOnEmptyCreateAfter(poolOnEmptyWaitDuration))
logger.Warnf("Redis pool %s: on-empty=CREATE after %v", maskedUrl, poolOnEmptyWaitDuration)
case "ERROR":
opts = append(opts, radix.PoolOnEmptyErrAfter(poolOnEmptyWaitDuration))
logger.Warnf("Redis pool %s: on-empty=ERROR after %v (fail-fast)", maskedUrl, poolOnEmptyWaitDuration)
default:
logger.Warnf("Redis pool %s: invalid on-empty behavior '%s', using default CREATE after %v", maskedUrl, poolOnEmptyBehavior, poolOnEmptyWaitDuration)
opts = append(opts, radix.PoolOnEmptyCreateAfter(poolOnEmptyWaitDuration))
}

poolFunc := func(network, addr string) (radix.Client, error) {
return radix.NewPool(network, addr, poolSize, opts...)
}
Expand Down
19 changes: 19 additions & 0 deletions src/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,25 @@ type Settings struct {
RedisTimeout time.Duration `envconfig:"REDIS_TIMEOUT" default:"10s"`
// RedisPerSecondTimeout sets the timeout for per-second Redis connection and I/O operations.
RedisPerSecondTimeout time.Duration `envconfig:"REDIS_PERSECOND_TIMEOUT" default:"10s"`

// RedisPoolOnEmptyBehavior controls what happens when Redis connection pool is empty.
// This setting helps prevent connection storms during Redis failures.
// Possible values:
// - "CREATE": Create a new connection after RedisPoolOnEmptyWaitDuration (default)
// - "ERROR": Return error after RedisPoolOnEmptyWaitDuration
// - "WAIT": Block until a connection is available
RedisPoolOnEmptyBehavior string `envconfig:"REDIS_POOL_ON_EMPTY_BEHAVIOR" default:"CREATE"`
// RedisPoolOnEmptyWaitDuration is the wait duration before taking action when pool is empty.
// Only applicable when RedisPoolOnEmptyBehavior is "CREATE" or "ERROR".
RedisPoolOnEmptyWaitDuration time.Duration `envconfig:"REDIS_POOL_ON_EMPTY_WAIT_DURATION" default:"1s"`

// RedisPerSecondPoolOnEmptyBehavior controls pool-empty behavior for per-second Redis.
// See RedisPoolOnEmptyBehavior for possible values and details.
RedisPerSecondPoolOnEmptyBehavior string `envconfig:"REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR" default:"CREATE"`
// RedisPerSecondPoolOnEmptyWaitDuration is the wait duration for per-second Redis pool.
// See RedisPoolOnEmptyWaitDuration for details.
RedisPerSecondPoolOnEmptyWaitDuration time.Duration `envconfig:"REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION" default:"1s"`

// Memcache settings
MemcacheHostPort []string `envconfig:"MEMCACHE_HOST_PORT" default:""`
// MemcacheMaxIdleConns sets the maximum number of idle TCP connections per memcached node.
Expand Down
114 changes: 114 additions & 0 deletions src/settings/settings_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package settings

import (
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -11,3 +13,115 @@ func TestSettingsTlsConfigUnmodified(t *testing.T) {
assert.NotNil(t, settings.RedisTlsConfig)
assert.Nil(t, settings.RedisTlsConfig.RootCAs)
}

// Tests for RedisPoolOnEmptyBehavior
func TestRedisPoolOnEmptyBehavior_Default(t *testing.T) {
os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR")
os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION")

settings := NewSettings()

assert.Equal(t, "CREATE", settings.RedisPoolOnEmptyBehavior)
assert.Equal(t, 1*time.Second, settings.RedisPoolOnEmptyWaitDuration)
}

func TestRedisPoolOnEmptyBehavior_Error(t *testing.T) {
os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "ERROR")
os.Setenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION", "0")
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR")
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION")

settings := NewSettings()

assert.Equal(t, "ERROR", settings.RedisPoolOnEmptyBehavior)
assert.Equal(t, time.Duration(0), settings.RedisPoolOnEmptyWaitDuration)
}

func TestRedisPoolOnEmptyBehavior_ErrorWithDuration(t *testing.T) {
os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "ERROR")
os.Setenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION", "100ms")
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR")
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION")

settings := NewSettings()

assert.Equal(t, "ERROR", settings.RedisPoolOnEmptyBehavior)
assert.Equal(t, 100*time.Millisecond, settings.RedisPoolOnEmptyWaitDuration)
}

func TestRedisPoolOnEmptyBehavior_Create(t *testing.T) {
os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "CREATE")
os.Setenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION", "500ms")
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR")
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION")

settings := NewSettings()

assert.Equal(t, "CREATE", settings.RedisPoolOnEmptyBehavior)
assert.Equal(t, 500*time.Millisecond, settings.RedisPoolOnEmptyWaitDuration)
}

func TestRedisPoolOnEmptyBehavior_Wait(t *testing.T) {
os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "WAIT")
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR")

settings := NewSettings()

assert.Equal(t, "WAIT", settings.RedisPoolOnEmptyBehavior)
}

func TestRedisPoolOnEmptyBehavior_CaseInsensitive(t *testing.T) {
// Test that lowercase values work (processing is done in driver_impl.go)
os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "error")
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR")

settings := NewSettings()

// Setting stores as-is, case conversion happens in driver_impl.go
assert.Equal(t, "error", settings.RedisPoolOnEmptyBehavior)
}

// Tests for RedisPerSecondPoolOnEmptyBehavior
func TestRedisPerSecondPoolOnEmptyBehavior_Default(t *testing.T) {
os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR")
os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION")

settings := NewSettings()

assert.Equal(t, "CREATE", settings.RedisPerSecondPoolOnEmptyBehavior)
assert.Equal(t, 1*time.Second, settings.RedisPerSecondPoolOnEmptyWaitDuration)
}

func TestRedisPerSecondPoolOnEmptyBehavior_Error(t *testing.T) {
os.Setenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR", "ERROR")
os.Setenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION", "50ms")
defer os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR")
defer os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION")

settings := NewSettings()

assert.Equal(t, "ERROR", settings.RedisPerSecondPoolOnEmptyBehavior)
assert.Equal(t, 50*time.Millisecond, settings.RedisPerSecondPoolOnEmptyWaitDuration)
}

// Test both pools can be configured independently
func TestRedisPoolOnEmptyBehavior_IndependentConfiguration(t *testing.T) {
os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "ERROR")
os.Setenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION", "0")
os.Setenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR", "CREATE")
os.Setenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION", "100ms")
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR")
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION")
defer os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR")
defer os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION")

settings := NewSettings()

// Main pool configured for fail-fast
assert.Equal(t, "ERROR", settings.RedisPoolOnEmptyBehavior)
assert.Equal(t, time.Duration(0), settings.RedisPoolOnEmptyWaitDuration)

// Per-second pool configured differently
assert.Equal(t, "CREATE", settings.RedisPerSecondPoolOnEmptyBehavior)
assert.Equal(t, 100*time.Millisecond, settings.RedisPerSecondPoolOnEmptyWaitDuration)
}
6 changes: 5 additions & 1 deletion src/srv/srv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ import (
)

func mockAddrsLookup(service, proto, name string) (cname string, addrs []*net.SRV, err error) {
return "ignored", []*net.SRV{{"z", 1, 0, 0}, {"z", 0, 0, 0}, {"a", 9001, 0, 0}}, nil
return "ignored", []*net.SRV{
{Target: "z", Port: 1, Priority: 0, Weight: 0},
{Target: "z", Port: 0, Priority: 0, Weight: 0},
{Target: "a", Port: 9001, Priority: 0, Weight: 0},
}, nil
}

func TestLookupServerStringsFromSrvReturnsServersSorted(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion test/redis/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func BenchmarkParallelDoLimit(b *testing.B) {
return func(b *testing.B) {
statsStore := gostats.NewStore(gostats.NewNullSink(), false)
sm := stats.NewMockStatManager(statsStore)
client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second)
client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", 0)
defer client.Close()

cache := redis.NewFixedRateLimitCacheImpl(client, nil, utils.NewTimeSourceImpl(), rand.New(utils.NewLockedSource(time.Now().Unix())), 10, nil, 0.8, "", sm, true)
Expand Down
Loading
Loading