diff --git a/README.md b/README.md index 09d392e46..f2d9291d3 100644 --- a/README.md +++ b/README.md @@ -1308,14 +1308,12 @@ Controls what happens when all connections in the pool are in use and a new requ By default, for each request, ratelimit will pick up a connection from pool, write multiple redis commands in a single write then reads their responses in a single read. This reduces network delay. -For high throughput scenarios, ratelimit also support [implicit pipelining](https://github.com/mediocregopher/radix/blob/v3.5.1/pool.go#L238) . It can be configured using the following environment variables: +For high throughput scenarios, ratelimit supports write buffering via [radix v4's WriteFlushInterval](https://pkg.go.dev/github.com/mediocregopher/radix/v4#Dialer). It can be configured using the following environment variables: -1. `REDIS_PIPELINE_WINDOW` & `REDIS_PERSECOND_PIPELINE_WINDOW`: sets the duration after which internal pipelines will be flushed. - If window is zero then implicit pipelining will be disabled. -1. `REDIS_PIPELINE_LIMIT` & `REDIS_PERSECOND_PIPELINE_LIMIT`: sets maximum number of commands that can be pipelined before flushing. - If limit is zero then no limit will be used and pipelines will only be limited by the specified time window. +1. `REDIS_PIPELINE_WINDOW` & `REDIS_PERSECOND_PIPELINE_WINDOW`: controls how often buffered writes are flushed to the network connection. When set to a non-zero value (e.g., 150us-500us), radix v4 will buffer multiple concurrent write operations and flush them together, reducing system calls and improving throughput. If zero, each write is flushed immediately. **Required for Redis Cluster mode.** +1. `REDIS_PIPELINE_LIMIT` & `REDIS_PERSECOND_PIPELINE_LIMIT`: **DEPRECATED** - These settings have no effect in radix v4. Write buffering is controlled solely by the window settings above. -`implicit pipelining` is disabled by default. To enable it, you can use default values [used by radix](https://github.com/mediocregopher/radix/blob/v3.5.1/pool.go#L278) and tune for the optimal value. +Write buffering is disabled by default (window = 0). For optimal performance, set `REDIS_PIPELINE_WINDOW` to 150us-500us depending on your latency requirements and load patterns. ## One Redis Instance diff --git a/go.mod b/go.mod index 2c9cf881e..ef8fe331b 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/DataDog/datadog-go/v5 v5.5.0 github.com/alicebob/miniredis/v2 v2.33.0 github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 + github.com/cespare/xxhash/v2 v2.3.0 github.com/coocood/freecache v1.2.4 github.com/envoyproxy/go-control-plane v0.13.4 github.com/envoyproxy/go-control-plane/envoy v1.32.4 @@ -21,7 +22,7 @@ require ( github.com/libp2p/go-reuseport v0.4.0 github.com/lyft/goruntime v0.3.0 github.com/lyft/gostats v0.4.14 - github.com/mediocregopher/radix/v3 v3.8.1 + github.com/mediocregopher/radix/v4 v4.1.4 github.com/prometheus/client_golang v1.19.1 github.com/prometheus/client_model v0.6.1 github.com/prometheus/statsd_exporter v0.26.1 @@ -46,7 +47,6 @@ require ( github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect @@ -60,13 +60,13 @@ require ( github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/stretchr/objx v0.5.2 // indirect + github.com/tilinna/clock v1.0.2 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/otel/metric v1.36.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect golang.org/x/sys v0.34.0 // indirect golang.org/x/text v0.27.0 // indirect - golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250528174236-200df99c418a // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250728155136-f173205681a0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 16d1824cc..88c160e27 100644 --- a/go.sum +++ b/go.sum @@ -102,8 +102,8 @@ github.com/lyft/goruntime v0.3.0/go.mod h1:BW1gngSpMJR9P9w23BPUPdhdbUWhpirl98TQh github.com/lyft/gostats v0.4.1/go.mod h1:Tpx2xRzz4t+T2Tx0xdVgIoBdR2UMVz+dKnE3X01XSd8= github.com/lyft/gostats v0.4.14 h1:xmP4yMfDvEKtlNZEcS2sYz0cvnps1ri337ZEEbw3ab8= github.com/lyft/gostats v0.4.14/go.mod h1:cJWqEVL8JIewIJz/olUIios2F1q06Nc51hXejPQmBH0= -github.com/mediocregopher/radix/v3 v3.8.1 h1:rOkHflVuulFKlwsLY01/M2cM2tWCjDoETcMqKbAWu1M= -github.com/mediocregopher/radix/v3 v3.8.1/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= +github.com/mediocregopher/radix/v4 v4.1.4 h1:Uze6DEbEAvL+VHXUEu/EDBTkUk5CLct5h3nVSGpc6Ts= +github.com/mediocregopher/radix/v4 v4.1.4/go.mod h1:ajchozX/6ELmydxWeWM6xCFHVpZ4+67LXHOTOVR0nCE= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -144,6 +144,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tilinna/clock v1.0.2 h1:6BO2tyAC9JbPExKH/z9zl44FLu1lImh3nDNKA0kgrkI= +github.com/tilinna/clock v1.0.2/go.mod h1:ZsP7BcY7sEEz7ktc0IVy8Us6boDrK8VradlKRUGfOao= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= @@ -241,8 +243,6 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= -golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= diff --git a/src/redis/cache_impl.go b/src/redis/cache_impl.go index c411af2ec..21db8b6d2 100644 --- a/src/redis/cache_impl.go +++ b/src/redis/cache_impl.go @@ -19,13 +19,13 @@ func NewRateLimiterCacheImplFromSettings(s settings.Settings, localCache *freeca if s.RedisPerSecond { perSecondPool = NewClientImpl(srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondTls, s.RedisPerSecondAuth, s.RedisPerSecondSocketType, s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisPerSecondTimeout, - s.RedisPerSecondPoolOnEmptyBehavior, s.RedisPerSecondPoolOnEmptyWaitDuration, s.RedisPerSecondSentinelAuth) + s.RedisPerSecondPoolOnEmptyBehavior, s.RedisPerSecondSentinelAuth) closer.Closers = append(closer.Closers, perSecondPool) } otherPool := NewClientImpl(srv.Scope().Scope("redis_pool"), s.RedisTls, s.RedisAuth, s.RedisSocketType, s.RedisType, s.RedisUrl, s.RedisPoolSize, s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisTimeout, - s.RedisPoolOnEmptyBehavior, s.RedisPoolOnEmptyWaitDuration, s.RedisSentinelAuth) + s.RedisPoolOnEmptyBehavior, s.RedisSentinelAuth) closer.Closers = append(closer.Closers, otherPool) return NewFixedRateLimitCacheImpl( diff --git a/src/redis/driver.go b/src/redis/driver.go index 7ffc0c7b7..8f52df42d 100644 --- a/src/redis/driver.go +++ b/src/redis/driver.go @@ -1,6 +1,6 @@ package redis -import "github.com/mediocregopher/radix/v3" +import "github.com/mediocregopher/radix/v4" // Errors that may be raised during config parsing. type RedisError string @@ -41,9 +41,13 @@ type Client interface { // NumActiveConns return number of active connections, used in testing. NumActiveConns() int +} - // ImplicitPipeliningEnabled return true if implicit pipelining is enabled. - ImplicitPipeliningEnabled() bool +// PipelineAction represents a single action in the pipeline along with its key. +// The key is used for grouping commands in cluster mode. +type PipelineAction struct { + Action radix.Action + Key string } -type Pipeline []radix.CmdAction +type Pipeline []PipelineAction diff --git a/src/redis/driver_impl.go b/src/redis/driver_impl.go index 8201aa895..50cf44667 100644 --- a/src/redis/driver_impl.go +++ b/src/redis/driver_impl.go @@ -1,14 +1,16 @@ package redis import ( + "context" "crypto/tls" "fmt" + "net" "strings" "time" stats "github.com/lyft/gostats" - "github.com/mediocregopher/radix/v3" - "github.com/mediocregopher/radix/v3/trace" + "github.com/mediocregopher/radix/v4" + "github.com/mediocregopher/radix/v4/trace" logger "github.com/sirupsen/logrus" "github.com/envoyproxy/ratelimit/src/server" @@ -58,10 +60,17 @@ func poolTrace(ps *poolStats, healthCheckActiveConnection bool, srv server.Serve } } +// redisClient is an interface that abstracts radix Client, Cluster, and Sentinel +// All of these types have Do(context.Context, Action) and Close() methods +type redisClient interface { + Do(context.Context, radix.Action) error + Close() error +} + type clientImpl struct { - client radix.Client - stats poolStats - implicitPipelining bool + client redisClient + stats poolStats + isCluster bool } func checkError(err error) { @@ -70,107 +79,148 @@ func checkError(err error) { } } +// createDialer creates a radix.Dialer with timeout, TLS, and auth configuration +// targetName is used for logging to identify the connection target (e.g., URL, "sentinel(url)") +func createDialer(timeout time.Duration, useTls bool, tlsConfig *tls.Config, auth string, targetName string) radix.Dialer { + var netDialer net.Dialer + if timeout > 0 { + netDialer.Timeout = timeout + } + + dialer := radix.Dialer{ + NetDialer: &netDialer, + } + + // Setup TLS if needed + if useTls { + tlsNetDialer := tls.Dialer{ + NetDialer: &netDialer, + Config: tlsConfig, + } + dialer.NetDialer = &tlsNetDialer + if targetName != "" { + logger.Warnf("enabling TLS to redis %s", targetName) + } + } + + // Setup auth if provided + if auth != "" { + user, pass, found := strings.Cut(auth, ":") + if found { + logger.Warnf("enabling authentication to redis %s with user %s", targetName, user) + dialer.AuthUser = user + dialer.AuthPass = pass + } else { + logger.Warnf("enabling authentication to redis %s without user", targetName) + dialer.AuthPass = auth + } + } + + return dialer +} + func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisType, url string, poolSize int, pipelineWindow time.Duration, pipelineLimit int, tlsConfig *tls.Config, healthCheckActiveConnection bool, srv server.Server, - timeout time.Duration, poolOnEmptyBehavior string, poolOnEmptyWaitDuration time.Duration, sentinelAuth string, + timeout time.Duration, poolOnEmptyBehavior string, sentinelAuth string, ) Client { maskedUrl := utils.MaskCredentialsInUrl(url) logger.Warnf("connecting to redis on %s with pool size %d", maskedUrl, poolSize) - df := func(network, addr string) (radix.Conn, error) { - var dialOpts []radix.DialOpt - - dialOpts = append(dialOpts, radix.DialTimeout(timeout)) - - if useTls { - dialOpts = append(dialOpts, radix.DialUseTLS(tlsConfig)) - } + // Create Dialer for connecting to Redis + dialer := createDialer(timeout, useTls, tlsConfig, auth, maskedUrl) - if auth != "" { - user, pass, found := strings.Cut(auth, ":") - if found { - logger.Warnf("enabling authentication to redis on %s with user %s", maskedUrl, user) - dialOpts = append(dialOpts, radix.DialAuthUser(user, pass)) - } else { - logger.Warnf("enabling authentication to redis on %s without user", maskedUrl) - dialOpts = append(dialOpts, radix.DialAuthPass(auth)) - } - } + stats := newPoolStats(scope) - return radix.Dial(network, addr, dialOpts...) + // Create PoolConfig + poolConfig := radix.PoolConfig{ + Dialer: dialer, + Size: poolSize, + Trace: poolTrace(&stats, healthCheckActiveConnection, srv), } - stats := newPoolStats(scope) + // Determine pipeline mode based on Redis type: + // - Cluster: uses grouped pipeline (same-key commands batched together) + // - Single/Sentinel: uses explicit pipeline (all commands batched together) + isCluster := strings.ToLower(redisType) == "cluster" - opts := []radix.PoolOpt{radix.PoolConnFunc(df), radix.PoolWithTrace(poolTrace(&stats, healthCheckActiveConnection, srv))} + // pipelineLimit parameter is deprecated and ignored in radix v4. + if pipelineLimit > 0 { + logger.Warnf("REDIS_PIPELINE_LIMIT=%d is deprecated and has no effect in radix v4. Write buffering is controlled solely by REDIS_PIPELINE_WINDOW.", pipelineLimit) + } - implicitPipelining := true - if pipelineWindow == 0 && pipelineLimit == 0 { - implicitPipelining = false - } else { - opts = append(opts, radix.PoolPipelineWindow(pipelineWindow, pipelineLimit)) + // Set WriteFlushInterval for cluster mode (grouped pipeline uses auto buffering) + if isCluster && pipelineWindow > 0 { + poolConfig.Dialer.WriteFlushInterval = pipelineWindow + logger.Debugf("Cluster mode: setting WriteFlushInterval to %v", pipelineWindow) } - logger.Debugf("Implicit pipelining enabled: %v", implicitPipelining) + // IMPORTANT: radix v4 pool behavior changes from v3 + // + // v4 uses a FIXED pool size and BLOCKS when all connections are in use. + // This is the same as v3's WAIT behavior. + // + // v3 CREATE and ERROR behaviors are NOT supported in v4: + // - v3 WAIT → v4 supported (blocks until connection available) + // - v3 CREATE → v4 NOT SUPPORTED (would block instead of creating overflow connections) + // - v3 ERROR → v4 NOT SUPPORTED (would block instead of failing fast) + // + // Migration requirements: + // - Remove REDIS_POOL_ON_EMPTY_BEHAVIOR setting if set to CREATE or ERROR + // - Use WAIT or leave unset (WAIT is default) + // - Consider increasing REDIS_POOL_SIZE if you previously relied on CREATE + // - Use context timeouts to prevent indefinite blocking: + // ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + // defer cancel() + // client.Do(ctx, cmd) switch strings.ToUpper(poolOnEmptyBehavior) { case "WAIT": - opts = append(opts, radix.PoolOnEmptyWait()) - logger.Warnf("Redis pool %s: on-empty=WAIT (block until connection available)", maskedUrl) + logger.Warnf("Redis pool %s: WAIT is default in radix v4 (blocks until connection available)", maskedUrl) case "CREATE": - opts = append(opts, radix.PoolOnEmptyCreateAfter(poolOnEmptyWaitDuration)) - logger.Warnf("Redis pool %s: on-empty=CREATE after %v", maskedUrl, poolOnEmptyWaitDuration) + // v3 CREATE created overflow connections when pool was full + // v4 does NOT support this - fail fast to prevent unexpected blocking behavior + panic(RedisError("REDIS_POOL_ON_EMPTY_BEHAVIOR=CREATE is not supported in radix v4. Pool will block instead of creating overflow connections. Remove this setting or set to WAIT, and consider increasing REDIS_POOL_SIZE.")) case "ERROR": - opts = append(opts, radix.PoolOnEmptyErrAfter(poolOnEmptyWaitDuration)) - logger.Warnf("Redis pool %s: on-empty=ERROR after %v (fail-fast)", maskedUrl, poolOnEmptyWaitDuration) + // v3 ERROR failed fast when pool was full + // v4 does NOT support this - fail fast to prevent unexpected blocking behavior + panic(RedisError("REDIS_POOL_ON_EMPTY_BEHAVIOR=ERROR is not supported in radix v4. Pool will block instead of failing fast. Remove this setting or set to WAIT, and use context timeouts for fail-fast behavior.")) default: - logger.Warnf("Redis pool %s: invalid on-empty behavior '%s', using default CREATE after %v", maskedUrl, poolOnEmptyBehavior, poolOnEmptyWaitDuration) - opts = append(opts, radix.PoolOnEmptyCreateAfter(poolOnEmptyWaitDuration)) + logger.Warnf("Redis pool %s: using v4 default (fixed size=%d, blocks when full)", maskedUrl, poolSize) } - poolFunc := func(network, addr string) (radix.Client, error) { - return radix.NewPool(network, addr, poolSize, opts...) + poolFunc := func(ctx context.Context, network, addr string) (radix.Client, error) { + return poolConfig.New(ctx, network, addr) } - var client radix.Client + var client redisClient var err error + ctx := context.Background() + switch strings.ToLower(redisType) { case "single": - client, err = poolFunc(redisSocketType, url) + logger.Warnf("Creating single with urls %v", url) + client, err = poolFunc(ctx, redisSocketType, url) case "cluster": urls := strings.Split(url, ",") - if !implicitPipelining { - panic(RedisError("Implicit Pipelining must be enabled to work with Redis Cluster Mode. Set values for REDIS_PIPELINE_WINDOW or REDIS_PIPELINE_LIMIT to enable implicit pipelining")) - } logger.Warnf("Creating cluster with urls %v", urls) - client, err = radix.NewCluster(urls, radix.ClusterPoolFunc(poolFunc)) + clusterConfig := radix.ClusterConfig{ + PoolConfig: poolConfig, + } + client, err = clusterConfig.New(ctx, urls) case "sentinel": urls := strings.Split(url, ",") if len(urls) < 2 { panic(RedisError("Expected master name and a list of urls for the sentinels, in the format: ,,...,")) } - sentinelDialFunc := func(network, addr string) (radix.Conn, error) { - var dialOpts []radix.DialOpt - // Always set the dial timeout consistent with the main dial func - dialOpts = append(dialOpts, radix.DialTimeout(timeout)) - if useTls { - logger.Warnf("enabling TLS to redis sentinel on %s", addr) - dialOpts = append(dialOpts, radix.DialUseTLS(tlsConfig)) - } - // Use sentinelAuth for authenticating to Sentinel nodes, not auth - // auth is used for Redis master/replica authentication - if sentinelAuth != "" { - user, pass, found := strings.Cut(sentinelAuth, ":") - if found { - logger.Warnf("enabling authentication to redis sentinel on %s with user %s", addr, user) - dialOpts = append(dialOpts, radix.DialAuthUser(user, pass)) - } else { - logger.Warnf("enabling authentication to redis sentinel on %s without user", addr) - dialOpts = append(dialOpts, radix.DialAuthPass(sentinelAuth)) - } - } - return radix.Dial(network, addr, dialOpts...) + + // Create sentinel dialer (may use different auth from Redis master/replica) + // sentinelAuth is for Sentinel nodes, auth is for Redis master/replica + sentinelDialer := createDialer(timeout, useTls, tlsConfig, sentinelAuth, fmt.Sprintf("sentinel(%s)", maskedUrl)) + + sentinelConfig := radix.SentinelConfig{ + PoolConfig: poolConfig, + SentinelDialer: sentinelDialer, } - client, err = radix.NewSentinel(urls[0], urls[1:], radix.SentinelConnFunc(sentinelDialFunc), radix.SentinelPoolFunc(poolFunc)) + client, err = sentinelConfig.New(ctx, urls[0], urls[1:]) default: panic(RedisError("Unrecognized redis type " + redisType)) } @@ -179,20 +229,25 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisT // Check if connection is good var pingResponse string - checkError(client.Do(radix.Cmd(&pingResponse, "PING"))) + checkError(client.Do(ctx, radix.Cmd(&pingResponse, "PING"))) if pingResponse != "PONG" { checkError(fmt.Errorf("connecting redis error: %s", pingResponse)) } return &clientImpl{ - client: client, - stats: stats, - implicitPipelining: implicitPipelining, + client: client, + stats: stats, + isCluster: isCluster, } } func (c *clientImpl) DoCmd(rcv interface{}, cmd, key string, args ...interface{}) error { - return c.client.Do(radix.FlatCmd(rcv, cmd, key, args...)) + ctx := context.Background() + // Combine key and args into a single slice + allArgs := make([]interface{}, 0, 1+len(args)) + allArgs = append(allArgs, key) + allArgs = append(allArgs, args...) + return c.client.Do(ctx, radix.FlatCmd(rcv, cmd, allArgs...)) } func (c *clientImpl) Close() error { @@ -204,22 +259,67 @@ func (c *clientImpl) NumActiveConns() int { } func (c *clientImpl) PipeAppend(pipeline Pipeline, rcv interface{}, cmd, key string, args ...interface{}) Pipeline { - return append(pipeline, radix.FlatCmd(rcv, cmd, key, args...)) + // Combine key and args into a single slice + allArgs := make([]interface{}, 0, 1+len(args)) + allArgs = append(allArgs, key) + allArgs = append(allArgs, args...) + return append(pipeline, PipelineAction{ + Action: radix.FlatCmd(rcv, cmd, allArgs...), + Key: key, + }) } func (c *clientImpl) PipeDo(pipeline Pipeline) error { - if c.implicitPipelining { - for _, action := range pipeline { - if err := c.client.Do(action); err != nil { + ctx := context.Background() + if c.isCluster { + // Cluster mode: group commands by key and execute each group as a pipeline. + // This ensures INCRBY + EXPIRE for the same key are pipelined together (same slot), + // reducing round-trips from 2 to 1 per key. + return c.executeGroupedPipeline(ctx, pipeline) + } + + // Single/Sentinel mode: batch all commands in a single pipeline. + p := radix.NewPipeline() + for _, pipelineAction := range pipeline { + p.Append(pipelineAction.Action) + } + return c.client.Do(ctx, p) +} + +// executeGroupedPipeline groups pipeline actions by key and executes each group +// as a separate pipeline. This allows same-key commands (like INCRBY + EXPIRE) +// to be pipelined together even in cluster mode. +func (c *clientImpl) executeGroupedPipeline(ctx context.Context, pipeline Pipeline) error { + // Group actions by key, preserving first-occurrence order + var groups [][]radix.Action + keyToIndex := make(map[string]int) + + for _, pa := range pipeline { + if idx, exists := keyToIndex[pa.Key]; exists { + groups[idx] = append(groups[idx], pa.Action) + } else { + keyToIndex[pa.Key] = len(groups) + groups = append(groups, []radix.Action{pa.Action}) + } + } + + // Execute each group + for _, actions := range groups { + if len(actions) == 1 { + if err := c.client.Do(ctx, actions[0]); err != nil { + return err + } + } else { + // Multiple commands for same key: pipeline them together + p := radix.NewPipeline() + for _, action := range actions { + p.Append(action) + } + if err := c.client.Do(ctx, p); err != nil { return err } } - return nil } - return c.client.Do(radix.Pipeline(pipeline...)) -} - -func (c *clientImpl) ImplicitPipeliningEnabled() bool { - return c.implicitPipelining + return nil } diff --git a/src/settings/settings.go b/src/settings/settings.go index 9beb786c0..64cc7d928 100644 --- a/src/settings/settings.go +++ b/src/settings/settings.go @@ -137,12 +137,17 @@ type Settings struct { RedisTlsCACert string `envconfig:"REDIS_TLS_CACERT" default:""` RedisTlsSkipHostnameVerification bool `envconfig:"REDIS_TLS_SKIP_HOSTNAME_VERIFICATION" default:"false"` - // RedisPipelineWindow sets the duration after which internal pipelines will be flushed. - // If window is zero then implicit pipelining will be disabled. Radix use 150us for the - // default value, see https://github.com/mediocregopher/radix/blob/v3.5.1/pool.go#L278. + // RedisPipelineWindow sets the WriteFlushInterval for radix v4 connections. + // This controls how often buffered writes are flushed to the network connection. + // When set to a non-zero value, radix v4 will buffer multiple concurrent write operations + // and flush them together, reducing system calls and improving throughput. + // If zero, each write is flushed immediately (no buffering). + // Required for Redis Cluster mode. Recommended value: 150us-500us. + // See: https://pkg.go.dev/github.com/mediocregopher/radix/v4#Dialer RedisPipelineWindow time.Duration `envconfig:"REDIS_PIPELINE_WINDOW" default:"0"` - // RedisPipelineLimit sets maximum number of commands that can be pipelined before flushing. - // If limit is zero then no limit will be used and pipelines will only be limited by the specified time window. + // RedisPipelineLimit is DEPRECATED and unused in radix v4. + // This setting has no effect. Radix v4 does not support explicit pipeline size limits. + // Write buffering is controlled solely by RedisPipelineWindow (WriteFlushInterval). RedisPipelineLimit int `envconfig:"REDIS_PIPELINE_LIMIT" default:"0"` RedisPerSecond bool `envconfig:"REDIS_PERSECOND" default:"false"` RedisPerSecondSocketType string `envconfig:"REDIS_PERSECOND_SOCKET_TYPE" default:"unix"` @@ -159,10 +164,10 @@ type Settings struct { // This is separate from RedisPerSecondAuth which is used for authenticating to the Redis master/replica nodes. // If empty, no authentication will be attempted when connecting to per-second Sentinel nodes. RedisPerSecondSentinelAuth string `envconfig:"REDIS_PERSECOND_SENTINEL_AUTH" default:""` - // RedisPerSecondPipelineWindow sets the duration after which internal pipelines will be flushed for per second redis. + // RedisPerSecondPipelineWindow sets the WriteFlushInterval for per-second redis connections. // See comments of RedisPipelineWindow for details. RedisPerSecondPipelineWindow time.Duration `envconfig:"REDIS_PERSECOND_PIPELINE_WINDOW" default:"0"` - // RedisPerSecondPipelineLimit sets maximum number of commands that can be pipelined before flushing for per second redis. + // RedisPerSecondPipelineLimit is DEPRECATED and unused in radix v4. // See comments of RedisPipelineLimit for details. RedisPerSecondPipelineLimit int `envconfig:"REDIS_PERSECOND_PIPELINE_LIMIT" default:"0"` // Enable healthcheck to check Redis Connection. If there is no active connection, healthcheck failed. @@ -173,22 +178,17 @@ type Settings struct { RedisPerSecondTimeout time.Duration `envconfig:"REDIS_PERSECOND_TIMEOUT" default:"10s"` // RedisPoolOnEmptyBehavior controls what happens when Redis connection pool is empty. - // This setting helps prevent connection storms during Redis failures. + // NOTE: In radix v4, the pool ALWAYS blocks when empty (WAIT behavior). // Possible values: - // - "CREATE": Create a new connection after RedisPoolOnEmptyWaitDuration (default) - // - "ERROR": Return error after RedisPoolOnEmptyWaitDuration - // - "WAIT": Block until a connection is available - RedisPoolOnEmptyBehavior string `envconfig:"REDIS_POOL_ON_EMPTY_BEHAVIOR" default:"CREATE"` - // RedisPoolOnEmptyWaitDuration is the wait duration before taking action when pool is empty. - // Only applicable when RedisPoolOnEmptyBehavior is "CREATE" or "ERROR". - RedisPoolOnEmptyWaitDuration time.Duration `envconfig:"REDIS_POOL_ON_EMPTY_WAIT_DURATION" default:"1s"` + // - "WAIT": Block until a connection is available (default, radix v4 behavior) + // - "CREATE": NOT SUPPORTED in radix v4 - will cause panic at startup + // - "ERROR": NOT SUPPORTED in radix v4 - will cause panic at startup + // For fail-fast behavior, use context timeouts when calling Redis operations. + RedisPoolOnEmptyBehavior string `envconfig:"REDIS_POOL_ON_EMPTY_BEHAVIOR" default:"WAIT"` // RedisPerSecondPoolOnEmptyBehavior controls pool-empty behavior for per-second Redis. // See RedisPoolOnEmptyBehavior for possible values and details. - RedisPerSecondPoolOnEmptyBehavior string `envconfig:"REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR" default:"CREATE"` - // RedisPerSecondPoolOnEmptyWaitDuration is the wait duration for per-second Redis pool. - // See RedisPoolOnEmptyWaitDuration for details. - RedisPerSecondPoolOnEmptyWaitDuration time.Duration `envconfig:"REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION" default:"1s"` + RedisPerSecondPoolOnEmptyBehavior string `envconfig:"REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR" default:"WAIT"` // Memcache settings MemcacheHostPort []string `envconfig:"MEMCACHE_HOST_PORT" default:""` diff --git a/src/settings/settings_test.go b/src/settings/settings_test.go index e706196ee..b2c7bc7eb 100644 --- a/src/settings/settings_test.go +++ b/src/settings/settings_test.go @@ -3,7 +3,6 @@ package settings import ( "os" "testing" - "time" "github.com/stretchr/testify/assert" ) @@ -17,48 +16,28 @@ func TestSettingsTlsConfigUnmodified(t *testing.T) { // Tests for RedisPoolOnEmptyBehavior func TestRedisPoolOnEmptyBehavior_Default(t *testing.T) { os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR") - os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION") settings := NewSettings() - assert.Equal(t, "CREATE", settings.RedisPoolOnEmptyBehavior) - assert.Equal(t, 1*time.Second, settings.RedisPoolOnEmptyWaitDuration) + assert.Equal(t, "WAIT", settings.RedisPoolOnEmptyBehavior) } func TestRedisPoolOnEmptyBehavior_Error(t *testing.T) { os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "ERROR") - os.Setenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION", "0") defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR") - defer os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION") settings := NewSettings() assert.Equal(t, "ERROR", settings.RedisPoolOnEmptyBehavior) - assert.Equal(t, time.Duration(0), settings.RedisPoolOnEmptyWaitDuration) -} - -func TestRedisPoolOnEmptyBehavior_ErrorWithDuration(t *testing.T) { - os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "ERROR") - os.Setenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION", "100ms") - defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR") - defer os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION") - - settings := NewSettings() - - assert.Equal(t, "ERROR", settings.RedisPoolOnEmptyBehavior) - assert.Equal(t, 100*time.Millisecond, settings.RedisPoolOnEmptyWaitDuration) } func TestRedisPoolOnEmptyBehavior_Create(t *testing.T) { os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "CREATE") - os.Setenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION", "500ms") defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR") - defer os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION") settings := NewSettings() assert.Equal(t, "CREATE", settings.RedisPoolOnEmptyBehavior) - assert.Equal(t, 500*time.Millisecond, settings.RedisPoolOnEmptyWaitDuration) } func TestRedisPoolOnEmptyBehavior_Wait(t *testing.T) { @@ -84,44 +63,33 @@ func TestRedisPoolOnEmptyBehavior_CaseInsensitive(t *testing.T) { // Tests for RedisPerSecondPoolOnEmptyBehavior func TestRedisPerSecondPoolOnEmptyBehavior_Default(t *testing.T) { os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR") - os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION") settings := NewSettings() - assert.Equal(t, "CREATE", settings.RedisPerSecondPoolOnEmptyBehavior) - assert.Equal(t, 1*time.Second, settings.RedisPerSecondPoolOnEmptyWaitDuration) + assert.Equal(t, "WAIT", settings.RedisPerSecondPoolOnEmptyBehavior) } func TestRedisPerSecondPoolOnEmptyBehavior_Error(t *testing.T) { os.Setenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR", "ERROR") - os.Setenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION", "50ms") defer os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR") - defer os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION") settings := NewSettings() assert.Equal(t, "ERROR", settings.RedisPerSecondPoolOnEmptyBehavior) - assert.Equal(t, 50*time.Millisecond, settings.RedisPerSecondPoolOnEmptyWaitDuration) } // Test both pools can be configured independently func TestRedisPoolOnEmptyBehavior_IndependentConfiguration(t *testing.T) { os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "ERROR") - os.Setenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION", "0") os.Setenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR", "CREATE") - os.Setenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION", "100ms") defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR") - defer os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION") defer os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR") - defer os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION") settings := NewSettings() // Main pool configured for fail-fast assert.Equal(t, "ERROR", settings.RedisPoolOnEmptyBehavior) - assert.Equal(t, time.Duration(0), settings.RedisPoolOnEmptyWaitDuration) // Per-second pool configured differently assert.Equal(t, "CREATE", settings.RedisPerSecondPoolOnEmptyBehavior) - assert.Equal(t, 100*time.Millisecond, settings.RedisPerSecondPoolOnEmptyWaitDuration) } diff --git a/test/common/common.go b/test/common/common.go index 0eaaa0598..6a414ca09 100644 --- a/test/common/common.go +++ b/test/common/common.go @@ -105,7 +105,7 @@ func WaitForTcpPort(ctx context.Context, port int, timeout time.Duration) error // Wait up to 1s for the redis instance to start accepting connections. for { var d net.Dialer - conn, err := d.DialContext(ctx, "tcp", "localhost:"+strconv.Itoa(port)) + conn, err := d.DialContext(timeoutCtx, "tcp", "localhost:"+strconv.Itoa(port)) if err == nil { conn.Close() // TCP connections are working. All is well. diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index abc3d726d..df5e6fe6d 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -471,8 +471,9 @@ func configRedisCluster(s *settings.Settings) { s.RedisAuth = "password123" s.RedisPerSecondAuth = "password123" - s.RedisPerSecondPipelineLimit = 8 - s.RedisPipelineLimit = 8 + // RedisPipelineLimit is deprecated in radix v4, use RedisPipelineWindow instead + s.RedisPerSecondPipelineWindow = 150 * time.Microsecond + s.RedisPipelineWindow = 150 * time.Microsecond } func testBasicConfigWithoutWatchRootWithRedisCluster(perSecond bool, local_cache_size int) func(*testing.T) { @@ -796,7 +797,8 @@ func startTestRunner(t *testing.T, s settings.Settings) *runner.Runner { }() // HACK: Wait for the server to come up. Make a hook that we can wait on. - common.WaitForTcpPort(context.Background(), s.GrpcPort, 1*time.Second) + // Increased timeout from 1s to 10s to allow for Redis cluster connection initialization + common.WaitForTcpPort(context.Background(), s.GrpcPort, 10*time.Second) return &runner } diff --git a/test/mocks/redis/redis.go b/test/mocks/redis/redis.go index 2d6d059f8..192b41c26 100644 --- a/test/mocks/redis/redis.go +++ b/test/mocks/redis/redis.go @@ -68,20 +68,6 @@ func (mr *MockClientMockRecorder) DoCmd(arg0, arg1, arg2 interface{}, arg3 ...in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoCmd", reflect.TypeOf((*MockClient)(nil).DoCmd), varargs...) } -// ImplicitPipeliningEnabled mocks base method -func (m *MockClient) ImplicitPipeliningEnabled() bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ImplicitPipeliningEnabled") - ret0, _ := ret[0].(bool) - return ret0 -} - -// ImplicitPipeliningEnabled indicates an expected call of ImplicitPipeliningEnabled -func (mr *MockClientMockRecorder) ImplicitPipeliningEnabled() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImplicitPipeliningEnabled", reflect.TypeOf((*MockClient)(nil).ImplicitPipeliningEnabled)) -} - // NumActiveConns mocks base method func (m *MockClient) NumActiveConns() int { m.ctrl.T.Helper() diff --git a/test/redis/bench_test.go b/test/redis/bench_test.go index 1f9e3336b..3b1d5fa49 100644 --- a/test/redis/bench_test.go +++ b/test/redis/bench_test.go @@ -44,7 +44,7 @@ func BenchmarkParallelDoLimit(b *testing.B) { return func(b *testing.B) { statsStore := gostats.NewStore(gostats.NewNullSink(), false) sm := stats.NewMockStatManager(statsStore) - client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", 0, "") + client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", "") defer client.Close() cache := redis.NewFixedRateLimitCacheImpl(client, nil, utils.NewTimeSourceImpl(), rand.New(utils.NewLockedSource(time.Now().Unix())), 10, nil, 0.8, "", sm, true) diff --git a/test/redis/driver_impl_test.go b/test/redis/driver_impl_test.go index 311d5f03a..8d1b30132 100644 --- a/test/redis/driver_impl_test.go +++ b/test/redis/driver_impl_test.go @@ -39,7 +39,7 @@ func testNewClientImpl(t *testing.T, pipelineWindow time.Duration, pipelineLimit statsStore := stats.NewStore(stats.NewNullSink(), false) mkRedisClient := func(auth, addr string) redis.Client { - return redis.NewClientImpl(statsStore, false, auth, "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", 0, "") + return redis.NewClientImpl(statsStore, false, auth, "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", "") } t.Run("connection refused", func(t *testing.T) { @@ -66,7 +66,7 @@ func testNewClientImpl(t *testing.T, pipelineWindow time.Duration, pipelineLimit redisSrv.RequireAuth(redisAuth) - assert.PanicsWithError(t, "NOAUTH Authentication required.", func() { + assert.PanicsWithError(t, "response returned from Conn: NOAUTH Authentication required.", func() { mkRedisClient("", redisSrv.Addr()) }) }) @@ -103,36 +103,23 @@ func testNewClientImpl(t *testing.T, pipelineWindow time.Duration, pipelineLimit redisSrv.RequireUserAuth(user, pass) redisAuth := fmt.Sprintf("%s:invalid-password", user) - assert.PanicsWithError(t, "WRONGPASS invalid username-password pair", func() { + assert.PanicsWithError(t, "response returned from Conn: WRONGPASS invalid username-password pair", func() { mkRedisClient(redisAuth, redisSrv.Addr()) }) }) - - t.Run("ImplicitPipeliningEnabled() return expected value", func(t *testing.T) { - redisSrv := mustNewRedisServer() - defer redisSrv.Close() - - client := mkRedisClient("", redisSrv.Addr()) - - if pipelineWindow == 0 && pipelineLimit == 0 { - assert.False(t, client.ImplicitPipeliningEnabled()) - } else { - assert.True(t, client.ImplicitPipeliningEnabled()) - } - }) } } func TestNewClientImpl(t *testing.T) { - t.Run("ImplicitPipeliningEnabled", testNewClientImpl(t, 2*time.Millisecond, 2)) - t.Run("ImplicitPipeliningDisabled", testNewClientImpl(t, 0, 0)) + t.Run("WithPipelineWindow", testNewClientImpl(t, 2*time.Millisecond, 2)) + t.Run("WithoutPipelineWindow", testNewClientImpl(t, 0, 0)) } func TestDoCmd(t *testing.T) { statsStore := stats.NewStore(stats.NewNullSink(), false) mkRedisClient := func(addr string) redis.Client { - return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second, "", 0, "") + return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second, "", "") } t.Run("SETGET ok", func(t *testing.T) { @@ -168,7 +155,7 @@ func TestDoCmd(t *testing.T) { assert.Nil(t, client.DoCmd(nil, "SET", "foo", "bar")) redisSrv.Close() - assert.EqualError(t, client.DoCmd(nil, "GET", "foo"), "EOF") + assert.EqualError(t, client.DoCmd(nil, "GET", "foo"), "response returned from Conn: EOF") }) } @@ -177,7 +164,7 @@ func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) f statsStore := stats.NewStore(stats.NewNullSink(), false) mkRedisClient := func(addr string) redis.Client { - return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", 0, "") + return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", "") } t.Run("SETGET ok", func(t *testing.T) { @@ -220,7 +207,13 @@ func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) f expectErrContainEOF := func(t *testing.T, err error) { assert.NotNil(t, err) - assert.Contains(t, err.Error(), "EOF") + // radix v4 wraps errors with "response returned from Conn:" + // and may return different connection errors (EOF, connection reset, etc) + errMsg := err.Error() + hasConnectionError := strings.Contains(errMsg, "EOF") || + strings.Contains(errMsg, "connection reset") || + strings.Contains(errMsg, "broken pipe") + assert.True(t, hasConnectionError, "expected connection error, got: %s", errMsg) } expectErrContainEOF(t, client.PipeDo(client.PipeAppend(redis.Pipeline{}, nil, "GET", "foo"))) @@ -229,8 +222,8 @@ func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) f } func TestPipeDo(t *testing.T) { - t.Run("ImplicitPipeliningEnabled", testPipeDo(t, 10*time.Millisecond, 2)) - t.Run("ImplicitPipeliningDisabled", testPipeDo(t, 0, 0)) + t.Run("WithPipelineWindow", testPipeDo(t, 10*time.Millisecond, 2)) + t.Run("WithoutPipelineWindow", testPipeDo(t, 0, 0)) } // Tests for pool on-empty behavior @@ -238,8 +231,8 @@ func TestPoolOnEmptyBehavior(t *testing.T) { statsStore := stats.NewStore(stats.NewNullSink(), false) // Helper to create client with specific on-empty behavior - mkRedisClientWithBehavior := func(addr, behavior string, waitDuration time.Duration) redis.Client { - return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second, behavior, waitDuration, "") + mkRedisClientWithBehavior := func(addr, behavior string) redis.Client { + return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second, behavior, "") } t.Run("default behavior (empty string)", func(t *testing.T) { @@ -248,7 +241,7 @@ func TestPoolOnEmptyBehavior(t *testing.T) { var client redis.Client assert.NotPanics(t, func() { - client = mkRedisClientWithBehavior(redisSrv.Addr(), "", 0) + client = mkRedisClientWithBehavior(redisSrv.Addr(), "") }) assert.NotNil(t, client) @@ -259,99 +252,75 @@ func TestPoolOnEmptyBehavior(t *testing.T) { assert.Equal(t, "bar", res) }) - t.Run("ERROR behavior", func(t *testing.T) { + t.Run("ERROR behavior should panic", func(t *testing.T) { redisSrv := mustNewRedisServer() defer redisSrv.Close() - var client redis.Client - assert.NotPanics(t, func() { - client = mkRedisClientWithBehavior(redisSrv.Addr(), "ERROR", 0) + // radix v4 does not support ERROR behavior - should panic at startup + panicErr := expectPanicError(t, func() { + mkRedisClientWithBehavior(redisSrv.Addr(), "ERROR") }) - assert.NotNil(t, client) - - // Verify client works - var res string - assert.Nil(t, client.DoCmd(nil, "SET", "test", "value")) - assert.Nil(t, client.DoCmd(&res, "GET", "test")) - assert.Equal(t, "value", res) + assert.Contains(t, panicErr.Error(), "REDIS_POOL_ON_EMPTY_BEHAVIOR=ERROR is not supported in radix v4") }) - t.Run("ERROR behavior with wait duration", func(t *testing.T) { + t.Run("CREATE behavior should panic", func(t *testing.T) { redisSrv := mustNewRedisServer() defer redisSrv.Close() - var client redis.Client - assert.NotPanics(t, func() { - client = mkRedisClientWithBehavior(redisSrv.Addr(), "ERROR", 100*time.Millisecond) + // radix v4 does not support CREATE behavior - should panic at startup + panicErr := expectPanicError(t, func() { + mkRedisClientWithBehavior(redisSrv.Addr(), "CREATE") }) - assert.NotNil(t, client) - - // Verify client works - var res string - assert.Nil(t, client.DoCmd(nil, "SET", "test2", "value2")) - assert.Nil(t, client.DoCmd(&res, "GET", "test2")) - assert.Equal(t, "value2", res) + assert.Contains(t, panicErr.Error(), "REDIS_POOL_ON_EMPTY_BEHAVIOR=CREATE is not supported in radix v4") }) - t.Run("CREATE behavior", func(t *testing.T) { + t.Run("WAIT behavior", func(t *testing.T) { redisSrv := mustNewRedisServer() defer redisSrv.Close() var client redis.Client assert.NotPanics(t, func() { - client = mkRedisClientWithBehavior(redisSrv.Addr(), "CREATE", 0) + client = mkRedisClientWithBehavior(redisSrv.Addr(), "WAIT") }) assert.NotNil(t, client) // Verify client works var res string - assert.Nil(t, client.DoCmd(nil, "SET", "test3", "value3")) - assert.Nil(t, client.DoCmd(&res, "GET", "test3")) - assert.Equal(t, "value3", res) + assert.Nil(t, client.DoCmd(nil, "SET", "test5", "value5")) + assert.Nil(t, client.DoCmd(&res, "GET", "test5")) + assert.Equal(t, "value5", res) }) - t.Run("CREATE behavior with wait duration", func(t *testing.T) { + t.Run("case insensitive behavior - lowercase 'error' panics", func(t *testing.T) { redisSrv := mustNewRedisServer() defer redisSrv.Close() - var client redis.Client - assert.NotPanics(t, func() { - client = mkRedisClientWithBehavior(redisSrv.Addr(), "CREATE", 500*time.Millisecond) + // Test that lowercase 'error' is treated same as 'ERROR' (case insensitive) + panicErr := expectPanicError(t, func() { + mkRedisClientWithBehavior(redisSrv.Addr(), "error") }) - assert.NotNil(t, client) - - // Verify client works - var res string - assert.Nil(t, client.DoCmd(nil, "SET", "test4", "value4")) - assert.Nil(t, client.DoCmd(&res, "GET", "test4")) - assert.Equal(t, "value4", res) + assert.Contains(t, panicErr.Error(), "REDIS_POOL_ON_EMPTY_BEHAVIOR=ERROR is not supported in radix v4") }) - t.Run("WAIT behavior", func(t *testing.T) { + t.Run("case insensitive behavior - lowercase 'create' panics", func(t *testing.T) { redisSrv := mustNewRedisServer() defer redisSrv.Close() - var client redis.Client - assert.NotPanics(t, func() { - client = mkRedisClientWithBehavior(redisSrv.Addr(), "WAIT", 0) + // Test that lowercase 'create' is treated same as 'CREATE' (case insensitive) + panicErr := expectPanicError(t, func() { + mkRedisClientWithBehavior(redisSrv.Addr(), "create") }) - assert.NotNil(t, client) - - // Verify client works - var res string - assert.Nil(t, client.DoCmd(nil, "SET", "test5", "value5")) - assert.Nil(t, client.DoCmd(&res, "GET", "test5")) - assert.Equal(t, "value5", res) + assert.Contains(t, panicErr.Error(), "REDIS_POOL_ON_EMPTY_BEHAVIOR=CREATE is not supported in radix v4") }) - t.Run("case insensitive behavior", func(t *testing.T) { + t.Run("case insensitive behavior - lowercase 'wait' works", func(t *testing.T) { redisSrv := mustNewRedisServer() defer redisSrv.Close() - // Test lowercase + // Test that lowercase 'wait' is treated same as 'WAIT' (case insensitive) var client redis.Client assert.NotPanics(t, func() { - client = mkRedisClientWithBehavior(redisSrv.Addr(), "error", 0) + client = mkRedisClientWithBehavior(redisSrv.Addr(), "wait") }) assert.NotNil(t, client) @@ -369,7 +338,7 @@ func TestPoolOnEmptyBehavior(t *testing.T) { // Unknown behavior should not panic, just log warning and use default var client redis.Client assert.NotPanics(t, func() { - client = mkRedisClientWithBehavior(redisSrv.Addr(), "UNKNOWN_BEHAVIOR", 0) + client = mkRedisClientWithBehavior(redisSrv.Addr(), "UNKNOWN_BEHAVIOR") }) assert.NotNil(t, client) @@ -387,7 +356,7 @@ func TestNewClientImplSentinel(t *testing.T) { mkSentinelClient := func(auth, sentinelAuth, url string, useTls bool, timeout time.Duration) redis.Client { // Pass nil for tlsConfig - we can't test TLS without a real TLS server, // but we can verify the code path is executed (logs will show TLS is enabled) - return redis.NewClientImpl(statsStore, useTls, auth, "tcp", "sentinel", url, 1, 0, 0, nil, false, nil, timeout, "", 0, sentinelAuth) + return redis.NewClientImpl(statsStore, useTls, auth, "tcp", "sentinel", url, 1, 0, 0, nil, false, nil, timeout, "", sentinelAuth) } t.Run("invalid url format - missing sentinel addresses", func(t *testing.T) { diff --git a/test/redis/fixed_cache_impl_test.go b/test/redis/fixed_cache_impl_test.go index 7838b67ab..73229ab6e 100644 --- a/test/redis/fixed_cache_impl_test.go +++ b/test/redis/fixed_cache_impl_test.go @@ -8,7 +8,7 @@ import ( "github.com/envoyproxy/ratelimit/test/mocks/stats" "github.com/coocood/freecache" - "github.com/mediocregopher/radix/v3" + "github.com/mediocregopher/radix/v4" pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3" gostats "github.com/lyft/gostats" @@ -35,7 +35,10 @@ func TestRedis(t *testing.T) { } func pipeAppend(pipeline redis.Pipeline, rcv interface{}, cmd, key string, args ...interface{}) redis.Pipeline { - return append(pipeline, radix.FlatCmd(rcv, cmd, key, args...)) + return append(pipeline, redis.PipelineAction{ + Action: radix.FlatCmd(rcv, cmd, append([]interface{}{key}, args...)...), + Key: key, + }) } func testRedis(usePerSecondRedis bool) func(*testing.T) {