From c6456f319bc695eb98c8ba64e697a9ac76437b99 Mon Sep 17 00:00:00 2001 From: seonghyun Date: Thu, 25 Dec 2025 23:53:48 +0900 Subject: [PATCH 01/14] feat: upgrade radix from v3 to v4 Upgrade radix Redis client from v3.8.1 to v4.1.4. Main changes: - Import paths: radix/v3 -> radix/v4 - Pool/Cluster/Sentinel use Config.New() instead of New() - All client operations require context.Context parameter - Dialer setup changed from functional options to struct config - Pipelining uses radix.NewPipeline() and Append() - Write buffering via Dialer.WriteFlushInterval Breaking from v3: - Pool on-empty behavior (WAIT/CREATE/ERROR) not available - REDIS_PIPELINE_LIMIT setting deprecated (no effect in v4) Tested with existing test suite - all tests passing. Signed-off-by: seonghyun --- go.mod | 6 +- go.sum | 8 +- src/redis/cache_impl.go | 4 +- src/redis/driver.go | 10 +- src/redis/driver_impl.go | 242 ++++++++++++++++++++++++++------------ test/mocks/redis/redis.go | 12 +- 6 files changed, 186 insertions(+), 96 deletions(-) diff --git a/go.mod b/go.mod index 2c9cf881e..ef8fe331b 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/DataDog/datadog-go/v5 v5.5.0 github.com/alicebob/miniredis/v2 v2.33.0 github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 + github.com/cespare/xxhash/v2 v2.3.0 github.com/coocood/freecache v1.2.4 github.com/envoyproxy/go-control-plane v0.13.4 github.com/envoyproxy/go-control-plane/envoy v1.32.4 @@ -21,7 +22,7 @@ require ( github.com/libp2p/go-reuseport v0.4.0 github.com/lyft/goruntime v0.3.0 github.com/lyft/gostats v0.4.14 - github.com/mediocregopher/radix/v3 v3.8.1 + github.com/mediocregopher/radix/v4 v4.1.4 github.com/prometheus/client_golang v1.19.1 github.com/prometheus/client_model v0.6.1 github.com/prometheus/statsd_exporter v0.26.1 @@ -46,7 +47,6 @@ require ( github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect @@ -60,13 +60,13 @@ require ( github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/stretchr/objx v0.5.2 // indirect + github.com/tilinna/clock v1.0.2 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/otel/metric v1.36.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect golang.org/x/sys v0.34.0 // indirect golang.org/x/text v0.27.0 // indirect - golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250528174236-200df99c418a // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250728155136-f173205681a0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 16d1824cc..88c160e27 100644 --- a/go.sum +++ b/go.sum @@ -102,8 +102,8 @@ github.com/lyft/goruntime v0.3.0/go.mod h1:BW1gngSpMJR9P9w23BPUPdhdbUWhpirl98TQh github.com/lyft/gostats v0.4.1/go.mod h1:Tpx2xRzz4t+T2Tx0xdVgIoBdR2UMVz+dKnE3X01XSd8= github.com/lyft/gostats v0.4.14 h1:xmP4yMfDvEKtlNZEcS2sYz0cvnps1ri337ZEEbw3ab8= github.com/lyft/gostats v0.4.14/go.mod h1:cJWqEVL8JIewIJz/olUIios2F1q06Nc51hXejPQmBH0= -github.com/mediocregopher/radix/v3 v3.8.1 h1:rOkHflVuulFKlwsLY01/M2cM2tWCjDoETcMqKbAWu1M= -github.com/mediocregopher/radix/v3 v3.8.1/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= +github.com/mediocregopher/radix/v4 v4.1.4 h1:Uze6DEbEAvL+VHXUEu/EDBTkUk5CLct5h3nVSGpc6Ts= +github.com/mediocregopher/radix/v4 v4.1.4/go.mod h1:ajchozX/6ELmydxWeWM6xCFHVpZ4+67LXHOTOVR0nCE= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -144,6 +144,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tilinna/clock v1.0.2 h1:6BO2tyAC9JbPExKH/z9zl44FLu1lImh3nDNKA0kgrkI= +github.com/tilinna/clock v1.0.2/go.mod h1:ZsP7BcY7sEEz7ktc0IVy8Us6boDrK8VradlKRUGfOao= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= @@ -241,8 +243,6 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= -golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= diff --git a/src/redis/cache_impl.go b/src/redis/cache_impl.go index c411af2ec..7812b3eb7 100644 --- a/src/redis/cache_impl.go +++ b/src/redis/cache_impl.go @@ -19,13 +19,13 @@ func NewRateLimiterCacheImplFromSettings(s settings.Settings, localCache *freeca if s.RedisPerSecond { perSecondPool = NewClientImpl(srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondTls, s.RedisPerSecondAuth, s.RedisPerSecondSocketType, s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisPerSecondTimeout, - s.RedisPerSecondPoolOnEmptyBehavior, s.RedisPerSecondPoolOnEmptyWaitDuration, s.RedisPerSecondSentinelAuth) + s.RedisPerSecondPoolOnEmptyBehavior, s.RedisPerSecondPoolOnEmptyWaitDuration, s.RedisPerSecondSentinelAuth, s.RedisPerSecondUseExplicitPipeline) closer.Closers = append(closer.Closers, perSecondPool) } otherPool := NewClientImpl(srv.Scope().Scope("redis_pool"), s.RedisTls, s.RedisAuth, s.RedisSocketType, s.RedisType, s.RedisUrl, s.RedisPoolSize, s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisTimeout, - s.RedisPoolOnEmptyBehavior, s.RedisPoolOnEmptyWaitDuration, s.RedisSentinelAuth) + s.RedisPoolOnEmptyBehavior, s.RedisPoolOnEmptyWaitDuration, s.RedisSentinelAuth, s.RedisUseExplicitPipeline) closer.Closers = append(closer.Closers, otherPool) return NewFixedRateLimitCacheImpl( diff --git a/src/redis/driver.go b/src/redis/driver.go index 7ffc0c7b7..5063956bf 100644 --- a/src/redis/driver.go +++ b/src/redis/driver.go @@ -1,6 +1,6 @@ package redis -import "github.com/mediocregopher/radix/v3" +import "github.com/mediocregopher/radix/v4" // Errors that may be raised during config parsing. type RedisError string @@ -42,8 +42,10 @@ type Client interface { // NumActiveConns return number of active connections, used in testing. NumActiveConns() int - // ImplicitPipeliningEnabled return true if implicit pipelining is enabled. - ImplicitPipeliningEnabled() bool + // UseExplicitPipeline returns true if explicit pipelining should be used. + // When false, individual commands are executed and radix v4 automatically buffers writes. + // When true, commands are batched using radix.NewPipeline(). + UseExplicitPipeline() bool } -type Pipeline []radix.CmdAction +type Pipeline []radix.Action diff --git a/src/redis/driver_impl.go b/src/redis/driver_impl.go index 8201aa895..c737790ef 100644 --- a/src/redis/driver_impl.go +++ b/src/redis/driver_impl.go @@ -1,14 +1,16 @@ package redis import ( + "context" "crypto/tls" "fmt" + "net" "strings" "time" stats "github.com/lyft/gostats" - "github.com/mediocregopher/radix/v3" - "github.com/mediocregopher/radix/v3/trace" + "github.com/mediocregopher/radix/v4" + "github.com/mediocregopher/radix/v4/trace" logger "github.com/sirupsen/logrus" "github.com/envoyproxy/ratelimit/src/server" @@ -58,10 +60,17 @@ func poolTrace(ps *poolStats, healthCheckActiveConnection bool, srv server.Serve } } +// redisClient is an interface that abstracts radix Client, Cluster, and Sentinel +// All of these types have Do(context.Context, Action) and Close() methods +type redisClient interface { + Do(context.Context, radix.Action) error + Close() error +} + type clientImpl struct { - client radix.Client - stats poolStats - implicitPipelining bool + client redisClient + stats poolStats + useExplicitPipeline bool } func checkError(err error) { @@ -73,104 +82,163 @@ func checkError(err error) { func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisType, url string, poolSize int, pipelineWindow time.Duration, pipelineLimit int, tlsConfig *tls.Config, healthCheckActiveConnection bool, srv server.Server, timeout time.Duration, poolOnEmptyBehavior string, poolOnEmptyWaitDuration time.Duration, sentinelAuth string, + useExplicitPipeline bool, ) Client { maskedUrl := utils.MaskCredentialsInUrl(url) logger.Warnf("connecting to redis on %s with pool size %d", maskedUrl, poolSize) - df := func(network, addr string) (radix.Conn, error) { - var dialOpts []radix.DialOpt + // Create Dialer for connecting to Redis + var netDialer net.Dialer + if timeout > 0 { + netDialer.Timeout = timeout + } - dialOpts = append(dialOpts, radix.DialTimeout(timeout)) + dialer := radix.Dialer{ + NetDialer: &netDialer, + } - if useTls { - dialOpts = append(dialOpts, radix.DialUseTLS(tlsConfig)) + // Setup TLS if needed + if useTls { + tlsNetDialer := tls.Dialer{ + NetDialer: &netDialer, + Config: tlsConfig, } + dialer.NetDialer = &tlsNetDialer + } - if auth != "" { - user, pass, found := strings.Cut(auth, ":") - if found { - logger.Warnf("enabling authentication to redis on %s with user %s", maskedUrl, user) - dialOpts = append(dialOpts, radix.DialAuthUser(user, pass)) - } else { - logger.Warnf("enabling authentication to redis on %s without user", maskedUrl) - dialOpts = append(dialOpts, radix.DialAuthPass(auth)) - } + if auth != "" { + user, pass, found := strings.Cut(auth, ":") + if found { + logger.Warnf("enabling authentication to redis on %s with user %s", maskedUrl, user) + dialer.AuthUser = user + dialer.AuthPass = pass + } else { + logger.Warnf("enabling authentication to redis on %s without user", maskedUrl) + dialer.AuthPass = auth } - - return radix.Dial(network, addr, dialOpts...) } stats := newPoolStats(scope) - opts := []radix.PoolOpt{radix.PoolConnFunc(df), radix.PoolWithTrace(poolTrace(&stats, healthCheckActiveConnection, srv))} + // Create PoolConfig + poolConfig := radix.PoolConfig{ + Dialer: dialer, + Size: poolSize, + Trace: poolTrace(&stats, healthCheckActiveConnection, srv), + } + + // Note: radix v4 handles write buffering via WriteFlushInterval. + // Explicit pipelining (radix.NewPipeline()) is only used when explicitly requested via useExplicitPipeline parameter. + // Otherwise, individual Do() calls are used with automatic write buffering via WriteFlushInterval. + // pipelineLimit parameter is deprecated and ignored in radix v4. - implicitPipelining := true - if pipelineWindow == 0 && pipelineLimit == 0 { - implicitPipelining = false - } else { - opts = append(opts, radix.PoolPipelineWindow(pipelineWindow, pipelineLimit)) + // Set WriteFlushInterval for automatic write buffering when not using explicit pipeline + if !useExplicitPipeline && pipelineWindow > 0 { + poolConfig.Dialer.WriteFlushInterval = pipelineWindow + logger.Debugf("Setting WriteFlushInterval to %v (pipelineLimit=%d is ignored in v4)", pipelineWindow, pipelineLimit) } - logger.Debugf("Implicit pipelining enabled: %v", implicitPipelining) + logger.Debugf("Use explicit pipeline: %v", useExplicitPipeline) + + // IMPORTANT: radix v4 pool behavior changes from v3 + // + // v4 uses a FIXED pool size and BLOCKS when all connections are in use. + // This is similar to v3's WAIT behavior, NOT CREATE or ERROR behavior. + // + // Key differences: + // - v3 WAIT → v4 default (same: blocks until connection available) + // - v3 CREATE → v4 INCOMPATIBLE (v4 does NOT create overflow connections, only blocks) + // - v3 ERROR → v4 INCOMPATIBLE (v4 does NOT fail-fast, only blocks) + // + // Migration recommendations: + // - Remove REDIS_POOL_ON_EMPTY_BEHAVIOR setting (it has no effect in v4) + // - Use context timeouts to prevent indefinite blocking: + // ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + // defer cancel() + // client.Do(ctx, cmd) + // - Consider increasing REDIS_POOL_SIZE if using CREATE or ERROR previously switch strings.ToUpper(poolOnEmptyBehavior) { case "WAIT": - opts = append(opts, radix.PoolOnEmptyWait()) - logger.Warnf("Redis pool %s: on-empty=WAIT (block until connection available)", maskedUrl) + logger.Warnf("Redis pool %s: WAIT is default in v4 (blocks until connection available)", maskedUrl) case "CREATE": - opts = append(opts, radix.PoolOnEmptyCreateAfter(poolOnEmptyWaitDuration)) - logger.Warnf("Redis pool %s: on-empty=CREATE after %v", maskedUrl, poolOnEmptyWaitDuration) + // v3 CREATE created overflow connections when pool was full + // v4 does NOT support this - it will block instead + logger.Errorf("Redis pool %s: CREATE not supported in v4. Pool size=%d is fixed, requests will block. Use context timeouts!", maskedUrl, poolSize) case "ERROR": - opts = append(opts, radix.PoolOnEmptyErrAfter(poolOnEmptyWaitDuration)) - logger.Warnf("Redis pool %s: on-empty=ERROR after %v (fail-fast)", maskedUrl, poolOnEmptyWaitDuration) + // v3 ERROR failed fast when pool was full + // v4 does NOT support this - it will block instead, which could cause goroutine buildup + logger.Errorf("Redis pool %s: ERROR not supported in v4. Requests will block instead of failing. Use context timeouts!", maskedUrl) default: - logger.Warnf("Redis pool %s: invalid on-empty behavior '%s', using default CREATE after %v", maskedUrl, poolOnEmptyBehavior, poolOnEmptyWaitDuration) - opts = append(opts, radix.PoolOnEmptyCreateAfter(poolOnEmptyWaitDuration)) + logger.Warnf("Redis pool %s: using v4 default (fixed size=%d, blocks when full)", maskedUrl, poolSize) } - poolFunc := func(network, addr string) (radix.Client, error) { - return radix.NewPool(network, addr, poolSize, opts...) + poolFunc := func(ctx context.Context, network, addr string) (radix.Client, error) { + return poolConfig.New(ctx, network, addr) } - var client radix.Client + var client redisClient var err error + ctx := context.Background() + switch strings.ToLower(redisType) { case "single": - client, err = poolFunc(redisSocketType, url) + logger.Warnf("Creating single with urls %v", url) + client, err = poolFunc(ctx, redisSocketType, url) case "cluster": urls := strings.Split(url, ",") - if !implicitPipelining { - panic(RedisError("Implicit Pipelining must be enabled to work with Redis Cluster Mode. Set values for REDIS_PIPELINE_WINDOW or REDIS_PIPELINE_LIMIT to enable implicit pipelining")) + if useExplicitPipeline { + panic(RedisError("Explicit pipelining cannot be used with Redis Cluster Mode. Set REDIS_PIPELINE_WINDOW to a non-zero value (e.g., 150us)")) } logger.Warnf("Creating cluster with urls %v", urls) - client, err = radix.NewCluster(urls, radix.ClusterPoolFunc(poolFunc)) + clusterConfig := radix.ClusterConfig{ + PoolConfig: poolConfig, + } + client, err = clusterConfig.New(ctx, urls) case "sentinel": urls := strings.Split(url, ",") if len(urls) < 2 { panic(RedisError("Expected master name and a list of urls for the sentinels, in the format: ,,...,")) } - sentinelDialFunc := func(network, addr string) (radix.Conn, error) { - var dialOpts []radix.DialOpt - // Always set the dial timeout consistent with the main dial func - dialOpts = append(dialOpts, radix.DialTimeout(timeout)) - if useTls { - logger.Warnf("enabling TLS to redis sentinel on %s", addr) - dialOpts = append(dialOpts, radix.DialUseTLS(tlsConfig)) + + // Create sentinel dialer + var sentinelNetDialer net.Dialer + if timeout > 0 { + sentinelNetDialer.Timeout = timeout + } + + sentinelDialer := radix.Dialer{ + NetDialer: &sentinelNetDialer, + } + + // Setup TLS for sentinel if needed + if useTls { + logger.Warnf("enabling TLS to redis sentinel") + tlsSentinelDialer := tls.Dialer{ + NetDialer: &sentinelNetDialer, + Config: tlsConfig, } - // Use sentinelAuth for authenticating to Sentinel nodes, not auth - // auth is used for Redis master/replica authentication - if sentinelAuth != "" { - user, pass, found := strings.Cut(sentinelAuth, ":") - if found { - logger.Warnf("enabling authentication to redis sentinel on %s with user %s", addr, user) - dialOpts = append(dialOpts, radix.DialAuthUser(user, pass)) - } else { - logger.Warnf("enabling authentication to redis sentinel on %s without user", addr) - dialOpts = append(dialOpts, radix.DialAuthPass(sentinelAuth)) - } + sentinelDialer.NetDialer = &tlsSentinelDialer + } + + // Use sentinelAuth for authenticating to Sentinel nodes, not auth + // auth is used for Redis master/replica authentication + if sentinelAuth != "" { + user, pass, found := strings.Cut(sentinelAuth, ":") + if found { + logger.Warnf("enabling authentication to redis sentinel with user %s", user) + sentinelDialer.AuthUser = user + sentinelDialer.AuthPass = pass + } else { + logger.Warnf("enabling authentication to redis sentinel without user") + sentinelDialer.AuthPass = sentinelAuth } - return radix.Dial(network, addr, dialOpts...) } - client, err = radix.NewSentinel(urls[0], urls[1:], radix.SentinelConnFunc(sentinelDialFunc), radix.SentinelPoolFunc(poolFunc)) + + sentinelConfig := radix.SentinelConfig{ + PoolConfig: poolConfig, + SentinelDialer: sentinelDialer, + } + client, err = sentinelConfig.New(ctx, urls[0], urls[1:]) default: panic(RedisError("Unrecognized redis type " + redisType)) } @@ -179,20 +247,25 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisT // Check if connection is good var pingResponse string - checkError(client.Do(radix.Cmd(&pingResponse, "PING"))) + checkError(client.Do(ctx, radix.Cmd(&pingResponse, "PING"))) if pingResponse != "PONG" { checkError(fmt.Errorf("connecting redis error: %s", pingResponse)) } return &clientImpl{ - client: client, - stats: stats, - implicitPipelining: implicitPipelining, + client: client, + stats: stats, + useExplicitPipeline: useExplicitPipeline, } } func (c *clientImpl) DoCmd(rcv interface{}, cmd, key string, args ...interface{}) error { - return c.client.Do(radix.FlatCmd(rcv, cmd, key, args...)) + ctx := context.Background() + // Combine key and args into a single slice + allArgs := make([]interface{}, 0, 1+len(args)) + allArgs = append(allArgs, key) + allArgs = append(allArgs, args...) + return c.client.Do(ctx, radix.FlatCmd(rcv, cmd, allArgs...)) } func (c *clientImpl) Close() error { @@ -204,22 +277,37 @@ func (c *clientImpl) NumActiveConns() int { } func (c *clientImpl) PipeAppend(pipeline Pipeline, rcv interface{}, cmd, key string, args ...interface{}) Pipeline { - return append(pipeline, radix.FlatCmd(rcv, cmd, key, args...)) + // Combine key and args into a single slice + allArgs := make([]interface{}, 0, 1+len(args)) + allArgs = append(allArgs, key) + allArgs = append(allArgs, args...) + return append(pipeline, radix.FlatCmd(rcv, cmd, allArgs...)) } func (c *clientImpl) PipeDo(pipeline Pipeline) error { - if c.implicitPipelining { + ctx := context.Background() + if c.useExplicitPipeline { + // When explicit pipelining is enabled (WriteFlushInterval == 0): + // Use radix.NewPipeline() to batch commands together. + p := radix.NewPipeline() for _, action := range pipeline { - if err := c.client.Do(action); err != nil { - return err - } + p.Append(action) } - return nil + return c.client.Do(ctx, p) } - return c.client.Do(radix.Pipeline(pipeline...)) + // When automatic buffering is enabled (WriteFlushInterval > 0): + // Execute each action individually. Radix v4 will automatically buffer + // concurrent writes and flush them together based on WriteFlushInterval. + // This provides better performance for most workloads. + for _, action := range pipeline { + if err := c.client.Do(ctx, action); err != nil { + return err + } + } + return nil } -func (c *clientImpl) ImplicitPipeliningEnabled() bool { - return c.implicitPipelining +func (c *clientImpl) UseExplicitPipeline() bool { + return c.useExplicitPipeline } diff --git a/test/mocks/redis/redis.go b/test/mocks/redis/redis.go index 2d6d059f8..02f0f258a 100644 --- a/test/mocks/redis/redis.go +++ b/test/mocks/redis/redis.go @@ -68,18 +68,18 @@ func (mr *MockClientMockRecorder) DoCmd(arg0, arg1, arg2 interface{}, arg3 ...in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoCmd", reflect.TypeOf((*MockClient)(nil).DoCmd), varargs...) } -// ImplicitPipeliningEnabled mocks base method -func (m *MockClient) ImplicitPipeliningEnabled() bool { +// UseExplicitPipeline mocks base method +func (m *MockClient) UseExplicitPipeline() bool { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ImplicitPipeliningEnabled") + ret := m.ctrl.Call(m, "UseExplicitPipeline") ret0, _ := ret[0].(bool) return ret0 } -// ImplicitPipeliningEnabled indicates an expected call of ImplicitPipeliningEnabled -func (mr *MockClientMockRecorder) ImplicitPipeliningEnabled() *gomock.Call { +// UseExplicitPipeline indicates an expected call of UseExplicitPipeline +func (mr *MockClientMockRecorder) UseExplicitPipeline() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImplicitPipeliningEnabled", reflect.TypeOf((*MockClient)(nil).ImplicitPipeliningEnabled)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UseExplicitPipeline", reflect.TypeOf((*MockClient)(nil).UseExplicitPipeline)) } // NumActiveConns mocks base method From 579ddbcc6039f439b7a9ac8ec1a1268f94b1c951 Mon Sep 17 00:00:00 2001 From: seonghyun Date: Thu, 25 Dec 2025 23:53:56 +0900 Subject: [PATCH 02/14] docs: update pipeline settings for radix v4 Update documentation to reflect radix v4's pipeline behavior: - REDIS_PIPELINE_WINDOW now sets WriteFlushInterval (auto-flush timing) - REDIS_PIPELINE_LIMIT deprecated - no effect in v4 - Add REDIS_USE_EXPLICIT_PIPELINE for manual pipeline control - Required for Redis Cluster: PIPELINE_WINDOW must be non-zero Update terminology from "implicit pipelining" to "write buffering" to better match radix v4's actual behavior. Signed-off-by: seonghyun --- README.md | 11 +++++------ src/settings/settings.go | 29 ++++++++++++++++++++++------- 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 09d392e46..b9c099a21 100644 --- a/README.md +++ b/README.md @@ -1308,14 +1308,13 @@ Controls what happens when all connections in the pool are in use and a new requ By default, for each request, ratelimit will pick up a connection from pool, write multiple redis commands in a single write then reads their responses in a single read. This reduces network delay. -For high throughput scenarios, ratelimit also support [implicit pipelining](https://github.com/mediocregopher/radix/blob/v3.5.1/pool.go#L238) . It can be configured using the following environment variables: +For high throughput scenarios, ratelimit supports write buffering via [radix v4's WriteFlushInterval](https://pkg.go.dev/github.com/mediocregopher/radix/v4#Dialer). It can be configured using the following environment variables: -1. `REDIS_PIPELINE_WINDOW` & `REDIS_PERSECOND_PIPELINE_WINDOW`: sets the duration after which internal pipelines will be flushed. - If window is zero then implicit pipelining will be disabled. -1. `REDIS_PIPELINE_LIMIT` & `REDIS_PERSECOND_PIPELINE_LIMIT`: sets maximum number of commands that can be pipelined before flushing. - If limit is zero then no limit will be used and pipelines will only be limited by the specified time window. +1. `REDIS_PIPELINE_WINDOW` & `REDIS_PERSECOND_PIPELINE_WINDOW`: controls how often buffered writes are flushed to the network connection. When set to a non-zero value (e.g., 150us-500us), radix v4 will buffer multiple concurrent write operations and flush them together, reducing system calls and improving throughput. If zero, each write is flushed immediately. **Required for Redis Cluster mode.** +1. `REDIS_PIPELINE_LIMIT` & `REDIS_PERSECOND_PIPELINE_LIMIT`: **DEPRECATED** - These settings have no effect in radix v4. Write buffering is controlled solely by the window settings above. +1. `REDIS_USE_EXPLICIT_PIPELINE` & `REDIS_PERSECOND_USE_EXPLICIT_PIPELINE`: controls whether to use explicit pipelining with `radix.NewPipeline()`. When `true`, commands are batched together and sent as a single pipeline request. When `false` (default), individual commands are sent with automatic write buffering via `WriteFlushInterval`. **IMPORTANT**: Explicit pipelining **CANNOT** be used with Redis Cluster mode. For cluster mode, this must be set to `false` and use `REDIS_PIPELINE_WINDOW` for write buffering instead. Only use this for single/sentinel mode when you specifically need explicit pipeline control. Default: `false` -`implicit pipelining` is disabled by default. To enable it, you can use default values [used by radix](https://github.com/mediocregopher/radix/blob/v3.5.1/pool.go#L278) and tune for the optimal value. +Write buffering is disabled by default (window = 0). For optimal performance, set `REDIS_PIPELINE_WINDOW` to 150us-500us depending on your latency requirements and load patterns. ## One Redis Instance diff --git a/src/settings/settings.go b/src/settings/settings.go index 9beb786c0..8365fb8a8 100644 --- a/src/settings/settings.go +++ b/src/settings/settings.go @@ -137,13 +137,25 @@ type Settings struct { RedisTlsCACert string `envconfig:"REDIS_TLS_CACERT" default:""` RedisTlsSkipHostnameVerification bool `envconfig:"REDIS_TLS_SKIP_HOSTNAME_VERIFICATION" default:"false"` - // RedisPipelineWindow sets the duration after which internal pipelines will be flushed. - // If window is zero then implicit pipelining will be disabled. Radix use 150us for the - // default value, see https://github.com/mediocregopher/radix/blob/v3.5.1/pool.go#L278. + // RedisPipelineWindow sets the WriteFlushInterval for radix v4 connections. + // This controls how often buffered writes are flushed to the network connection. + // When set to a non-zero value, radix v4 will buffer multiple concurrent write operations + // and flush them together, reducing system calls and improving throughput. + // If zero, each write is flushed immediately (no buffering). + // Required for Redis Cluster mode. Recommended value: 150us-500us. + // See: https://pkg.go.dev/github.com/mediocregopher/radix/v4#Dialer RedisPipelineWindow time.Duration `envconfig:"REDIS_PIPELINE_WINDOW" default:"0"` - // RedisPipelineLimit sets maximum number of commands that can be pipelined before flushing. - // If limit is zero then no limit will be used and pipelines will only be limited by the specified time window. + // RedisPipelineLimit is DEPRECATED and unused in radix v4. + // This setting has no effect. Radix v4 does not support explicit pipeline size limits. + // Write buffering is controlled solely by RedisPipelineWindow (WriteFlushInterval). RedisPipelineLimit int `envconfig:"REDIS_PIPELINE_LIMIT" default:"0"` + // RedisUseExplicitPipeline controls whether to use explicit pipelining (radix.NewPipeline()). + // When true, commands are batched using radix.NewPipeline() and sent together. + // When false (default), individual commands are sent with automatic write buffering via WriteFlushInterval. + // IMPORTANT: Explicit pipelining CANNOT be used with Redis Cluster mode. + // For cluster mode, you MUST use automatic write buffering (set this to false and use RedisPipelineWindow). + // Only set this to true for single/sentinel mode when you specifically need explicit pipeline control. + RedisUseExplicitPipeline bool `envconfig:"REDIS_USE_EXPLICIT_PIPELINE" default:"false"` RedisPerSecond bool `envconfig:"REDIS_PERSECOND" default:"false"` RedisPerSecondSocketType string `envconfig:"REDIS_PERSECOND_SOCKET_TYPE" default:"unix"` RedisPerSecondType string `envconfig:"REDIS_PERSECOND_TYPE" default:"SINGLE"` @@ -159,12 +171,15 @@ type Settings struct { // This is separate from RedisPerSecondAuth which is used for authenticating to the Redis master/replica nodes. // If empty, no authentication will be attempted when connecting to per-second Sentinel nodes. RedisPerSecondSentinelAuth string `envconfig:"REDIS_PERSECOND_SENTINEL_AUTH" default:""` - // RedisPerSecondPipelineWindow sets the duration after which internal pipelines will be flushed for per second redis. + // RedisPerSecondPipelineWindow sets the WriteFlushInterval for per-second redis connections. // See comments of RedisPipelineWindow for details. RedisPerSecondPipelineWindow time.Duration `envconfig:"REDIS_PERSECOND_PIPELINE_WINDOW" default:"0"` - // RedisPerSecondPipelineLimit sets maximum number of commands that can be pipelined before flushing for per second redis. + // RedisPerSecondPipelineLimit is DEPRECATED and unused in radix v4. // See comments of RedisPipelineLimit for details. RedisPerSecondPipelineLimit int `envconfig:"REDIS_PERSECOND_PIPELINE_LIMIT" default:"0"` + // RedisPerSecondUseExplicitPipeline controls explicit pipelining for per-second redis. + // See comments of RedisUseExplicitPipeline for details. + RedisPerSecondUseExplicitPipeline bool `envconfig:"REDIS_PERSECOND_USE_EXPLICIT_PIPELINE" default:"false"` // Enable healthcheck to check Redis Connection. If there is no active connection, healthcheck failed. RedisHealthCheckActiveConnection bool `envconfig:"REDIS_HEALTH_CHECK_ACTIVE_CONNECTION" default:"false"` // RedisTimeout sets the timeout for Redis connection and I/O operations. From dc887a68059d4dee632f0404ddc492080f3c81ce Mon Sep 17 00:00:00 2001 From: seonghyun Date: Thu, 25 Dec 2025 23:54:04 +0900 Subject: [PATCH 03/14] test: update tests for radix v4 - Add useExplicitPipeline parameter to test client creation - Update error assertions for v4's error message format (v4 prefixes with "response returned from Conn:") - Handle different connection errors (EOF, connection reset, broken pipe) - Update radix.FlatCmd usage for v4 API Signed-off-by: seonghyun --- test/redis/bench_test.go | 2 +- test/redis/driver_impl_test.go | 44 +++++++++++++++-------------- test/redis/fixed_cache_impl_test.go | 4 +-- 3 files changed, 26 insertions(+), 24 deletions(-) diff --git a/test/redis/bench_test.go b/test/redis/bench_test.go index 1f9e3336b..b55474c94 100644 --- a/test/redis/bench_test.go +++ b/test/redis/bench_test.go @@ -44,7 +44,7 @@ func BenchmarkParallelDoLimit(b *testing.B) { return func(b *testing.B) { statsStore := gostats.NewStore(gostats.NewNullSink(), false) sm := stats.NewMockStatManager(statsStore) - client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", 0, "") + client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", 0, "", false) defer client.Close() cache := redis.NewFixedRateLimitCacheImpl(client, nil, utils.NewTimeSourceImpl(), rand.New(utils.NewLockedSource(time.Now().Unix())), 10, nil, 0.8, "", sm, true) diff --git a/test/redis/driver_impl_test.go b/test/redis/driver_impl_test.go index 311d5f03a..db6532ba7 100644 --- a/test/redis/driver_impl_test.go +++ b/test/redis/driver_impl_test.go @@ -33,13 +33,13 @@ func expectPanicError(t *testing.T, f assert.PanicTestFunc) (result error) { return } -func testNewClientImpl(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) func(t *testing.T) { +func testNewClientImpl(t *testing.T, pipelineWindow time.Duration, pipelineLimit int, useExplicitPipeline bool) func(t *testing.T) { return func(t *testing.T) { redisAuth := "123" statsStore := stats.NewStore(stats.NewNullSink(), false) mkRedisClient := func(auth, addr string) redis.Client { - return redis.NewClientImpl(statsStore, false, auth, "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", 0, "") + return redis.NewClientImpl(statsStore, false, auth, "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", 0, "", useExplicitPipeline) } t.Run("connection refused", func(t *testing.T) { @@ -66,7 +66,7 @@ func testNewClientImpl(t *testing.T, pipelineWindow time.Duration, pipelineLimit redisSrv.RequireAuth(redisAuth) - assert.PanicsWithError(t, "NOAUTH Authentication required.", func() { + assert.PanicsWithError(t, "response returned from Conn: NOAUTH Authentication required.", func() { mkRedisClient("", redisSrv.Addr()) }) }) @@ -103,36 +103,32 @@ func testNewClientImpl(t *testing.T, pipelineWindow time.Duration, pipelineLimit redisSrv.RequireUserAuth(user, pass) redisAuth := fmt.Sprintf("%s:invalid-password", user) - assert.PanicsWithError(t, "WRONGPASS invalid username-password pair", func() { + assert.PanicsWithError(t, "response returned from Conn: WRONGPASS invalid username-password pair", func() { mkRedisClient(redisAuth, redisSrv.Addr()) }) }) - t.Run("ImplicitPipeliningEnabled() return expected value", func(t *testing.T) { + t.Run("UseExplicitPipeline() return expected value", func(t *testing.T) { redisSrv := mustNewRedisServer() defer redisSrv.Close() client := mkRedisClient("", redisSrv.Addr()) - if pipelineWindow == 0 && pipelineLimit == 0 { - assert.False(t, client.ImplicitPipeliningEnabled()) - } else { - assert.True(t, client.ImplicitPipeliningEnabled()) - } + assert.Equal(t, useExplicitPipeline, client.UseExplicitPipeline()) }) } } func TestNewClientImpl(t *testing.T) { - t.Run("ImplicitPipeliningEnabled", testNewClientImpl(t, 2*time.Millisecond, 2)) - t.Run("ImplicitPipeliningDisabled", testNewClientImpl(t, 0, 0)) + t.Run("AutoBuffering", testNewClientImpl(t, 2*time.Millisecond, 2, false)) + t.Run("ExplicitPipeline", testNewClientImpl(t, 0, 0, true)) } func TestDoCmd(t *testing.T) { statsStore := stats.NewStore(stats.NewNullSink(), false) mkRedisClient := func(addr string) redis.Client { - return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second, "", 0, "") + return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second, "", 0, "", false) } t.Run("SETGET ok", func(t *testing.T) { @@ -168,16 +164,16 @@ func TestDoCmd(t *testing.T) { assert.Nil(t, client.DoCmd(nil, "SET", "foo", "bar")) redisSrv.Close() - assert.EqualError(t, client.DoCmd(nil, "GET", "foo"), "EOF") + assert.EqualError(t, client.DoCmd(nil, "GET", "foo"), "response returned from Conn: EOF") }) } -func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) func(t *testing.T) { +func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int, useExplicitPipeline bool) func(t *testing.T) { return func(t *testing.T) { statsStore := stats.NewStore(stats.NewNullSink(), false) mkRedisClient := func(addr string) redis.Client { - return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", 0, "") + return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", 0, "", useExplicitPipeline) } t.Run("SETGET ok", func(t *testing.T) { @@ -220,7 +216,13 @@ func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) f expectErrContainEOF := func(t *testing.T, err error) { assert.NotNil(t, err) - assert.Contains(t, err.Error(), "EOF") + // radix v4 wraps errors with "response returned from Conn:" + // and may return different connection errors (EOF, connection reset, etc) + errMsg := err.Error() + hasConnectionError := strings.Contains(errMsg, "EOF") || + strings.Contains(errMsg, "connection reset") || + strings.Contains(errMsg, "broken pipe") + assert.True(t, hasConnectionError, "expected connection error, got: %s", errMsg) } expectErrContainEOF(t, client.PipeDo(client.PipeAppend(redis.Pipeline{}, nil, "GET", "foo"))) @@ -229,8 +231,8 @@ func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) f } func TestPipeDo(t *testing.T) { - t.Run("ImplicitPipeliningEnabled", testPipeDo(t, 10*time.Millisecond, 2)) - t.Run("ImplicitPipeliningDisabled", testPipeDo(t, 0, 0)) + t.Run("AutoBuffering", testPipeDo(t, 10*time.Millisecond, 2, false)) + t.Run("ExplicitPipeline", testPipeDo(t, 0, 0, true)) } // Tests for pool on-empty behavior @@ -239,7 +241,7 @@ func TestPoolOnEmptyBehavior(t *testing.T) { // Helper to create client with specific on-empty behavior mkRedisClientWithBehavior := func(addr, behavior string, waitDuration time.Duration) redis.Client { - return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second, behavior, waitDuration, "") + return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second, behavior, waitDuration, "", false) } t.Run("default behavior (empty string)", func(t *testing.T) { @@ -387,7 +389,7 @@ func TestNewClientImplSentinel(t *testing.T) { mkSentinelClient := func(auth, sentinelAuth, url string, useTls bool, timeout time.Duration) redis.Client { // Pass nil for tlsConfig - we can't test TLS without a real TLS server, // but we can verify the code path is executed (logs will show TLS is enabled) - return redis.NewClientImpl(statsStore, useTls, auth, "tcp", "sentinel", url, 1, 0, 0, nil, false, nil, timeout, "", 0, sentinelAuth) + return redis.NewClientImpl(statsStore, useTls, auth, "tcp", "sentinel", url, 1, 0, 0, nil, false, nil, timeout, "", 0, sentinelAuth, false) } t.Run("invalid url format - missing sentinel addresses", func(t *testing.T) { diff --git a/test/redis/fixed_cache_impl_test.go b/test/redis/fixed_cache_impl_test.go index 7838b67ab..b1a8088cd 100644 --- a/test/redis/fixed_cache_impl_test.go +++ b/test/redis/fixed_cache_impl_test.go @@ -8,7 +8,7 @@ import ( "github.com/envoyproxy/ratelimit/test/mocks/stats" "github.com/coocood/freecache" - "github.com/mediocregopher/radix/v3" + "github.com/mediocregopher/radix/v4" pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3" gostats "github.com/lyft/gostats" @@ -35,7 +35,7 @@ func TestRedis(t *testing.T) { } func pipeAppend(pipeline redis.Pipeline, rcv interface{}, cmd, key string, args ...interface{}) redis.Pipeline { - return append(pipeline, radix.FlatCmd(rcv, cmd, key, args...)) + return append(pipeline, radix.FlatCmd(rcv, cmd, append([]interface{}{key}, args...)...)) } func testRedis(usePerSecondRedis bool) func(*testing.T) { From 9848be117d15aca44e64681f5d0df0800981237c Mon Sep 17 00:00:00 2001 From: seonghyun Date: Fri, 26 Dec 2025 00:10:59 +0900 Subject: [PATCH 04/14] chore: add deprecation warning for REDIS_PIPELINE_LIMIT Signed-off-by: seonghyun --- src/redis/driver_impl.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/redis/driver_impl.go b/src/redis/driver_impl.go index c737790ef..ccb6fb51e 100644 --- a/src/redis/driver_impl.go +++ b/src/redis/driver_impl.go @@ -132,10 +132,15 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisT // Otherwise, individual Do() calls are used with automatic write buffering via WriteFlushInterval. // pipelineLimit parameter is deprecated and ignored in radix v4. + // Warn if deprecated pipelineLimit is set + if pipelineLimit > 0 { + logger.Warnf("REDIS_PIPELINE_LIMIT=%d is deprecated and has no effect in radix v4. Write buffering is controlled solely by REDIS_PIPELINE_WINDOW.", pipelineLimit) + } + // Set WriteFlushInterval for automatic write buffering when not using explicit pipeline if !useExplicitPipeline && pipelineWindow > 0 { poolConfig.Dialer.WriteFlushInterval = pipelineWindow - logger.Debugf("Setting WriteFlushInterval to %v (pipelineLimit=%d is ignored in v4)", pipelineWindow, pipelineLimit) + logger.Debugf("Setting WriteFlushInterval to %v", pipelineWindow) } logger.Debugf("Use explicit pipeline: %v", useExplicitPipeline) From 1b13ba353c35b31aa24412cc5e8c4805a1665150 Mon Sep 17 00:00:00 2001 From: seonghyun Date: Fri, 26 Dec 2025 00:47:44 +0900 Subject: [PATCH 05/14] test: fix Redis cluster test config for radix v4 Replace deprecated RedisPipelineLimit with RedisPipelineWindow in configRedisCluster function. Radix v4 requires WriteFlushInterval (RedisPipelineWindow) for cluster mode buffering instead of the deprecated pipeline limit setting. Signed-off-by: seonghyun --- test/integration/integration_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index abc3d726d..5c0234d42 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -471,8 +471,9 @@ func configRedisCluster(s *settings.Settings) { s.RedisAuth = "password123" s.RedisPerSecondAuth = "password123" - s.RedisPerSecondPipelineLimit = 8 - s.RedisPipelineLimit = 8 + // RedisPipelineLimit is deprecated in radix v4, use RedisPipelineWindow instead + s.RedisPerSecondPipelineWindow = 150 * time.Microsecond + s.RedisPipelineWindow = 150 * time.Microsecond } func testBasicConfigWithoutWatchRootWithRedisCluster(perSecond bool, local_cache_size int) func(*testing.T) { From 4e52e43f8ad56f2a8d95cc36239d59d7ee296bd1 Mon Sep 17 00:00:00 2001 From: seonghyun Date: Fri, 26 Dec 2025 00:50:44 +0900 Subject: [PATCH 06/14] style: fix gofmt struct field alignment in settings.go Signed-off-by: seonghyun --- src/settings/settings.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/settings/settings.go b/src/settings/settings.go index 8365fb8a8..b7948dc04 100644 --- a/src/settings/settings.go +++ b/src/settings/settings.go @@ -148,14 +148,14 @@ type Settings struct { // RedisPipelineLimit is DEPRECATED and unused in radix v4. // This setting has no effect. Radix v4 does not support explicit pipeline size limits. // Write buffering is controlled solely by RedisPipelineWindow (WriteFlushInterval). - RedisPipelineLimit int `envconfig:"REDIS_PIPELINE_LIMIT" default:"0"` + RedisPipelineLimit int `envconfig:"REDIS_PIPELINE_LIMIT" default:"0"` // RedisUseExplicitPipeline controls whether to use explicit pipelining (radix.NewPipeline()). // When true, commands are batched using radix.NewPipeline() and sent together. // When false (default), individual commands are sent with automatic write buffering via WriteFlushInterval. // IMPORTANT: Explicit pipelining CANNOT be used with Redis Cluster mode. // For cluster mode, you MUST use automatic write buffering (set this to false and use RedisPipelineWindow). // Only set this to true for single/sentinel mode when you specifically need explicit pipeline control. - RedisUseExplicitPipeline bool `envconfig:"REDIS_USE_EXPLICIT_PIPELINE" default:"false"` + RedisUseExplicitPipeline bool `envconfig:"REDIS_USE_EXPLICIT_PIPELINE" default:"false"` RedisPerSecond bool `envconfig:"REDIS_PERSECOND" default:"false"` RedisPerSecondSocketType string `envconfig:"REDIS_PERSECOND_SOCKET_TYPE" default:"unix"` RedisPerSecondType string `envconfig:"REDIS_PERSECOND_TYPE" default:"SINGLE"` From 550ecc9d1462fbc46ff0c28f82d6c61990ab8c39 Mon Sep 17 00:00:00 2001 From: seonghyun Date: Sat, 27 Dec 2025 03:53:13 +0900 Subject: [PATCH 07/14] fix: fail fast on unsupported REDIS_POOL_ON_EMPTY_BEHAVIOR settings Radix v4 does not support CREATE or ERROR behaviors for REDIS_POOL_ON_EMPTY_BEHAVIOR. Previously, these settings were logged as errors but the application would continue with blocking behavior, which could cause unexpected issues in production. Changes: - Panic at startup when CREATE or ERROR is detected - Prevent silent behavior changes that could cause blocking - Update tests to verify panic behavior - Improve migration documentation in comments This ensures users are immediately notified of incompatible configuration rather than experiencing unexpected blocking in production. Signed-off-by: seonghyun --- src/redis/driver_impl.go | 27 +++++----- test/redis/driver_impl_test.go | 92 +++++++++++++++++----------------- 2 files changed, 59 insertions(+), 60 deletions(-) diff --git a/src/redis/driver_impl.go b/src/redis/driver_impl.go index ccb6fb51e..4b8f51c15 100644 --- a/src/redis/driver_impl.go +++ b/src/redis/driver_impl.go @@ -148,31 +148,32 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisT // IMPORTANT: radix v4 pool behavior changes from v3 // // v4 uses a FIXED pool size and BLOCKS when all connections are in use. - // This is similar to v3's WAIT behavior, NOT CREATE or ERROR behavior. + // This is the same as v3's WAIT behavior. // - // Key differences: - // - v3 WAIT → v4 default (same: blocks until connection available) - // - v3 CREATE → v4 INCOMPATIBLE (v4 does NOT create overflow connections, only blocks) - // - v3 ERROR → v4 INCOMPATIBLE (v4 does NOT fail-fast, only blocks) + // v3 CREATE and ERROR behaviors are NOT supported in v4: + // - v3 WAIT → v4 supported (blocks until connection available) + // - v3 CREATE → v4 NOT SUPPORTED (would block instead of creating overflow connections) + // - v3 ERROR → v4 NOT SUPPORTED (would block instead of failing fast) // - // Migration recommendations: - // - Remove REDIS_POOL_ON_EMPTY_BEHAVIOR setting (it has no effect in v4) + // Migration requirements: + // - Remove REDIS_POOL_ON_EMPTY_BEHAVIOR setting if set to CREATE or ERROR + // - Use WAIT or leave unset (WAIT is default) + // - Consider increasing REDIS_POOL_SIZE if you previously relied on CREATE // - Use context timeouts to prevent indefinite blocking: // ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) // defer cancel() // client.Do(ctx, cmd) - // - Consider increasing REDIS_POOL_SIZE if using CREATE or ERROR previously switch strings.ToUpper(poolOnEmptyBehavior) { case "WAIT": - logger.Warnf("Redis pool %s: WAIT is default in v4 (blocks until connection available)", maskedUrl) + logger.Warnf("Redis pool %s: WAIT is default in radix v4 (blocks until connection available)", maskedUrl) case "CREATE": // v3 CREATE created overflow connections when pool was full - // v4 does NOT support this - it will block instead - logger.Errorf("Redis pool %s: CREATE not supported in v4. Pool size=%d is fixed, requests will block. Use context timeouts!", maskedUrl, poolSize) + // v4 does NOT support this - fail fast to prevent unexpected blocking behavior + panic(RedisError("REDIS_POOL_ON_EMPTY_BEHAVIOR=CREATE is not supported in radix v4. Pool will block instead of creating overflow connections. Remove this setting or set to WAIT, and consider increasing REDIS_POOL_SIZE.")) case "ERROR": // v3 ERROR failed fast when pool was full - // v4 does NOT support this - it will block instead, which could cause goroutine buildup - logger.Errorf("Redis pool %s: ERROR not supported in v4. Requests will block instead of failing. Use context timeouts!", maskedUrl) + // v4 does NOT support this - fail fast to prevent unexpected blocking behavior + panic(RedisError("REDIS_POOL_ON_EMPTY_BEHAVIOR=ERROR is not supported in radix v4. Pool will block instead of failing fast. Remove this setting or set to WAIT, and use context timeouts for fail-fast behavior.")) default: logger.Warnf("Redis pool %s: using v4 default (fixed size=%d, blocks when full)", maskedUrl, poolSize) } diff --git a/test/redis/driver_impl_test.go b/test/redis/driver_impl_test.go index db6532ba7..c40c8b64d 100644 --- a/test/redis/driver_impl_test.go +++ b/test/redis/driver_impl_test.go @@ -261,72 +261,48 @@ func TestPoolOnEmptyBehavior(t *testing.T) { assert.Equal(t, "bar", res) }) - t.Run("ERROR behavior", func(t *testing.T) { + t.Run("ERROR behavior should panic", func(t *testing.T) { redisSrv := mustNewRedisServer() defer redisSrv.Close() - var client redis.Client - assert.NotPanics(t, func() { - client = mkRedisClientWithBehavior(redisSrv.Addr(), "ERROR", 0) + // radix v4 does not support ERROR behavior - should panic at startup + panicErr := expectPanicError(t, func() { + mkRedisClientWithBehavior(redisSrv.Addr(), "ERROR", 0) }) - assert.NotNil(t, client) - - // Verify client works - var res string - assert.Nil(t, client.DoCmd(nil, "SET", "test", "value")) - assert.Nil(t, client.DoCmd(&res, "GET", "test")) - assert.Equal(t, "value", res) + assert.Contains(t, panicErr.Error(), "REDIS_POOL_ON_EMPTY_BEHAVIOR=ERROR is not supported in radix v4") }) - t.Run("ERROR behavior with wait duration", func(t *testing.T) { + t.Run("ERROR behavior with wait duration should panic", func(t *testing.T) { redisSrv := mustNewRedisServer() defer redisSrv.Close() - var client redis.Client - assert.NotPanics(t, func() { - client = mkRedisClientWithBehavior(redisSrv.Addr(), "ERROR", 100*time.Millisecond) + // radix v4 does not support ERROR behavior - should panic at startup + panicErr := expectPanicError(t, func() { + mkRedisClientWithBehavior(redisSrv.Addr(), "ERROR", 100*time.Millisecond) }) - assert.NotNil(t, client) - - // Verify client works - var res string - assert.Nil(t, client.DoCmd(nil, "SET", "test2", "value2")) - assert.Nil(t, client.DoCmd(&res, "GET", "test2")) - assert.Equal(t, "value2", res) + assert.Contains(t, panicErr.Error(), "REDIS_POOL_ON_EMPTY_BEHAVIOR=ERROR is not supported in radix v4") }) - t.Run("CREATE behavior", func(t *testing.T) { + t.Run("CREATE behavior should panic", func(t *testing.T) { redisSrv := mustNewRedisServer() defer redisSrv.Close() - var client redis.Client - assert.NotPanics(t, func() { - client = mkRedisClientWithBehavior(redisSrv.Addr(), "CREATE", 0) + // radix v4 does not support CREATE behavior - should panic at startup + panicErr := expectPanicError(t, func() { + mkRedisClientWithBehavior(redisSrv.Addr(), "CREATE", 0) }) - assert.NotNil(t, client) - - // Verify client works - var res string - assert.Nil(t, client.DoCmd(nil, "SET", "test3", "value3")) - assert.Nil(t, client.DoCmd(&res, "GET", "test3")) - assert.Equal(t, "value3", res) + assert.Contains(t, panicErr.Error(), "REDIS_POOL_ON_EMPTY_BEHAVIOR=CREATE is not supported in radix v4") }) - t.Run("CREATE behavior with wait duration", func(t *testing.T) { + t.Run("CREATE behavior with wait duration should panic", func(t *testing.T) { redisSrv := mustNewRedisServer() defer redisSrv.Close() - var client redis.Client - assert.NotPanics(t, func() { - client = mkRedisClientWithBehavior(redisSrv.Addr(), "CREATE", 500*time.Millisecond) + // radix v4 does not support CREATE behavior - should panic at startup + panicErr := expectPanicError(t, func() { + mkRedisClientWithBehavior(redisSrv.Addr(), "CREATE", 500*time.Millisecond) }) - assert.NotNil(t, client) - - // Verify client works - var res string - assert.Nil(t, client.DoCmd(nil, "SET", "test4", "value4")) - assert.Nil(t, client.DoCmd(&res, "GET", "test4")) - assert.Equal(t, "value4", res) + assert.Contains(t, panicErr.Error(), "REDIS_POOL_ON_EMPTY_BEHAVIOR=CREATE is not supported in radix v4") }) t.Run("WAIT behavior", func(t *testing.T) { @@ -346,14 +322,36 @@ func TestPoolOnEmptyBehavior(t *testing.T) { assert.Equal(t, "value5", res) }) - t.Run("case insensitive behavior", func(t *testing.T) { + t.Run("case insensitive behavior - lowercase 'error' panics", func(t *testing.T) { + redisSrv := mustNewRedisServer() + defer redisSrv.Close() + + // Test that lowercase 'error' is treated same as 'ERROR' (case insensitive) + panicErr := expectPanicError(t, func() { + mkRedisClientWithBehavior(redisSrv.Addr(), "error", 0) + }) + assert.Contains(t, panicErr.Error(), "REDIS_POOL_ON_EMPTY_BEHAVIOR=ERROR is not supported in radix v4") + }) + + t.Run("case insensitive behavior - lowercase 'create' panics", func(t *testing.T) { + redisSrv := mustNewRedisServer() + defer redisSrv.Close() + + // Test that lowercase 'create' is treated same as 'CREATE' (case insensitive) + panicErr := expectPanicError(t, func() { + mkRedisClientWithBehavior(redisSrv.Addr(), "create", 0) + }) + assert.Contains(t, panicErr.Error(), "REDIS_POOL_ON_EMPTY_BEHAVIOR=CREATE is not supported in radix v4") + }) + + t.Run("case insensitive behavior - lowercase 'wait' works", func(t *testing.T) { redisSrv := mustNewRedisServer() defer redisSrv.Close() - // Test lowercase + // Test that lowercase 'wait' is treated same as 'WAIT' (case insensitive) var client redis.Client assert.NotPanics(t, func() { - client = mkRedisClientWithBehavior(redisSrv.Addr(), "error", 0) + client = mkRedisClientWithBehavior(redisSrv.Addr(), "wait", 0) }) assert.NotNil(t, client) From 2d39944c230079c26fd05757b9bc400b06f5521e Mon Sep 17 00:00:00 2001 From: seonghyun Date: Sat, 27 Dec 2025 04:42:26 +0900 Subject: [PATCH 08/14] fix: change REDIS_POOL_ON_EMPTY_BEHAVIOR default to WAIT The default value 'CREATE' is not supported in radix v4 and causes integration tests to panic at startup. Changed default to 'WAIT' which matches radix v4's actual pool behavior (always blocks when empty). This fixes integration test failures where tests without explicit REDIS_POOL_ON_EMPTY_BEHAVIOR settings would panic during initialization with: "REDIS_POOL_ON_EMPTY_BEHAVIOR=CREATE is not supported in radix v4" Also updated documentation to clarify that CREATE/ERROR are not supported and marked RedisPoolOnEmptyWaitDuration as deprecated. Signed-off-by: seonghyun --- src/settings/settings.go | 19 ++++++++++--------- src/settings/settings_test.go | 4 ++-- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/settings/settings.go b/src/settings/settings.go index b7948dc04..3f8e0cdde 100644 --- a/src/settings/settings.go +++ b/src/settings/settings.go @@ -188,20 +188,21 @@ type Settings struct { RedisPerSecondTimeout time.Duration `envconfig:"REDIS_PERSECOND_TIMEOUT" default:"10s"` // RedisPoolOnEmptyBehavior controls what happens when Redis connection pool is empty. - // This setting helps prevent connection storms during Redis failures. + // NOTE: In radix v4, the pool ALWAYS blocks when empty (WAIT behavior). // Possible values: - // - "CREATE": Create a new connection after RedisPoolOnEmptyWaitDuration (default) - // - "ERROR": Return error after RedisPoolOnEmptyWaitDuration - // - "WAIT": Block until a connection is available - RedisPoolOnEmptyBehavior string `envconfig:"REDIS_POOL_ON_EMPTY_BEHAVIOR" default:"CREATE"` - // RedisPoolOnEmptyWaitDuration is the wait duration before taking action when pool is empty. - // Only applicable when RedisPoolOnEmptyBehavior is "CREATE" or "ERROR". + // - "WAIT": Block until a connection is available (default, radix v4 behavior) + // - "CREATE": NOT SUPPORTED in radix v4 - will cause panic at startup + // - "ERROR": NOT SUPPORTED in radix v4 - will cause panic at startup + // For fail-fast behavior, use context timeouts when calling Redis operations. + RedisPoolOnEmptyBehavior string `envconfig:"REDIS_POOL_ON_EMPTY_BEHAVIOR" default:"WAIT"` + // RedisPoolOnEmptyWaitDuration is DEPRECATED in radix v4. + // This setting has no effect as radix v4 always blocks until a connection is available. RedisPoolOnEmptyWaitDuration time.Duration `envconfig:"REDIS_POOL_ON_EMPTY_WAIT_DURATION" default:"1s"` // RedisPerSecondPoolOnEmptyBehavior controls pool-empty behavior for per-second Redis. // See RedisPoolOnEmptyBehavior for possible values and details. - RedisPerSecondPoolOnEmptyBehavior string `envconfig:"REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR" default:"CREATE"` - // RedisPerSecondPoolOnEmptyWaitDuration is the wait duration for per-second Redis pool. + RedisPerSecondPoolOnEmptyBehavior string `envconfig:"REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR" default:"WAIT"` + // RedisPerSecondPoolOnEmptyWaitDuration is DEPRECATED in radix v4. // See RedisPoolOnEmptyWaitDuration for details. RedisPerSecondPoolOnEmptyWaitDuration time.Duration `envconfig:"REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION" default:"1s"` diff --git a/src/settings/settings_test.go b/src/settings/settings_test.go index e706196ee..f00ba421b 100644 --- a/src/settings/settings_test.go +++ b/src/settings/settings_test.go @@ -21,7 +21,7 @@ func TestRedisPoolOnEmptyBehavior_Default(t *testing.T) { settings := NewSettings() - assert.Equal(t, "CREATE", settings.RedisPoolOnEmptyBehavior) + assert.Equal(t, "WAIT", settings.RedisPoolOnEmptyBehavior) assert.Equal(t, 1*time.Second, settings.RedisPoolOnEmptyWaitDuration) } @@ -88,7 +88,7 @@ func TestRedisPerSecondPoolOnEmptyBehavior_Default(t *testing.T) { settings := NewSettings() - assert.Equal(t, "CREATE", settings.RedisPerSecondPoolOnEmptyBehavior) + assert.Equal(t, "WAIT", settings.RedisPerSecondPoolOnEmptyBehavior) assert.Equal(t, 1*time.Second, settings.RedisPerSecondPoolOnEmptyWaitDuration) } From bd53ae15a508c5cd19922517176198173f83ecc5 Mon Sep 17 00:00:00 2001 From: seonghyun Date: Sat, 27 Dec 2025 13:21:17 +0900 Subject: [PATCH 09/14] test: fix cluster connection timeout and context bug - Fix WaitForTcpPort to use timeoutCtx instead of ctx This ensures the timeout parameter is actually respected when dialing TCP connections. - Increase gRPC server startup timeout from 1s to 10s Radix v4 cluster connection initialization takes longer, especially when establishing connections to multiple cluster nodes. This prevents "connection refused" errors in integration tests. Signed-off-by: seonghyun --- test/common/common.go | 2 +- test/integration/integration_test.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/test/common/common.go b/test/common/common.go index 0eaaa0598..6a414ca09 100644 --- a/test/common/common.go +++ b/test/common/common.go @@ -105,7 +105,7 @@ func WaitForTcpPort(ctx context.Context, port int, timeout time.Duration) error // Wait up to 1s for the redis instance to start accepting connections. for { var d net.Dialer - conn, err := d.DialContext(ctx, "tcp", "localhost:"+strconv.Itoa(port)) + conn, err := d.DialContext(timeoutCtx, "tcp", "localhost:"+strconv.Itoa(port)) if err == nil { conn.Close() // TCP connections are working. All is well. diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 5c0234d42..df5e6fe6d 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -797,7 +797,8 @@ func startTestRunner(t *testing.T, s settings.Settings) *runner.Runner { }() // HACK: Wait for the server to come up. Make a hook that we can wait on. - common.WaitForTcpPort(context.Background(), s.GrpcPort, 1*time.Second) + // Increased timeout from 1s to 10s to allow for Redis cluster connection initialization + common.WaitForTcpPort(context.Background(), s.GrpcPort, 10*time.Second) return &runner } From 08d3ea16484facfecbdcde5c0d562d2af6595e31 Mon Sep 17 00:00:00 2001 From: seonghyun Date: Sat, 27 Dec 2025 13:39:16 +0900 Subject: [PATCH 10/14] refactor: extract dialer creation logic to reduce duplication Consolidates Redis and Sentinel dialer setup into a reusable createDialer helper function, eliminating ~30 lines of duplicated code. Improves logging by including connection target details (e.g., "sentinel(master,host1,host2)") instead of generic "sentinel" string. Signed-off-by: seonghyun --- src/redis/driver_impl.go | 70 +++++++++++++++------------------------- 1 file changed, 26 insertions(+), 44 deletions(-) diff --git a/src/redis/driver_impl.go b/src/redis/driver_impl.go index 4b8f51c15..0bcd57644 100644 --- a/src/redis/driver_impl.go +++ b/src/redis/driver_impl.go @@ -79,15 +79,9 @@ func checkError(err error) { } } -func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisType, url string, poolSize int, - pipelineWindow time.Duration, pipelineLimit int, tlsConfig *tls.Config, healthCheckActiveConnection bool, srv server.Server, - timeout time.Duration, poolOnEmptyBehavior string, poolOnEmptyWaitDuration time.Duration, sentinelAuth string, - useExplicitPipeline bool, -) Client { - maskedUrl := utils.MaskCredentialsInUrl(url) - logger.Warnf("connecting to redis on %s with pool size %d", maskedUrl, poolSize) - - // Create Dialer for connecting to Redis +// createDialer creates a radix.Dialer with timeout, TLS, and auth configuration +// targetName is used for logging to identify the connection target (e.g., URL, "sentinel(url)") +func createDialer(timeout time.Duration, useTls bool, tlsConfig *tls.Config, auth string, targetName string) radix.Dialer { var netDialer net.Dialer if timeout > 0 { netDialer.Timeout = timeout @@ -104,20 +98,38 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisT Config: tlsConfig, } dialer.NetDialer = &tlsNetDialer + if targetName != "" { + logger.Warnf("enabling TLS to redis %s", targetName) + } } + // Setup auth if provided if auth != "" { user, pass, found := strings.Cut(auth, ":") if found { - logger.Warnf("enabling authentication to redis on %s with user %s", maskedUrl, user) + logger.Warnf("enabling authentication to redis %s with user %s", targetName, user) dialer.AuthUser = user dialer.AuthPass = pass } else { - logger.Warnf("enabling authentication to redis on %s without user", maskedUrl) + logger.Warnf("enabling authentication to redis %s without user", targetName) dialer.AuthPass = auth } } + return dialer +} + +func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisType, url string, poolSize int, + pipelineWindow time.Duration, pipelineLimit int, tlsConfig *tls.Config, healthCheckActiveConnection bool, srv server.Server, + timeout time.Duration, poolOnEmptyBehavior string, poolOnEmptyWaitDuration time.Duration, sentinelAuth string, + useExplicitPipeline bool, +) Client { + maskedUrl := utils.MaskCredentialsInUrl(url) + logger.Warnf("connecting to redis on %s with pool size %d", maskedUrl, poolSize) + + // Create Dialer for connecting to Redis + dialer := createDialer(timeout, useTls, tlsConfig, auth, maskedUrl) + stats := newPoolStats(scope) // Create PoolConfig @@ -206,39 +218,9 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisT panic(RedisError("Expected master name and a list of urls for the sentinels, in the format: ,,...,")) } - // Create sentinel dialer - var sentinelNetDialer net.Dialer - if timeout > 0 { - sentinelNetDialer.Timeout = timeout - } - - sentinelDialer := radix.Dialer{ - NetDialer: &sentinelNetDialer, - } - - // Setup TLS for sentinel if needed - if useTls { - logger.Warnf("enabling TLS to redis sentinel") - tlsSentinelDialer := tls.Dialer{ - NetDialer: &sentinelNetDialer, - Config: tlsConfig, - } - sentinelDialer.NetDialer = &tlsSentinelDialer - } - - // Use sentinelAuth for authenticating to Sentinel nodes, not auth - // auth is used for Redis master/replica authentication - if sentinelAuth != "" { - user, pass, found := strings.Cut(sentinelAuth, ":") - if found { - logger.Warnf("enabling authentication to redis sentinel with user %s", user) - sentinelDialer.AuthUser = user - sentinelDialer.AuthPass = pass - } else { - logger.Warnf("enabling authentication to redis sentinel without user") - sentinelDialer.AuthPass = sentinelAuth - } - } + // Create sentinel dialer (may use different auth from Redis master/replica) + // sentinelAuth is for Sentinel nodes, auth is for Redis master/replica + sentinelDialer := createDialer(timeout, useTls, tlsConfig, sentinelAuth, fmt.Sprintf("sentinel(%s)", maskedUrl)) sentinelConfig := radix.SentinelConfig{ PoolConfig: poolConfig, From 26c2a990652b37cd064a48bcdd20bfe4a6526473 Mon Sep 17 00:00:00 2001 From: seonghyun Date: Sat, 27 Dec 2025 13:56:44 +0900 Subject: [PATCH 11/14] refactor: remove deprecated REDIS_POOL_ON_EMPTY_WAIT_DURATION settings Remove the deprecated poolOnEmptyWaitDuration parameter and related configuration settings as they have no effect in radix v4. The pool always blocks until a connection is available when using WAIT behavior. Signed-off-by: seonghyun --- src/redis/cache_impl.go | 4 +-- src/redis/driver_impl.go | 2 +- src/settings/settings.go | 6 ---- src/settings/settings_test.go | 32 ---------------------- test/redis/bench_test.go | 2 +- test/redis/driver_impl_test.go | 50 ++++++++++------------------------ 6 files changed, 18 insertions(+), 78 deletions(-) diff --git a/src/redis/cache_impl.go b/src/redis/cache_impl.go index 7812b3eb7..88f3252ef 100644 --- a/src/redis/cache_impl.go +++ b/src/redis/cache_impl.go @@ -19,13 +19,13 @@ func NewRateLimiterCacheImplFromSettings(s settings.Settings, localCache *freeca if s.RedisPerSecond { perSecondPool = NewClientImpl(srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondTls, s.RedisPerSecondAuth, s.RedisPerSecondSocketType, s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisPerSecondTimeout, - s.RedisPerSecondPoolOnEmptyBehavior, s.RedisPerSecondPoolOnEmptyWaitDuration, s.RedisPerSecondSentinelAuth, s.RedisPerSecondUseExplicitPipeline) + s.RedisPerSecondPoolOnEmptyBehavior, s.RedisPerSecondSentinelAuth, s.RedisPerSecondUseExplicitPipeline) closer.Closers = append(closer.Closers, perSecondPool) } otherPool := NewClientImpl(srv.Scope().Scope("redis_pool"), s.RedisTls, s.RedisAuth, s.RedisSocketType, s.RedisType, s.RedisUrl, s.RedisPoolSize, s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisTimeout, - s.RedisPoolOnEmptyBehavior, s.RedisPoolOnEmptyWaitDuration, s.RedisSentinelAuth, s.RedisUseExplicitPipeline) + s.RedisPoolOnEmptyBehavior, s.RedisSentinelAuth, s.RedisUseExplicitPipeline) closer.Closers = append(closer.Closers, otherPool) return NewFixedRateLimitCacheImpl( diff --git a/src/redis/driver_impl.go b/src/redis/driver_impl.go index 0bcd57644..1ed7350ab 100644 --- a/src/redis/driver_impl.go +++ b/src/redis/driver_impl.go @@ -121,7 +121,7 @@ func createDialer(timeout time.Duration, useTls bool, tlsConfig *tls.Config, aut func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisType, url string, poolSize int, pipelineWindow time.Duration, pipelineLimit int, tlsConfig *tls.Config, healthCheckActiveConnection bool, srv server.Server, - timeout time.Duration, poolOnEmptyBehavior string, poolOnEmptyWaitDuration time.Duration, sentinelAuth string, + timeout time.Duration, poolOnEmptyBehavior string, sentinelAuth string, useExplicitPipeline bool, ) Client { maskedUrl := utils.MaskCredentialsInUrl(url) diff --git a/src/settings/settings.go b/src/settings/settings.go index 3f8e0cdde..6a0af70f9 100644 --- a/src/settings/settings.go +++ b/src/settings/settings.go @@ -195,16 +195,10 @@ type Settings struct { // - "ERROR": NOT SUPPORTED in radix v4 - will cause panic at startup // For fail-fast behavior, use context timeouts when calling Redis operations. RedisPoolOnEmptyBehavior string `envconfig:"REDIS_POOL_ON_EMPTY_BEHAVIOR" default:"WAIT"` - // RedisPoolOnEmptyWaitDuration is DEPRECATED in radix v4. - // This setting has no effect as radix v4 always blocks until a connection is available. - RedisPoolOnEmptyWaitDuration time.Duration `envconfig:"REDIS_POOL_ON_EMPTY_WAIT_DURATION" default:"1s"` // RedisPerSecondPoolOnEmptyBehavior controls pool-empty behavior for per-second Redis. // See RedisPoolOnEmptyBehavior for possible values and details. RedisPerSecondPoolOnEmptyBehavior string `envconfig:"REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR" default:"WAIT"` - // RedisPerSecondPoolOnEmptyWaitDuration is DEPRECATED in radix v4. - // See RedisPoolOnEmptyWaitDuration for details. - RedisPerSecondPoolOnEmptyWaitDuration time.Duration `envconfig:"REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION" default:"1s"` // Memcache settings MemcacheHostPort []string `envconfig:"MEMCACHE_HOST_PORT" default:""` diff --git a/src/settings/settings_test.go b/src/settings/settings_test.go index f00ba421b..b2c7bc7eb 100644 --- a/src/settings/settings_test.go +++ b/src/settings/settings_test.go @@ -3,7 +3,6 @@ package settings import ( "os" "testing" - "time" "github.com/stretchr/testify/assert" ) @@ -17,48 +16,28 @@ func TestSettingsTlsConfigUnmodified(t *testing.T) { // Tests for RedisPoolOnEmptyBehavior func TestRedisPoolOnEmptyBehavior_Default(t *testing.T) { os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR") - os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION") settings := NewSettings() assert.Equal(t, "WAIT", settings.RedisPoolOnEmptyBehavior) - assert.Equal(t, 1*time.Second, settings.RedisPoolOnEmptyWaitDuration) } func TestRedisPoolOnEmptyBehavior_Error(t *testing.T) { os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "ERROR") - os.Setenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION", "0") defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR") - defer os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION") settings := NewSettings() assert.Equal(t, "ERROR", settings.RedisPoolOnEmptyBehavior) - assert.Equal(t, time.Duration(0), settings.RedisPoolOnEmptyWaitDuration) -} - -func TestRedisPoolOnEmptyBehavior_ErrorWithDuration(t *testing.T) { - os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "ERROR") - os.Setenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION", "100ms") - defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR") - defer os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION") - - settings := NewSettings() - - assert.Equal(t, "ERROR", settings.RedisPoolOnEmptyBehavior) - assert.Equal(t, 100*time.Millisecond, settings.RedisPoolOnEmptyWaitDuration) } func TestRedisPoolOnEmptyBehavior_Create(t *testing.T) { os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "CREATE") - os.Setenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION", "500ms") defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR") - defer os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION") settings := NewSettings() assert.Equal(t, "CREATE", settings.RedisPoolOnEmptyBehavior) - assert.Equal(t, 500*time.Millisecond, settings.RedisPoolOnEmptyWaitDuration) } func TestRedisPoolOnEmptyBehavior_Wait(t *testing.T) { @@ -84,44 +63,33 @@ func TestRedisPoolOnEmptyBehavior_CaseInsensitive(t *testing.T) { // Tests for RedisPerSecondPoolOnEmptyBehavior func TestRedisPerSecondPoolOnEmptyBehavior_Default(t *testing.T) { os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR") - os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION") settings := NewSettings() assert.Equal(t, "WAIT", settings.RedisPerSecondPoolOnEmptyBehavior) - assert.Equal(t, 1*time.Second, settings.RedisPerSecondPoolOnEmptyWaitDuration) } func TestRedisPerSecondPoolOnEmptyBehavior_Error(t *testing.T) { os.Setenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR", "ERROR") - os.Setenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION", "50ms") defer os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR") - defer os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION") settings := NewSettings() assert.Equal(t, "ERROR", settings.RedisPerSecondPoolOnEmptyBehavior) - assert.Equal(t, 50*time.Millisecond, settings.RedisPerSecondPoolOnEmptyWaitDuration) } // Test both pools can be configured independently func TestRedisPoolOnEmptyBehavior_IndependentConfiguration(t *testing.T) { os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "ERROR") - os.Setenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION", "0") os.Setenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR", "CREATE") - os.Setenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION", "100ms") defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR") - defer os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION") defer os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR") - defer os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION") settings := NewSettings() // Main pool configured for fail-fast assert.Equal(t, "ERROR", settings.RedisPoolOnEmptyBehavior) - assert.Equal(t, time.Duration(0), settings.RedisPoolOnEmptyWaitDuration) // Per-second pool configured differently assert.Equal(t, "CREATE", settings.RedisPerSecondPoolOnEmptyBehavior) - assert.Equal(t, 100*time.Millisecond, settings.RedisPerSecondPoolOnEmptyWaitDuration) } diff --git a/test/redis/bench_test.go b/test/redis/bench_test.go index b55474c94..e13266093 100644 --- a/test/redis/bench_test.go +++ b/test/redis/bench_test.go @@ -44,7 +44,7 @@ func BenchmarkParallelDoLimit(b *testing.B) { return func(b *testing.B) { statsStore := gostats.NewStore(gostats.NewNullSink(), false) sm := stats.NewMockStatManager(statsStore) - client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", 0, "", false) + client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", "", false) defer client.Close() cache := redis.NewFixedRateLimitCacheImpl(client, nil, utils.NewTimeSourceImpl(), rand.New(utils.NewLockedSource(time.Now().Unix())), 10, nil, 0.8, "", sm, true) diff --git a/test/redis/driver_impl_test.go b/test/redis/driver_impl_test.go index c40c8b64d..31ca68c4f 100644 --- a/test/redis/driver_impl_test.go +++ b/test/redis/driver_impl_test.go @@ -39,7 +39,7 @@ func testNewClientImpl(t *testing.T, pipelineWindow time.Duration, pipelineLimit statsStore := stats.NewStore(stats.NewNullSink(), false) mkRedisClient := func(auth, addr string) redis.Client { - return redis.NewClientImpl(statsStore, false, auth, "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", 0, "", useExplicitPipeline) + return redis.NewClientImpl(statsStore, false, auth, "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", "", useExplicitPipeline) } t.Run("connection refused", func(t *testing.T) { @@ -128,7 +128,7 @@ func TestDoCmd(t *testing.T) { statsStore := stats.NewStore(stats.NewNullSink(), false) mkRedisClient := func(addr string) redis.Client { - return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second, "", 0, "", false) + return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second, "", "", false) } t.Run("SETGET ok", func(t *testing.T) { @@ -173,7 +173,7 @@ func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int, u statsStore := stats.NewStore(stats.NewNullSink(), false) mkRedisClient := func(addr string) redis.Client { - return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", 0, "", useExplicitPipeline) + return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", "", useExplicitPipeline) } t.Run("SETGET ok", func(t *testing.T) { @@ -240,8 +240,8 @@ func TestPoolOnEmptyBehavior(t *testing.T) { statsStore := stats.NewStore(stats.NewNullSink(), false) // Helper to create client with specific on-empty behavior - mkRedisClientWithBehavior := func(addr, behavior string, waitDuration time.Duration) redis.Client { - return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second, behavior, waitDuration, "", false) + mkRedisClientWithBehavior := func(addr, behavior string) redis.Client { + return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second, behavior, "", false) } t.Run("default behavior (empty string)", func(t *testing.T) { @@ -250,7 +250,7 @@ func TestPoolOnEmptyBehavior(t *testing.T) { var client redis.Client assert.NotPanics(t, func() { - client = mkRedisClientWithBehavior(redisSrv.Addr(), "", 0) + client = mkRedisClientWithBehavior(redisSrv.Addr(), "") }) assert.NotNil(t, client) @@ -267,18 +267,7 @@ func TestPoolOnEmptyBehavior(t *testing.T) { // radix v4 does not support ERROR behavior - should panic at startup panicErr := expectPanicError(t, func() { - mkRedisClientWithBehavior(redisSrv.Addr(), "ERROR", 0) - }) - assert.Contains(t, panicErr.Error(), "REDIS_POOL_ON_EMPTY_BEHAVIOR=ERROR is not supported in radix v4") - }) - - t.Run("ERROR behavior with wait duration should panic", func(t *testing.T) { - redisSrv := mustNewRedisServer() - defer redisSrv.Close() - - // radix v4 does not support ERROR behavior - should panic at startup - panicErr := expectPanicError(t, func() { - mkRedisClientWithBehavior(redisSrv.Addr(), "ERROR", 100*time.Millisecond) + mkRedisClientWithBehavior(redisSrv.Addr(), "ERROR") }) assert.Contains(t, panicErr.Error(), "REDIS_POOL_ON_EMPTY_BEHAVIOR=ERROR is not supported in radix v4") }) @@ -289,18 +278,7 @@ func TestPoolOnEmptyBehavior(t *testing.T) { // radix v4 does not support CREATE behavior - should panic at startup panicErr := expectPanicError(t, func() { - mkRedisClientWithBehavior(redisSrv.Addr(), "CREATE", 0) - }) - assert.Contains(t, panicErr.Error(), "REDIS_POOL_ON_EMPTY_BEHAVIOR=CREATE is not supported in radix v4") - }) - - t.Run("CREATE behavior with wait duration should panic", func(t *testing.T) { - redisSrv := mustNewRedisServer() - defer redisSrv.Close() - - // radix v4 does not support CREATE behavior - should panic at startup - panicErr := expectPanicError(t, func() { - mkRedisClientWithBehavior(redisSrv.Addr(), "CREATE", 500*time.Millisecond) + mkRedisClientWithBehavior(redisSrv.Addr(), "CREATE") }) assert.Contains(t, panicErr.Error(), "REDIS_POOL_ON_EMPTY_BEHAVIOR=CREATE is not supported in radix v4") }) @@ -311,7 +289,7 @@ func TestPoolOnEmptyBehavior(t *testing.T) { var client redis.Client assert.NotPanics(t, func() { - client = mkRedisClientWithBehavior(redisSrv.Addr(), "WAIT", 0) + client = mkRedisClientWithBehavior(redisSrv.Addr(), "WAIT") }) assert.NotNil(t, client) @@ -328,7 +306,7 @@ func TestPoolOnEmptyBehavior(t *testing.T) { // Test that lowercase 'error' is treated same as 'ERROR' (case insensitive) panicErr := expectPanicError(t, func() { - mkRedisClientWithBehavior(redisSrv.Addr(), "error", 0) + mkRedisClientWithBehavior(redisSrv.Addr(), "error") }) assert.Contains(t, panicErr.Error(), "REDIS_POOL_ON_EMPTY_BEHAVIOR=ERROR is not supported in radix v4") }) @@ -339,7 +317,7 @@ func TestPoolOnEmptyBehavior(t *testing.T) { // Test that lowercase 'create' is treated same as 'CREATE' (case insensitive) panicErr := expectPanicError(t, func() { - mkRedisClientWithBehavior(redisSrv.Addr(), "create", 0) + mkRedisClientWithBehavior(redisSrv.Addr(), "create") }) assert.Contains(t, panicErr.Error(), "REDIS_POOL_ON_EMPTY_BEHAVIOR=CREATE is not supported in radix v4") }) @@ -351,7 +329,7 @@ func TestPoolOnEmptyBehavior(t *testing.T) { // Test that lowercase 'wait' is treated same as 'WAIT' (case insensitive) var client redis.Client assert.NotPanics(t, func() { - client = mkRedisClientWithBehavior(redisSrv.Addr(), "wait", 0) + client = mkRedisClientWithBehavior(redisSrv.Addr(), "wait") }) assert.NotNil(t, client) @@ -369,7 +347,7 @@ func TestPoolOnEmptyBehavior(t *testing.T) { // Unknown behavior should not panic, just log warning and use default var client redis.Client assert.NotPanics(t, func() { - client = mkRedisClientWithBehavior(redisSrv.Addr(), "UNKNOWN_BEHAVIOR", 0) + client = mkRedisClientWithBehavior(redisSrv.Addr(), "UNKNOWN_BEHAVIOR") }) assert.NotNil(t, client) @@ -387,7 +365,7 @@ func TestNewClientImplSentinel(t *testing.T) { mkSentinelClient := func(auth, sentinelAuth, url string, useTls bool, timeout time.Duration) redis.Client { // Pass nil for tlsConfig - we can't test TLS without a real TLS server, // but we can verify the code path is executed (logs will show TLS is enabled) - return redis.NewClientImpl(statsStore, useTls, auth, "tcp", "sentinel", url, 1, 0, 0, nil, false, nil, timeout, "", 0, sentinelAuth, false) + return redis.NewClientImpl(statsStore, useTls, auth, "tcp", "sentinel", url, 1, 0, 0, nil, false, nil, timeout, "", sentinelAuth, false) } t.Run("invalid url format - missing sentinel addresses", func(t *testing.T) { From dbd72e1ae423d3d1e750d7ff4b12e7cf907d5ec1 Mon Sep 17 00:00:00 2001 From: seonghyun Date: Sat, 27 Dec 2025 14:59:02 +0900 Subject: [PATCH 12/14] refactor: auto-select pipeline mode based on Redis type Remove REDIS_USE_EXPLICIT_PIPELINE configuration option and automatically determine pipeline mode based on Redis deployment type: - Cluster mode: uses grouped pipeline (groups same-key commands) - INCRBY + EXPIRE for same key are pipelined together (same slot) - Reduces round-trips from 2 to 1 per key in cluster mode - Single/Sentinel mode: uses explicit pipeline (batches all commands) - All commands in one pipeline for minimal latency - Optimal for non-cluster deployments This simplifies configuration by removing user-facing options while automatically choosing the optimal pipeline strategy for each Redis type. Breaking changes: - Remove REDIS_USE_EXPLICIT_PIPELINE env var - Remove REDIS_PERSECOND_USE_EXPLICIT_PIPELINE env var - Remove UseExplicitPipeline() interface method Signed-off-by: seonghyun --- src/redis/cache_impl.go | 4 +- src/redis/driver.go | 12 ++-- src/redis/driver_impl.go | 102 +++++++++++++++++----------- src/settings/settings.go | 14 +--- test/mocks/redis/redis.go | 14 ---- test/redis/bench_test.go | 2 +- test/redis/driver_impl_test.go | 31 +++------ test/redis/fixed_cache_impl_test.go | 5 +- 8 files changed, 90 insertions(+), 94 deletions(-) diff --git a/src/redis/cache_impl.go b/src/redis/cache_impl.go index 88f3252ef..21db8b6d2 100644 --- a/src/redis/cache_impl.go +++ b/src/redis/cache_impl.go @@ -19,13 +19,13 @@ func NewRateLimiterCacheImplFromSettings(s settings.Settings, localCache *freeca if s.RedisPerSecond { perSecondPool = NewClientImpl(srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondTls, s.RedisPerSecondAuth, s.RedisPerSecondSocketType, s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisPerSecondTimeout, - s.RedisPerSecondPoolOnEmptyBehavior, s.RedisPerSecondSentinelAuth, s.RedisPerSecondUseExplicitPipeline) + s.RedisPerSecondPoolOnEmptyBehavior, s.RedisPerSecondSentinelAuth) closer.Closers = append(closer.Closers, perSecondPool) } otherPool := NewClientImpl(srv.Scope().Scope("redis_pool"), s.RedisTls, s.RedisAuth, s.RedisSocketType, s.RedisType, s.RedisUrl, s.RedisPoolSize, s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisTimeout, - s.RedisPoolOnEmptyBehavior, s.RedisSentinelAuth, s.RedisUseExplicitPipeline) + s.RedisPoolOnEmptyBehavior, s.RedisSentinelAuth) closer.Closers = append(closer.Closers, otherPool) return NewFixedRateLimitCacheImpl( diff --git a/src/redis/driver.go b/src/redis/driver.go index 5063956bf..8f52df42d 100644 --- a/src/redis/driver.go +++ b/src/redis/driver.go @@ -41,11 +41,13 @@ type Client interface { // NumActiveConns return number of active connections, used in testing. NumActiveConns() int +} - // UseExplicitPipeline returns true if explicit pipelining should be used. - // When false, individual commands are executed and radix v4 automatically buffers writes. - // When true, commands are batched using radix.NewPipeline(). - UseExplicitPipeline() bool +// PipelineAction represents a single action in the pipeline along with its key. +// The key is used for grouping commands in cluster mode. +type PipelineAction struct { + Action radix.Action + Key string } -type Pipeline []radix.Action +type Pipeline []PipelineAction diff --git a/src/redis/driver_impl.go b/src/redis/driver_impl.go index 1ed7350ab..50cf44667 100644 --- a/src/redis/driver_impl.go +++ b/src/redis/driver_impl.go @@ -68,9 +68,9 @@ type redisClient interface { } type clientImpl struct { - client redisClient - stats poolStats - useExplicitPipeline bool + client redisClient + stats poolStats + isCluster bool } func checkError(err error) { @@ -122,7 +122,6 @@ func createDialer(timeout time.Duration, useTls bool, tlsConfig *tls.Config, aut func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisType, url string, poolSize int, pipelineWindow time.Duration, pipelineLimit int, tlsConfig *tls.Config, healthCheckActiveConnection bool, srv server.Server, timeout time.Duration, poolOnEmptyBehavior string, sentinelAuth string, - useExplicitPipeline bool, ) Client { maskedUrl := utils.MaskCredentialsInUrl(url) logger.Warnf("connecting to redis on %s with pool size %d", maskedUrl, poolSize) @@ -139,24 +138,22 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisT Trace: poolTrace(&stats, healthCheckActiveConnection, srv), } - // Note: radix v4 handles write buffering via WriteFlushInterval. - // Explicit pipelining (radix.NewPipeline()) is only used when explicitly requested via useExplicitPipeline parameter. - // Otherwise, individual Do() calls are used with automatic write buffering via WriteFlushInterval. - // pipelineLimit parameter is deprecated and ignored in radix v4. + // Determine pipeline mode based on Redis type: + // - Cluster: uses grouped pipeline (same-key commands batched together) + // - Single/Sentinel: uses explicit pipeline (all commands batched together) + isCluster := strings.ToLower(redisType) == "cluster" - // Warn if deprecated pipelineLimit is set + // pipelineLimit parameter is deprecated and ignored in radix v4. if pipelineLimit > 0 { logger.Warnf("REDIS_PIPELINE_LIMIT=%d is deprecated and has no effect in radix v4. Write buffering is controlled solely by REDIS_PIPELINE_WINDOW.", pipelineLimit) } - // Set WriteFlushInterval for automatic write buffering when not using explicit pipeline - if !useExplicitPipeline && pipelineWindow > 0 { + // Set WriteFlushInterval for cluster mode (grouped pipeline uses auto buffering) + if isCluster && pipelineWindow > 0 { poolConfig.Dialer.WriteFlushInterval = pipelineWindow - logger.Debugf("Setting WriteFlushInterval to %v", pipelineWindow) + logger.Debugf("Cluster mode: setting WriteFlushInterval to %v", pipelineWindow) } - logger.Debugf("Use explicit pipeline: %v", useExplicitPipeline) - // IMPORTANT: radix v4 pool behavior changes from v3 // // v4 uses a FIXED pool size and BLOCKS when all connections are in use. @@ -204,9 +201,6 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisT client, err = poolFunc(ctx, redisSocketType, url) case "cluster": urls := strings.Split(url, ",") - if useExplicitPipeline { - panic(RedisError("Explicit pipelining cannot be used with Redis Cluster Mode. Set REDIS_PIPELINE_WINDOW to a non-zero value (e.g., 150us)")) - } logger.Warnf("Creating cluster with urls %v", urls) clusterConfig := radix.ClusterConfig{ PoolConfig: poolConfig, @@ -241,9 +235,9 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisT } return &clientImpl{ - client: client, - stats: stats, - useExplicitPipeline: useExplicitPipeline, + client: client, + stats: stats, + isCluster: isCluster, } } @@ -269,33 +263,63 @@ func (c *clientImpl) PipeAppend(pipeline Pipeline, rcv interface{}, cmd, key str allArgs := make([]interface{}, 0, 1+len(args)) allArgs = append(allArgs, key) allArgs = append(allArgs, args...) - return append(pipeline, radix.FlatCmd(rcv, cmd, allArgs...)) + return append(pipeline, PipelineAction{ + Action: radix.FlatCmd(rcv, cmd, allArgs...), + Key: key, + }) } func (c *clientImpl) PipeDo(pipeline Pipeline) error { ctx := context.Background() - if c.useExplicitPipeline { - // When explicit pipelining is enabled (WriteFlushInterval == 0): - // Use radix.NewPipeline() to batch commands together. - p := radix.NewPipeline() - for _, action := range pipeline { - p.Append(action) + if c.isCluster { + // Cluster mode: group commands by key and execute each group as a pipeline. + // This ensures INCRBY + EXPIRE for the same key are pipelined together (same slot), + // reducing round-trips from 2 to 1 per key. + return c.executeGroupedPipeline(ctx, pipeline) + } + + // Single/Sentinel mode: batch all commands in a single pipeline. + p := radix.NewPipeline() + for _, pipelineAction := range pipeline { + p.Append(pipelineAction.Action) + } + return c.client.Do(ctx, p) +} + +// executeGroupedPipeline groups pipeline actions by key and executes each group +// as a separate pipeline. This allows same-key commands (like INCRBY + EXPIRE) +// to be pipelined together even in cluster mode. +func (c *clientImpl) executeGroupedPipeline(ctx context.Context, pipeline Pipeline) error { + // Group actions by key, preserving first-occurrence order + var groups [][]radix.Action + keyToIndex := make(map[string]int) + + for _, pa := range pipeline { + if idx, exists := keyToIndex[pa.Key]; exists { + groups[idx] = append(groups[idx], pa.Action) + } else { + keyToIndex[pa.Key] = len(groups) + groups = append(groups, []radix.Action{pa.Action}) } - return c.client.Do(ctx, p) } - // When automatic buffering is enabled (WriteFlushInterval > 0): - // Execute each action individually. Radix v4 will automatically buffer - // concurrent writes and flush them together based on WriteFlushInterval. - // This provides better performance for most workloads. - for _, action := range pipeline { - if err := c.client.Do(ctx, action); err != nil { - return err + // Execute each group + for _, actions := range groups { + if len(actions) == 1 { + if err := c.client.Do(ctx, actions[0]); err != nil { + return err + } + } else { + // Multiple commands for same key: pipeline them together + p := radix.NewPipeline() + for _, action := range actions { + p.Append(action) + } + if err := c.client.Do(ctx, p); err != nil { + return err + } } } - return nil -} -func (c *clientImpl) UseExplicitPipeline() bool { - return c.useExplicitPipeline + return nil } diff --git a/src/settings/settings.go b/src/settings/settings.go index 6a0af70f9..e3629f327 100644 --- a/src/settings/settings.go +++ b/src/settings/settings.go @@ -148,15 +148,8 @@ type Settings struct { // RedisPipelineLimit is DEPRECATED and unused in radix v4. // This setting has no effect. Radix v4 does not support explicit pipeline size limits. // Write buffering is controlled solely by RedisPipelineWindow (WriteFlushInterval). - RedisPipelineLimit int `envconfig:"REDIS_PIPELINE_LIMIT" default:"0"` - // RedisUseExplicitPipeline controls whether to use explicit pipelining (radix.NewPipeline()). - // When true, commands are batched using radix.NewPipeline() and sent together. - // When false (default), individual commands are sent with automatic write buffering via WriteFlushInterval. - // IMPORTANT: Explicit pipelining CANNOT be used with Redis Cluster mode. - // For cluster mode, you MUST use automatic write buffering (set this to false and use RedisPipelineWindow). - // Only set this to true for single/sentinel mode when you specifically need explicit pipeline control. - RedisUseExplicitPipeline bool `envconfig:"REDIS_USE_EXPLICIT_PIPELINE" default:"false"` - RedisPerSecond bool `envconfig:"REDIS_PERSECOND" default:"false"` + RedisPipelineLimit int `envconfig:"REDIS_PIPELINE_LIMIT" default:"0"` + RedisPerSecond bool `envconfig:"REDIS_PERSECOND" default:"false"` RedisPerSecondSocketType string `envconfig:"REDIS_PERSECOND_SOCKET_TYPE" default:"unix"` RedisPerSecondType string `envconfig:"REDIS_PERSECOND_TYPE" default:"SINGLE"` RedisPerSecondUrl string `envconfig:"REDIS_PERSECOND_URL" default:"/var/run/nutcracker/ratelimitpersecond.sock"` @@ -177,9 +170,6 @@ type Settings struct { // RedisPerSecondPipelineLimit is DEPRECATED and unused in radix v4. // See comments of RedisPipelineLimit for details. RedisPerSecondPipelineLimit int `envconfig:"REDIS_PERSECOND_PIPELINE_LIMIT" default:"0"` - // RedisPerSecondUseExplicitPipeline controls explicit pipelining for per-second redis. - // See comments of RedisUseExplicitPipeline for details. - RedisPerSecondUseExplicitPipeline bool `envconfig:"REDIS_PERSECOND_USE_EXPLICIT_PIPELINE" default:"false"` // Enable healthcheck to check Redis Connection. If there is no active connection, healthcheck failed. RedisHealthCheckActiveConnection bool `envconfig:"REDIS_HEALTH_CHECK_ACTIVE_CONNECTION" default:"false"` // RedisTimeout sets the timeout for Redis connection and I/O operations. diff --git a/test/mocks/redis/redis.go b/test/mocks/redis/redis.go index 02f0f258a..192b41c26 100644 --- a/test/mocks/redis/redis.go +++ b/test/mocks/redis/redis.go @@ -68,20 +68,6 @@ func (mr *MockClientMockRecorder) DoCmd(arg0, arg1, arg2 interface{}, arg3 ...in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoCmd", reflect.TypeOf((*MockClient)(nil).DoCmd), varargs...) } -// UseExplicitPipeline mocks base method -func (m *MockClient) UseExplicitPipeline() bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UseExplicitPipeline") - ret0, _ := ret[0].(bool) - return ret0 -} - -// UseExplicitPipeline indicates an expected call of UseExplicitPipeline -func (mr *MockClientMockRecorder) UseExplicitPipeline() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UseExplicitPipeline", reflect.TypeOf((*MockClient)(nil).UseExplicitPipeline)) -} - // NumActiveConns mocks base method func (m *MockClient) NumActiveConns() int { m.ctrl.T.Helper() diff --git a/test/redis/bench_test.go b/test/redis/bench_test.go index e13266093..3b1d5fa49 100644 --- a/test/redis/bench_test.go +++ b/test/redis/bench_test.go @@ -44,7 +44,7 @@ func BenchmarkParallelDoLimit(b *testing.B) { return func(b *testing.B) { statsStore := gostats.NewStore(gostats.NewNullSink(), false) sm := stats.NewMockStatManager(statsStore) - client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", "", false) + client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", "") defer client.Close() cache := redis.NewFixedRateLimitCacheImpl(client, nil, utils.NewTimeSourceImpl(), rand.New(utils.NewLockedSource(time.Now().Unix())), 10, nil, 0.8, "", sm, true) diff --git a/test/redis/driver_impl_test.go b/test/redis/driver_impl_test.go index 31ca68c4f..8d1b30132 100644 --- a/test/redis/driver_impl_test.go +++ b/test/redis/driver_impl_test.go @@ -33,13 +33,13 @@ func expectPanicError(t *testing.T, f assert.PanicTestFunc) (result error) { return } -func testNewClientImpl(t *testing.T, pipelineWindow time.Duration, pipelineLimit int, useExplicitPipeline bool) func(t *testing.T) { +func testNewClientImpl(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) func(t *testing.T) { return func(t *testing.T) { redisAuth := "123" statsStore := stats.NewStore(stats.NewNullSink(), false) mkRedisClient := func(auth, addr string) redis.Client { - return redis.NewClientImpl(statsStore, false, auth, "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", "", useExplicitPipeline) + return redis.NewClientImpl(statsStore, false, auth, "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", "") } t.Run("connection refused", func(t *testing.T) { @@ -107,28 +107,19 @@ func testNewClientImpl(t *testing.T, pipelineWindow time.Duration, pipelineLimit mkRedisClient(redisAuth, redisSrv.Addr()) }) }) - - t.Run("UseExplicitPipeline() return expected value", func(t *testing.T) { - redisSrv := mustNewRedisServer() - defer redisSrv.Close() - - client := mkRedisClient("", redisSrv.Addr()) - - assert.Equal(t, useExplicitPipeline, client.UseExplicitPipeline()) - }) } } func TestNewClientImpl(t *testing.T) { - t.Run("AutoBuffering", testNewClientImpl(t, 2*time.Millisecond, 2, false)) - t.Run("ExplicitPipeline", testNewClientImpl(t, 0, 0, true)) + t.Run("WithPipelineWindow", testNewClientImpl(t, 2*time.Millisecond, 2)) + t.Run("WithoutPipelineWindow", testNewClientImpl(t, 0, 0)) } func TestDoCmd(t *testing.T) { statsStore := stats.NewStore(stats.NewNullSink(), false) mkRedisClient := func(addr string) redis.Client { - return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second, "", "", false) + return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second, "", "") } t.Run("SETGET ok", func(t *testing.T) { @@ -168,12 +159,12 @@ func TestDoCmd(t *testing.T) { }) } -func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int, useExplicitPipeline bool) func(t *testing.T) { +func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) func(t *testing.T) { return func(t *testing.T) { statsStore := stats.NewStore(stats.NewNullSink(), false) mkRedisClient := func(addr string) redis.Client { - return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", "", useExplicitPipeline) + return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", "") } t.Run("SETGET ok", func(t *testing.T) { @@ -231,8 +222,8 @@ func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int, u } func TestPipeDo(t *testing.T) { - t.Run("AutoBuffering", testPipeDo(t, 10*time.Millisecond, 2, false)) - t.Run("ExplicitPipeline", testPipeDo(t, 0, 0, true)) + t.Run("WithPipelineWindow", testPipeDo(t, 10*time.Millisecond, 2)) + t.Run("WithoutPipelineWindow", testPipeDo(t, 0, 0)) } // Tests for pool on-empty behavior @@ -241,7 +232,7 @@ func TestPoolOnEmptyBehavior(t *testing.T) { // Helper to create client with specific on-empty behavior mkRedisClientWithBehavior := func(addr, behavior string) redis.Client { - return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second, behavior, "", false) + return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second, behavior, "") } t.Run("default behavior (empty string)", func(t *testing.T) { @@ -365,7 +356,7 @@ func TestNewClientImplSentinel(t *testing.T) { mkSentinelClient := func(auth, sentinelAuth, url string, useTls bool, timeout time.Duration) redis.Client { // Pass nil for tlsConfig - we can't test TLS without a real TLS server, // but we can verify the code path is executed (logs will show TLS is enabled) - return redis.NewClientImpl(statsStore, useTls, auth, "tcp", "sentinel", url, 1, 0, 0, nil, false, nil, timeout, "", sentinelAuth, false) + return redis.NewClientImpl(statsStore, useTls, auth, "tcp", "sentinel", url, 1, 0, 0, nil, false, nil, timeout, "", sentinelAuth) } t.Run("invalid url format - missing sentinel addresses", func(t *testing.T) { diff --git a/test/redis/fixed_cache_impl_test.go b/test/redis/fixed_cache_impl_test.go index b1a8088cd..73229ab6e 100644 --- a/test/redis/fixed_cache_impl_test.go +++ b/test/redis/fixed_cache_impl_test.go @@ -35,7 +35,10 @@ func TestRedis(t *testing.T) { } func pipeAppend(pipeline redis.Pipeline, rcv interface{}, cmd, key string, args ...interface{}) redis.Pipeline { - return append(pipeline, radix.FlatCmd(rcv, cmd, append([]interface{}{key}, args...)...)) + return append(pipeline, redis.PipelineAction{ + Action: radix.FlatCmd(rcv, cmd, append([]interface{}{key}, args...)...), + Key: key, + }) } func testRedis(usePerSecondRedis bool) func(*testing.T) { From 478f5d63837300a06242246fb37a9adad54b1a5b Mon Sep 17 00:00:00 2001 From: seonghyun Date: Sun, 28 Dec 2025 00:30:46 +0900 Subject: [PATCH 13/14] docs: remove non-existent REDIS_USE_EXPLICIT_PIPELINE from README The REDIS_USE_EXPLICIT_PIPELINE and REDIS_PERSECOND_USE_EXPLICIT_PIPELINE settings were documented in README but do not exist in settings.go. Removed the documentation to match the actual implementation. Signed-off-by: seonghyun --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index b9c099a21..f2d9291d3 100644 --- a/README.md +++ b/README.md @@ -1312,7 +1312,6 @@ For high throughput scenarios, ratelimit supports write buffering via [radix v4' 1. `REDIS_PIPELINE_WINDOW` & `REDIS_PERSECOND_PIPELINE_WINDOW`: controls how often buffered writes are flushed to the network connection. When set to a non-zero value (e.g., 150us-500us), radix v4 will buffer multiple concurrent write operations and flush them together, reducing system calls and improving throughput. If zero, each write is flushed immediately. **Required for Redis Cluster mode.** 1. `REDIS_PIPELINE_LIMIT` & `REDIS_PERSECOND_PIPELINE_LIMIT`: **DEPRECATED** - These settings have no effect in radix v4. Write buffering is controlled solely by the window settings above. -1. `REDIS_USE_EXPLICIT_PIPELINE` & `REDIS_PERSECOND_USE_EXPLICIT_PIPELINE`: controls whether to use explicit pipelining with `radix.NewPipeline()`. When `true`, commands are batched together and sent as a single pipeline request. When `false` (default), individual commands are sent with automatic write buffering via `WriteFlushInterval`. **IMPORTANT**: Explicit pipelining **CANNOT** be used with Redis Cluster mode. For cluster mode, this must be set to `false` and use `REDIS_PIPELINE_WINDOW` for write buffering instead. Only use this for single/sentinel mode when you specifically need explicit pipeline control. Default: `false` Write buffering is disabled by default (window = 0). For optimal performance, set `REDIS_PIPELINE_WINDOW` to 150us-500us depending on your latency requirements and load patterns. From 49040e2bea8b21a7dda9e3c5c37ff560e2d2e4b2 Mon Sep 17 00:00:00 2001 From: seonghyun Date: Sun, 28 Dec 2025 00:55:01 +0900 Subject: [PATCH 14/14] style: fix gofmt formatting in settings.go Signed-off-by: seonghyun --- src/settings/settings.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/settings/settings.go b/src/settings/settings.go index e3629f327..64cc7d928 100644 --- a/src/settings/settings.go +++ b/src/settings/settings.go @@ -148,8 +148,8 @@ type Settings struct { // RedisPipelineLimit is DEPRECATED and unused in radix v4. // This setting has no effect. Radix v4 does not support explicit pipeline size limits. // Write buffering is controlled solely by RedisPipelineWindow (WriteFlushInterval). - RedisPipelineLimit int `envconfig:"REDIS_PIPELINE_LIMIT" default:"0"` - RedisPerSecond bool `envconfig:"REDIS_PERSECOND" default:"false"` + RedisPipelineLimit int `envconfig:"REDIS_PIPELINE_LIMIT" default:"0"` + RedisPerSecond bool `envconfig:"REDIS_PERSECOND" default:"false"` RedisPerSecondSocketType string `envconfig:"REDIS_PERSECOND_SOCKET_TYPE" default:"unix"` RedisPerSecondType string `envconfig:"REDIS_PERSECOND_TYPE" default:"SINGLE"` RedisPerSecondUrl string `envconfig:"REDIS_PERSECOND_URL" default:"/var/run/nutcracker/ratelimitpersecond.sock"`