From ebf56b69439710693c508747f67e81a9fdd04346 Mon Sep 17 00:00:00 2001 From: notdu Date: Sat, 25 Oct 2025 00:20:50 +0700 Subject: [PATCH] feat: add connection timeout configuration for Redis operations Signed-off-by: notdu --- README.md | 8 ++++++++ src/redis/cache_impl.go | 4 ++-- src/redis/driver_impl.go | 3 +++ src/settings/settings.go | 4 ++++ test/redis/bench_test.go | 2 +- test/redis/driver_impl_test.go | 6 +++--- 6 files changed, 21 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index b2171b02e..a425402f7 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,7 @@ - [Local Cache](#local-cache) - [Redis](#redis) - [Redis type](#redis-type) + - [Connection Timeout](#connection-timeout) - [Pipelining](#pipelining) - [One Redis Instance](#one-redis-instance) - [Two Redis Instances](#two-redis-instances) @@ -1113,6 +1114,13 @@ The deployment type can be specified with the `REDIS_TYPE` / `REDIS_PERSECOND_TY 1. "sentinel": A comma separated list with the first string as the master name of the sentinel cluster followed by hostname:port pairs. The list size should be >= 2. The first item is the name of the master and the rest are the sentinels. 1. "cluster": A comma separated list of hostname:port pairs with all the nodes in the cluster. +## Connection Timeout + +Connection timeout controls the maximum duration for Redis connection establishment, read operations, and write operations. + +1. `REDIS_TIMEOUT`: sets the timeout for Redis connection and I/O operations. Default: `10s` +1. `REDIS_PERSECOND_TIMEOUT`: sets the timeout for per-second Redis connection and I/O operations. Default: `10s` + ## Pipelining By default, for each request, ratelimit will pick up a connection from pool, write multiple redis commands in a single write then reads their responses in a single read. This reduces network delay. diff --git a/src/redis/cache_impl.go b/src/redis/cache_impl.go index 30890786a..6a5128e5f 100644 --- a/src/redis/cache_impl.go +++ b/src/redis/cache_impl.go @@ -18,12 +18,12 @@ func NewRateLimiterCacheImplFromSettings(s settings.Settings, localCache *freeca var perSecondPool Client if s.RedisPerSecond { perSecondPool = NewClientImpl(srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondTls, s.RedisPerSecondAuth, s.RedisPerSecondSocketType, - s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv) + s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisPerSecondTimeout) closer.Closers = append(closer.Closers, perSecondPool) } otherPool := NewClientImpl(srv.Scope().Scope("redis_pool"), s.RedisTls, s.RedisAuth, s.RedisSocketType, s.RedisType, s.RedisUrl, s.RedisPoolSize, - s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv) + s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisTimeout) closer.Closers = append(closer.Closers, otherPool) return NewFixedRateLimitCacheImpl( diff --git a/src/redis/driver_impl.go b/src/redis/driver_impl.go index 4b10615ac..98c9f6e1f 100644 --- a/src/redis/driver_impl.go +++ b/src/redis/driver_impl.go @@ -72,6 +72,7 @@ func checkError(err error) { func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisType, url string, poolSize int, pipelineWindow time.Duration, pipelineLimit int, tlsConfig *tls.Config, healthCheckActiveConnection bool, srv server.Server, + timeout time.Duration, ) Client { maskedUrl := utils.MaskCredentialsInUrl(url) logger.Warnf("connecting to redis on %s with pool size %d", maskedUrl, poolSize) @@ -79,6 +80,8 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisT df := func(network, addr string) (radix.Conn, error) { var dialOpts []radix.DialOpt + dialOpts = append(dialOpts, radix.DialTimeout(timeout)) + if useTls { dialOpts = append(dialOpts, radix.DialUseTLS(tlsConfig)) } diff --git a/src/settings/settings.go b/src/settings/settings.go index e3affb270..0eb53f814 100644 --- a/src/settings/settings.go +++ b/src/settings/settings.go @@ -159,6 +159,10 @@ type Settings struct { RedisPerSecondPipelineLimit int `envconfig:"REDIS_PERSECOND_PIPELINE_LIMIT" default:"0"` // Enable healthcheck to check Redis Connection. If there is no active connection, healthcheck failed. RedisHealthCheckActiveConnection bool `envconfig:"REDIS_HEALTH_CHECK_ACTIVE_CONNECTION" default:"false"` + // RedisTimeout sets the timeout for Redis connection and I/O operations. + RedisTimeout time.Duration `envconfig:"REDIS_TIMEOUT" default:"10s"` + // RedisPerSecondTimeout sets the timeout for per-second Redis connection and I/O operations. + RedisPerSecondTimeout time.Duration `envconfig:"REDIS_PERSECOND_TIMEOUT" default:"10s"` // Memcache settings MemcacheHostPort []string `envconfig:"MEMCACHE_HOST_PORT" default:""` // MemcacheMaxIdleConns sets the maximum number of idle TCP connections per memcached node. diff --git a/test/redis/bench_test.go b/test/redis/bench_test.go index b0b2a2153..0f654e2d6 100644 --- a/test/redis/bench_test.go +++ b/test/redis/bench_test.go @@ -44,7 +44,7 @@ func BenchmarkParallelDoLimit(b *testing.B) { return func(b *testing.B) { statsStore := gostats.NewStore(gostats.NewNullSink(), false) sm := stats.NewMockStatManager(statsStore) - client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil) + client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second) defer client.Close() cache := redis.NewFixedRateLimitCacheImpl(client, nil, utils.NewTimeSourceImpl(), rand.New(utils.NewLockedSource(time.Now().Unix())), 10, nil, 0.8, "", sm, true) diff --git a/test/redis/driver_impl_test.go b/test/redis/driver_impl_test.go index cbb5a9560..3ca96b671 100644 --- a/test/redis/driver_impl_test.go +++ b/test/redis/driver_impl_test.go @@ -38,7 +38,7 @@ func testNewClientImpl(t *testing.T, pipelineWindow time.Duration, pipelineLimit statsStore := stats.NewStore(stats.NewNullSink(), false) mkRedisClient := func(auth, addr string) redis.Client { - return redis.NewClientImpl(statsStore, false, auth, "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil) + return redis.NewClientImpl(statsStore, false, auth, "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second) } t.Run("connection refused", func(t *testing.T) { @@ -131,7 +131,7 @@ func TestDoCmd(t *testing.T) { statsStore := stats.NewStore(stats.NewNullSink(), false) mkRedisClient := func(addr string) redis.Client { - return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil) + return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second) } t.Run("SETGET ok", func(t *testing.T) { @@ -176,7 +176,7 @@ func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) f statsStore := stats.NewStore(stats.NewNullSink(), false) mkRedisClient := func(addr string) redis.Client { - return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil) + return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second) } t.Run("SETGET ok", func(t *testing.T) {