From f3e91263a7c978c0ba1e6c9b57a24bc600b774d3 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Mon, 24 Nov 2025 17:17:22 +0200 Subject: [PATCH 1/4] lazy cluster topology reload --- commands_test.go | 20 +++- osscluster.go | 57 ++++++++-- osscluster_lazy_reload_test.go | 201 +++++++++++++++++++++++++++++++++ 3 files changed, 265 insertions(+), 13 deletions(-) create mode 100644 osscluster_lazy_reload_test.go diff --git a/commands_test.go b/commands_test.go index edbae4e7a3..24640c23c8 100644 --- a/commands_test.go +++ b/commands_test.go @@ -8905,27 +8905,37 @@ var _ = Describe("Commands", func() { const key = "latency-monitor-threshold" old := client.ConfigGet(ctx, key).Val() - client.ConfigSet(ctx, key, "1") + // Use a higher threshold (100ms) to avoid capturing normal operations + // that could cause flakiness due to timing variations + client.ConfigSet(ctx, key, "100") defer client.ConfigSet(ctx, key, old[key]) result, err := client.Latency(ctx).Result() Expect(err).NotTo(HaveOccurred()) Expect(len(result)).Should(Equal(0)) - err = client.Do(ctx, "DEBUG", "SLEEP", 0.01).Err() + // Use a longer sleep (150ms) to ensure it exceeds the 100ms threshold + err = client.Do(ctx, "DEBUG", "SLEEP", 0.15).Err() Expect(err).NotTo(HaveOccurred()) result, err = client.Latency(ctx).Result() Expect(err).NotTo(HaveOccurred()) - Expect(len(result)).Should(Equal(1)) + Expect(len(result)).Should(BeNumerically(">=", 1)) // reset latency by event name - err = client.LatencyReset(ctx, result[0].Name).Err() + eventName := result[0].Name + err = client.LatencyReset(ctx, eventName).Err() Expect(err).NotTo(HaveOccurred()) + // Verify the specific event was reset (not that all events are gone) + // This avoids flakiness from other operations triggering latency events result, err = client.Latency(ctx).Result() Expect(err).NotTo(HaveOccurred()) - Expect(len(result)).Should(Equal(0)) + for _, event := range result { + if event.Name == eventName { + Fail("Event " + eventName + " should have been reset") + } + } }) }) }) diff --git a/osscluster.go b/osscluster.go index 7925d2c603..768b665a4c 100644 --- a/osscluster.go +++ b/osscluster.go @@ -146,7 +146,8 @@ type ClusterOptions struct { // cluster upgrade notifications gracefully and manage connection/pool state // transitions seamlessly. Requires Protocol: 3 (RESP3) for push notifications. // If nil, maintnotifications upgrades are in "auto" mode and will be enabled if the server supports it. - // The ClusterClient does not directly work with maintnotifications, it is up to the clients in the Nodes map to work with maintnotifications. + // The ClusterClient supports SMIGRATING and SMIGRATED notifications for cluster state management. + // Individual node clients handle other maintenance notifications (MOVING, MIGRATING, etc.). MaintNotificationsConfig *maintnotifications.Config } @@ -945,8 +946,9 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode { type clusterStateHolder struct { load func(ctx context.Context) (*clusterState, error) - state atomic.Value - reloading uint32 // atomic + state atomic.Value + reloading uint32 // atomic + reloadPending uint32 // atomic - set to 1 when reload is requested during active reload } func newClusterStateHolder(fn func(ctx context.Context) (*clusterState, error)) *clusterStateHolder { @@ -965,17 +967,36 @@ func (c *clusterStateHolder) Reload(ctx context.Context) (*clusterState, error) } func (c *clusterStateHolder) LazyReload() { + // If already reloading, mark that another reload is pending if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) { + atomic.StoreUint32(&c.reloadPending, 1) return } + go func() { - defer atomic.StoreUint32(&c.reloading, 0) + for { + _, err := c.Reload(context.Background()) + if err != nil { + atomic.StoreUint32(&c.reloading, 0) + return + } - _, err := c.Reload(context.Background()) - if err != nil { - return + // Clear pending flag after reload completes, before cooldown + // This captures notifications that arrived during the reload + atomic.StoreUint32(&c.reloadPending, 0) + + // Wait cooldown period + time.Sleep(200 * time.Millisecond) + + // Check if another reload was requested during cooldown + if atomic.LoadUint32(&c.reloadPending) == 0 { + // No pending reload, we're done + atomic.StoreUint32(&c.reloading, 0) + return + } + + // Pending reload requested, loop to reload again } - time.Sleep(200 * time.Millisecond) }() } @@ -1038,6 +1059,26 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { txPipeline: c.processTxPipeline, }) + // Set up SMIGRATED notification handling for cluster state reload + // When a node client receives a SMIGRATED notification, it should trigger + // cluster state reload on the parent ClusterClient + if opt.MaintNotificationsConfig != nil { + c.nodes.OnNewNode(func(nodeClient *Client) { + manager := nodeClient.GetMaintNotificationsManager() + if manager != nil { + manager.SetClusterStateReloadCallback(func(ctx context.Context, hostPort string, slotRanges []string) { + // Log the migration details for now + if internal.LogLevel.InfoOrAbove() { + internal.Logger.Printf(ctx, "cluster: slots %v migrated to %s, reloading cluster state", slotRanges, hostPort) + } + // Currently we reload the entire cluster state + // In the future, this could be optimized to reload only the specific slots + c.state.LazyReload() + }) + } + }) + } + return c } diff --git a/osscluster_lazy_reload_test.go b/osscluster_lazy_reload_test.go new file mode 100644 index 0000000000..f66bb424ae --- /dev/null +++ b/osscluster_lazy_reload_test.go @@ -0,0 +1,201 @@ +package redis + +import ( + "context" + "sync/atomic" + "testing" + "time" +) + +// TestLazyReloadQueueBehavior tests that LazyReload properly queues reload requests +func TestLazyReloadQueueBehavior(t *testing.T) { + t.Run("SingleReload", func(t *testing.T) { + var reloadCount atomic.Int32 + + holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) { + reloadCount.Add(1) + time.Sleep(50 * time.Millisecond) // Simulate reload work + return &clusterState{}, nil + }) + + // Trigger one reload + holder.LazyReload() + + // Wait for reload to complete + time.Sleep(300 * time.Millisecond) + + if count := reloadCount.Load(); count != 1 { + t.Errorf("Expected 1 reload, got %d", count) + } + }) + + t.Run("ConcurrentReloadsDeduplication", func(t *testing.T) { + var reloadCount atomic.Int32 + + holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) { + reloadCount.Add(1) + time.Sleep(50 * time.Millisecond) // Simulate reload work + return &clusterState{}, nil + }) + + // Trigger multiple reloads concurrently + for i := 0; i < 10; i++ { + go holder.LazyReload() + } + + // Wait for all to complete + time.Sleep(100 * time.Millisecond) + + // Should only reload once (all concurrent calls deduplicated) + if count := reloadCount.Load(); count != 1 { + t.Errorf("Expected 1 reload (deduplication), got %d", count) + } + }) + + t.Run("PendingReloadDuringCooldown", func(t *testing.T) { + var reloadCount atomic.Int32 + + holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) { + reloadCount.Add(1) + time.Sleep(10 * time.Millisecond) // Simulate reload work + return &clusterState{}, nil + }) + + // Trigger first reload + holder.LazyReload() + + // Wait for reload to complete but still in cooldown + time.Sleep(50 * time.Millisecond) + + // Trigger second reload during cooldown period + holder.LazyReload() + + // Wait for second reload to complete + time.Sleep(300 * time.Millisecond) + + // Should have reloaded twice (second request queued and executed) + if count := reloadCount.Load(); count != 2 { + t.Errorf("Expected 2 reloads (queued during cooldown), got %d", count) + } + }) + + t.Run("MultiplePendingReloadsCollapsed", func(t *testing.T) { + var reloadCount atomic.Int32 + + holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) { + reloadCount.Add(1) + time.Sleep(10 * time.Millisecond) // Simulate reload work + return &clusterState{}, nil + }) + + // Trigger first reload + holder.LazyReload() + + // Wait for reload to start + time.Sleep(5 * time.Millisecond) + + // Trigger multiple reloads during active reload + cooldown + for i := 0; i < 10; i++ { + holder.LazyReload() + time.Sleep(5 * time.Millisecond) + } + + // Wait for all to complete + time.Sleep(400 * time.Millisecond) + + // Should have reloaded exactly twice: + // 1. Initial reload + // 2. One more reload for all the pending requests (collapsed into one) + if count := reloadCount.Load(); count != 2 { + t.Errorf("Expected 2 reloads (initial + collapsed pending), got %d", count) + } + }) + + t.Run("ReloadAfterCooldownPeriod", func(t *testing.T) { + var reloadCount atomic.Int32 + + holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) { + reloadCount.Add(1) + time.Sleep(10 * time.Millisecond) // Simulate reload work + return &clusterState{}, nil + }) + + // Trigger first reload + holder.LazyReload() + + // Wait for reload + cooldown to complete + time.Sleep(300 * time.Millisecond) + + // Trigger second reload after cooldown + holder.LazyReload() + + // Wait for second reload to complete + time.Sleep(300 * time.Millisecond) + + // Should have reloaded twice (separate reload cycles) + if count := reloadCount.Load(); count != 2 { + t.Errorf("Expected 2 reloads (separate cycles), got %d", count) + } + }) + + t.Run("ErrorDuringReload", func(t *testing.T) { + var reloadCount atomic.Int32 + var shouldFail atomic.Bool + shouldFail.Store(true) + + holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) { + reloadCount.Add(1) + if shouldFail.Load() { + return nil, context.DeadlineExceeded + } + return &clusterState{}, nil + }) + + // Trigger reload that will fail + holder.LazyReload() + + // Wait for failed reload + time.Sleep(50 * time.Millisecond) + + // Trigger another reload (should succeed now) + shouldFail.Store(false) + holder.LazyReload() + + // Wait for successful reload + time.Sleep(300 * time.Millisecond) + + // Should have attempted reload twice (first failed, second succeeded) + if count := reloadCount.Load(); count != 2 { + t.Errorf("Expected 2 reload attempts, got %d", count) + } + }) + + t.Run("CascadingSMigratedScenario", func(t *testing.T) { + // Simulate the real-world scenario: multiple SMIGRATED notifications + // arriving in quick succession from different node clients + var reloadCount atomic.Int32 + + holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) { + reloadCount.Add(1) + time.Sleep(20 * time.Millisecond) // Simulate realistic reload time + return &clusterState{}, nil + }) + + // Simulate 5 SMIGRATED notifications arriving within 100ms + for i := 0; i < 5; i++ { + go holder.LazyReload() + time.Sleep(20 * time.Millisecond) + } + + // Wait for all reloads to complete + time.Sleep(500 * time.Millisecond) + + // Should reload at most 2 times: + // 1. First notification triggers reload + // 2. Notifications 2-5 collapse into one pending reload + count := reloadCount.Load() + if count < 1 || count > 2 { + t.Errorf("Expected 1-2 reloads for cascading scenario, got %d", count) + } + }) +} From e9da546f57179640d09746a714d2ff1287cd0167 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Tue, 2 Dec 2025 15:53:54 +0200 Subject: [PATCH 2/4] fix discrepancies between options structs --- osscluster.go | 74 ++++++++++++++++++++------- sentinel.go | 124 +++++++++++++++++++++++++++++---------------- universal.go | 137 +++++++++++++++++++++++++++++++------------------- 3 files changed, 223 insertions(+), 112 deletions(-) diff --git a/osscluster.go b/osscluster.go index 768b665a4c..898c18c1a3 100644 --- a/osscluster.go +++ b/osscluster.go @@ -85,13 +85,29 @@ type ClusterOptions struct { MinRetryBackoff time.Duration MaxRetryBackoff time.Duration - DialTimeout time.Duration - ReadTimeout time.Duration - WriteTimeout time.Duration + DialTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + + // DialerRetries is the maximum number of retry attempts when dialing fails. + // + // default: 5 + DialerRetries int + + // DialerRetryTimeout is the backoff duration between retry attempts. + // + // default: 100 milliseconds + DialerRetryTimeout time.Duration + ContextTimeoutEnabled bool - PoolFIFO bool - PoolSize int // applies per cluster node and not for the whole cluster + PoolFIFO bool + PoolSize int // applies per cluster node and not for the whole cluster + + // MaxConcurrentDials is the maximum number of concurrent connection creation goroutines. + // If <= 0, defaults to PoolSize. If > PoolSize, it will be capped at PoolSize. + MaxConcurrentDials int + PoolTimeout time.Duration MinIdleConns int MaxIdleConns int @@ -163,9 +179,24 @@ func (opt *ClusterOptions) init() { opt.ReadOnly = true } + if opt.DialTimeout == 0 { + opt.DialTimeout = 5 * time.Second + } + if opt.DialerRetries == 0 { + opt.DialerRetries = 5 + } + if opt.DialerRetryTimeout == 0 { + opt.DialerRetryTimeout = 100 * time.Millisecond + } + if opt.PoolSize == 0 { opt.PoolSize = 5 * runtime.GOMAXPROCS(0) } + if opt.MaxConcurrentDials <= 0 { + opt.MaxConcurrentDials = opt.PoolSize + } else if opt.MaxConcurrentDials > opt.PoolSize { + opt.MaxConcurrentDials = opt.PoolSize + } if opt.ReadBufferSize == 0 { opt.ReadBufferSize = proto.DefaultBufferSize } @@ -303,10 +334,13 @@ func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, er o.MinRetryBackoff = q.duration("min_retry_backoff") o.MaxRetryBackoff = q.duration("max_retry_backoff") o.DialTimeout = q.duration("dial_timeout") + o.DialerRetries = q.int("dialer_retries") + o.DialerRetryTimeout = q.duration("dialer_retry_timeout") o.ReadTimeout = q.duration("read_timeout") o.WriteTimeout = q.duration("write_timeout") o.PoolFIFO = q.bool("pool_fifo") o.PoolSize = q.int("pool_size") + o.MaxConcurrentDials = q.int("max_concurrent_dials") o.MinIdleConns = q.int("min_idle_conns") o.MaxIdleConns = q.int("max_idle_conns") o.MaxActiveConns = q.int("max_active_conns") @@ -362,21 +396,25 @@ func (opt *ClusterOptions) clientOptions() *Options { MinRetryBackoff: opt.MinRetryBackoff, MaxRetryBackoff: opt.MaxRetryBackoff, - DialTimeout: opt.DialTimeout, - ReadTimeout: opt.ReadTimeout, - WriteTimeout: opt.WriteTimeout, + DialTimeout: opt.DialTimeout, + DialerRetries: opt.DialerRetries, + DialerRetryTimeout: opt.DialerRetryTimeout, + ReadTimeout: opt.ReadTimeout, + WriteTimeout: opt.WriteTimeout, + ContextTimeoutEnabled: opt.ContextTimeoutEnabled, - PoolFIFO: opt.PoolFIFO, - PoolSize: opt.PoolSize, - PoolTimeout: opt.PoolTimeout, - MinIdleConns: opt.MinIdleConns, - MaxIdleConns: opt.MaxIdleConns, - MaxActiveConns: opt.MaxActiveConns, - ConnMaxIdleTime: opt.ConnMaxIdleTime, - ConnMaxLifetime: opt.ConnMaxLifetime, - ReadBufferSize: opt.ReadBufferSize, - WriteBufferSize: opt.WriteBufferSize, + PoolFIFO: opt.PoolFIFO, + PoolSize: opt.PoolSize, + MaxConcurrentDials: opt.MaxConcurrentDials, + PoolTimeout: opt.PoolTimeout, + MinIdleConns: opt.MinIdleConns, + MaxIdleConns: opt.MaxIdleConns, + MaxActiveConns: opt.MaxActiveConns, + ConnMaxIdleTime: opt.ConnMaxIdleTime, + ConnMaxLifetime: opt.ConnMaxLifetime, + ReadBufferSize: opt.ReadBufferSize, + WriteBufferSize: opt.WriteBufferSize, DisableIdentity: opt.DisableIdentity, DisableIndentity: opt.DisableIdentity, IdentitySuffix: opt.IdentitySuffix, diff --git a/sentinel.go b/sentinel.go index 663f7b1ad9..8565a31e60 100644 --- a/sentinel.go +++ b/sentinel.go @@ -89,7 +89,18 @@ type FailoverOptions struct { MinRetryBackoff time.Duration MaxRetryBackoff time.Duration - DialTimeout time.Duration + DialTimeout time.Duration + + // DialerRetries is the maximum number of retry attempts when dialing fails. + // + // default: 5 + DialerRetries int + + // DialerRetryTimeout is the backoff duration between retry attempts. + // + // default: 100 milliseconds + DialerRetryTimeout time.Duration + ReadTimeout time.Duration WriteTimeout time.Duration ContextTimeoutEnabled bool @@ -110,7 +121,12 @@ type FailoverOptions struct { PoolFIFO bool - PoolSize int + PoolSize int + + // MaxConcurrentDials is the maximum number of concurrent connection creation goroutines. + // If <= 0, defaults to PoolSize. If > PoolSize, it will be capped at PoolSize. + MaxConcurrentDials int + PoolTimeout time.Duration MinIdleConns int MaxIdleConns int @@ -141,6 +157,10 @@ type FailoverOptions struct { UnstableResp3 bool + // PushNotificationProcessor is the processor for handling push notifications. + // If nil, a default processor will be created for RESP3 connections. + PushNotificationProcessor push.NotificationProcessor + // MaintNotificationsConfig is not supported for FailoverClients at the moment // MaintNotificationsConfig provides custom configuration for maintnotifications upgrades. // When MaintNotificationsConfig.Mode is not "disabled", the client will handle @@ -174,27 +194,32 @@ func (opt *FailoverOptions) clientOptions() *Options { ReadBufferSize: opt.ReadBufferSize, WriteBufferSize: opt.WriteBufferSize, - DialTimeout: opt.DialTimeout, - ReadTimeout: opt.ReadTimeout, - WriteTimeout: opt.WriteTimeout, + DialTimeout: opt.DialTimeout, + DialerRetries: opt.DialerRetries, + DialerRetryTimeout: opt.DialerRetryTimeout, + ReadTimeout: opt.ReadTimeout, + WriteTimeout: opt.WriteTimeout, + ContextTimeoutEnabled: opt.ContextTimeoutEnabled, - PoolFIFO: opt.PoolFIFO, - PoolSize: opt.PoolSize, - PoolTimeout: opt.PoolTimeout, - MinIdleConns: opt.MinIdleConns, - MaxIdleConns: opt.MaxIdleConns, - MaxActiveConns: opt.MaxActiveConns, - ConnMaxIdleTime: opt.ConnMaxIdleTime, - ConnMaxLifetime: opt.ConnMaxLifetime, + PoolFIFO: opt.PoolFIFO, + PoolSize: opt.PoolSize, + MaxConcurrentDials: opt.MaxConcurrentDials, + PoolTimeout: opt.PoolTimeout, + MinIdleConns: opt.MinIdleConns, + MaxIdleConns: opt.MaxIdleConns, + MaxActiveConns: opt.MaxActiveConns, + ConnMaxIdleTime: opt.ConnMaxIdleTime, + ConnMaxLifetime: opt.ConnMaxLifetime, TLSConfig: opt.TLSConfig, DisableIdentity: opt.DisableIdentity, DisableIndentity: opt.DisableIndentity, - IdentitySuffix: opt.IdentitySuffix, - UnstableResp3: opt.UnstableResp3, + IdentitySuffix: opt.IdentitySuffix, + UnstableResp3: opt.UnstableResp3, + PushNotificationProcessor: opt.PushNotificationProcessor, MaintNotificationsConfig: &maintnotifications.Config{ Mode: maintnotifications.ModeDisabled, @@ -222,27 +247,32 @@ func (opt *FailoverOptions) sentinelOptions(addr string) *Options { ReadBufferSize: 4096, WriteBufferSize: 4096, - DialTimeout: opt.DialTimeout, - ReadTimeout: opt.ReadTimeout, - WriteTimeout: opt.WriteTimeout, + DialTimeout: opt.DialTimeout, + DialerRetries: opt.DialerRetries, + DialerRetryTimeout: opt.DialerRetryTimeout, + ReadTimeout: opt.ReadTimeout, + WriteTimeout: opt.WriteTimeout, + ContextTimeoutEnabled: opt.ContextTimeoutEnabled, - PoolFIFO: opt.PoolFIFO, - PoolSize: opt.PoolSize, - PoolTimeout: opt.PoolTimeout, - MinIdleConns: opt.MinIdleConns, - MaxIdleConns: opt.MaxIdleConns, - MaxActiveConns: opt.MaxActiveConns, - ConnMaxIdleTime: opt.ConnMaxIdleTime, - ConnMaxLifetime: opt.ConnMaxLifetime, + PoolFIFO: opt.PoolFIFO, + PoolSize: opt.PoolSize, + MaxConcurrentDials: opt.MaxConcurrentDials, + PoolTimeout: opt.PoolTimeout, + MinIdleConns: opt.MinIdleConns, + MaxIdleConns: opt.MaxIdleConns, + MaxActiveConns: opt.MaxActiveConns, + ConnMaxIdleTime: opt.ConnMaxIdleTime, + ConnMaxLifetime: opt.ConnMaxLifetime, TLSConfig: opt.TLSConfig, DisableIdentity: opt.DisableIdentity, DisableIndentity: opt.DisableIndentity, - IdentitySuffix: opt.IdentitySuffix, - UnstableResp3: opt.UnstableResp3, + IdentitySuffix: opt.IdentitySuffix, + UnstableResp3: opt.UnstableResp3, + PushNotificationProcessor: opt.PushNotificationProcessor, MaintNotificationsConfig: &maintnotifications.Config{ Mode: maintnotifications.ModeDisabled, @@ -276,26 +306,31 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions { ReadBufferSize: opt.ReadBufferSize, WriteBufferSize: opt.WriteBufferSize, - DialTimeout: opt.DialTimeout, - ReadTimeout: opt.ReadTimeout, - WriteTimeout: opt.WriteTimeout, + DialTimeout: opt.DialTimeout, + DialerRetries: opt.DialerRetries, + DialerRetryTimeout: opt.DialerRetryTimeout, + ReadTimeout: opt.ReadTimeout, + WriteTimeout: opt.WriteTimeout, + ContextTimeoutEnabled: opt.ContextTimeoutEnabled, - PoolFIFO: opt.PoolFIFO, - PoolSize: opt.PoolSize, - PoolTimeout: opt.PoolTimeout, - MinIdleConns: opt.MinIdleConns, - MaxIdleConns: opt.MaxIdleConns, - MaxActiveConns: opt.MaxActiveConns, - ConnMaxIdleTime: opt.ConnMaxIdleTime, - ConnMaxLifetime: opt.ConnMaxLifetime, + PoolFIFO: opt.PoolFIFO, + PoolSize: opt.PoolSize, + MaxConcurrentDials: opt.MaxConcurrentDials, + PoolTimeout: opt.PoolTimeout, + MinIdleConns: opt.MinIdleConns, + MaxIdleConns: opt.MaxIdleConns, + MaxActiveConns: opt.MaxActiveConns, + ConnMaxIdleTime: opt.ConnMaxIdleTime, + ConnMaxLifetime: opt.ConnMaxLifetime, TLSConfig: opt.TLSConfig, - DisableIdentity: opt.DisableIdentity, - DisableIndentity: opt.DisableIndentity, - IdentitySuffix: opt.IdentitySuffix, - FailingTimeoutSeconds: opt.FailingTimeoutSeconds, + DisableIdentity: opt.DisableIdentity, + DisableIndentity: opt.DisableIndentity, + IdentitySuffix: opt.IdentitySuffix, + FailingTimeoutSeconds: opt.FailingTimeoutSeconds, + PushNotificationProcessor: opt.PushNotificationProcessor, MaintNotificationsConfig: &maintnotifications.Config{ Mode: maintnotifications.ModeDisabled, @@ -399,11 +434,14 @@ func setupFailoverConnParams(u *url.URL, o *FailoverOptions) (*FailoverOptions, o.MinRetryBackoff = q.duration("min_retry_backoff") o.MaxRetryBackoff = q.duration("max_retry_backoff") o.DialTimeout = q.duration("dial_timeout") + o.DialerRetries = q.int("dialer_retries") + o.DialerRetryTimeout = q.duration("dialer_retry_timeout") o.ReadTimeout = q.duration("read_timeout") o.WriteTimeout = q.duration("write_timeout") o.ContextTimeoutEnabled = q.bool("context_timeout_enabled") o.PoolFIFO = q.bool("pool_fifo") o.PoolSize = q.int("pool_size") + o.MaxConcurrentDials = q.int("max_concurrent_dials") o.MinIdleConns = q.int("min_idle_conns") o.MaxIdleConns = q.int("max_idle_conns") o.MaxActiveConns = q.int("max_active_conns") diff --git a/universal.go b/universal.go index 1dc9764dc7..39acdb8e5e 100644 --- a/universal.go +++ b/universal.go @@ -8,6 +8,7 @@ import ( "github.com/redis/go-redis/v9/auth" "github.com/redis/go-redis/v9/maintnotifications" + "github.com/redis/go-redis/v9/push" ) // UniversalOptions information is required by UniversalClient to establish @@ -57,7 +58,18 @@ type UniversalOptions struct { MinRetryBackoff time.Duration MaxRetryBackoff time.Duration - DialTimeout time.Duration + DialTimeout time.Duration + + // DialerRetries is the maximum number of retry attempts when dialing fails. + // + // default: 5 + DialerRetries int + + // DialerRetryTimeout is the backoff duration between retry attempts. + // + // default: 100 milliseconds + DialerRetryTimeout time.Duration + ReadTimeout time.Duration WriteTimeout time.Duration ContextTimeoutEnabled bool @@ -79,7 +91,12 @@ type UniversalOptions struct { // PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO). PoolFIFO bool - PoolSize int + PoolSize int + + // MaxConcurrentDials is the maximum number of concurrent connection creation goroutines. + // If <= 0, defaults to PoolSize. If > PoolSize, it will be capped at PoolSize. + MaxConcurrentDials int + PoolTimeout time.Duration MinIdleConns int MaxIdleConns int @@ -121,6 +138,10 @@ type UniversalOptions struct { UnstableResp3 bool + // PushNotificationProcessor is the processor for handling push notifications. + // If nil, a default processor will be created for RESP3 connections. + PushNotificationProcessor push.NotificationProcessor + // IsClusterMode can be used when only one Addrs is provided (e.g. Elasticache supports setting up cluster mode with configuration endpoint). IsClusterMode bool @@ -156,32 +177,36 @@ func (o *UniversalOptions) Cluster() *ClusterOptions { MinRetryBackoff: o.MinRetryBackoff, MaxRetryBackoff: o.MaxRetryBackoff, - DialTimeout: o.DialTimeout, - ReadTimeout: o.ReadTimeout, - WriteTimeout: o.WriteTimeout, + DialTimeout: o.DialTimeout, + DialerRetries: o.DialerRetries, + DialerRetryTimeout: o.DialerRetryTimeout, + ReadTimeout: o.ReadTimeout, + WriteTimeout: o.WriteTimeout, + ContextTimeoutEnabled: o.ContextTimeoutEnabled, ReadBufferSize: o.ReadBufferSize, WriteBufferSize: o.WriteBufferSize, - PoolFIFO: o.PoolFIFO, - - PoolSize: o.PoolSize, - PoolTimeout: o.PoolTimeout, - MinIdleConns: o.MinIdleConns, - MaxIdleConns: o.MaxIdleConns, - MaxActiveConns: o.MaxActiveConns, - ConnMaxIdleTime: o.ConnMaxIdleTime, - ConnMaxLifetime: o.ConnMaxLifetime, + PoolFIFO: o.PoolFIFO, + PoolSize: o.PoolSize, + MaxConcurrentDials: o.MaxConcurrentDials, + PoolTimeout: o.PoolTimeout, + MinIdleConns: o.MinIdleConns, + MaxIdleConns: o.MaxIdleConns, + MaxActiveConns: o.MaxActiveConns, + ConnMaxIdleTime: o.ConnMaxIdleTime, + ConnMaxLifetime: o.ConnMaxLifetime, TLSConfig: o.TLSConfig, - DisableIdentity: o.DisableIdentity, - DisableIndentity: o.DisableIndentity, - IdentitySuffix: o.IdentitySuffix, - FailingTimeoutSeconds: o.FailingTimeoutSeconds, - UnstableResp3: o.UnstableResp3, - MaintNotificationsConfig: o.MaintNotificationsConfig, + DisableIdentity: o.DisableIdentity, + DisableIndentity: o.DisableIndentity, + IdentitySuffix: o.IdentitySuffix, + FailingTimeoutSeconds: o.FailingTimeoutSeconds, + UnstableResp3: o.UnstableResp3, + PushNotificationProcessor: o.PushNotificationProcessor, + MaintNotificationsConfig: o.MaintNotificationsConfig, } } @@ -217,31 +242,36 @@ func (o *UniversalOptions) Failover() *FailoverOptions { MinRetryBackoff: o.MinRetryBackoff, MaxRetryBackoff: o.MaxRetryBackoff, - DialTimeout: o.DialTimeout, - ReadTimeout: o.ReadTimeout, - WriteTimeout: o.WriteTimeout, + DialTimeout: o.DialTimeout, + DialerRetries: o.DialerRetries, + DialerRetryTimeout: o.DialerRetryTimeout, + ReadTimeout: o.ReadTimeout, + WriteTimeout: o.WriteTimeout, + ContextTimeoutEnabled: o.ContextTimeoutEnabled, ReadBufferSize: o.ReadBufferSize, WriteBufferSize: o.WriteBufferSize, - PoolFIFO: o.PoolFIFO, - PoolSize: o.PoolSize, - PoolTimeout: o.PoolTimeout, - MinIdleConns: o.MinIdleConns, - MaxIdleConns: o.MaxIdleConns, - MaxActiveConns: o.MaxActiveConns, - ConnMaxIdleTime: o.ConnMaxIdleTime, - ConnMaxLifetime: o.ConnMaxLifetime, + PoolFIFO: o.PoolFIFO, + PoolSize: o.PoolSize, + MaxConcurrentDials: o.MaxConcurrentDials, + PoolTimeout: o.PoolTimeout, + MinIdleConns: o.MinIdleConns, + MaxIdleConns: o.MaxIdleConns, + MaxActiveConns: o.MaxActiveConns, + ConnMaxIdleTime: o.ConnMaxIdleTime, + ConnMaxLifetime: o.ConnMaxLifetime, TLSConfig: o.TLSConfig, ReplicaOnly: o.ReadOnly, - DisableIdentity: o.DisableIdentity, - DisableIndentity: o.DisableIndentity, - IdentitySuffix: o.IdentitySuffix, - UnstableResp3: o.UnstableResp3, + DisableIdentity: o.DisableIdentity, + DisableIndentity: o.DisableIndentity, + IdentitySuffix: o.IdentitySuffix, + UnstableResp3: o.UnstableResp3, + PushNotificationProcessor: o.PushNotificationProcessor, // Note: MaintNotificationsConfig not supported for FailoverOptions } } @@ -271,30 +301,35 @@ func (o *UniversalOptions) Simple() *Options { MinRetryBackoff: o.MinRetryBackoff, MaxRetryBackoff: o.MaxRetryBackoff, - DialTimeout: o.DialTimeout, - ReadTimeout: o.ReadTimeout, - WriteTimeout: o.WriteTimeout, + DialTimeout: o.DialTimeout, + DialerRetries: o.DialerRetries, + DialerRetryTimeout: o.DialerRetryTimeout, + ReadTimeout: o.ReadTimeout, + WriteTimeout: o.WriteTimeout, + ContextTimeoutEnabled: o.ContextTimeoutEnabled, ReadBufferSize: o.ReadBufferSize, WriteBufferSize: o.WriteBufferSize, - PoolFIFO: o.PoolFIFO, - PoolSize: o.PoolSize, - PoolTimeout: o.PoolTimeout, - MinIdleConns: o.MinIdleConns, - MaxIdleConns: o.MaxIdleConns, - MaxActiveConns: o.MaxActiveConns, - ConnMaxIdleTime: o.ConnMaxIdleTime, - ConnMaxLifetime: o.ConnMaxLifetime, + PoolFIFO: o.PoolFIFO, + PoolSize: o.PoolSize, + MaxConcurrentDials: o.MaxConcurrentDials, + PoolTimeout: o.PoolTimeout, + MinIdleConns: o.MinIdleConns, + MaxIdleConns: o.MaxIdleConns, + MaxActiveConns: o.MaxActiveConns, + ConnMaxIdleTime: o.ConnMaxIdleTime, + ConnMaxLifetime: o.ConnMaxLifetime, TLSConfig: o.TLSConfig, - DisableIdentity: o.DisableIdentity, - DisableIndentity: o.DisableIndentity, - IdentitySuffix: o.IdentitySuffix, - UnstableResp3: o.UnstableResp3, - MaintNotificationsConfig: o.MaintNotificationsConfig, + DisableIdentity: o.DisableIdentity, + DisableIndentity: o.DisableIndentity, + IdentitySuffix: o.IdentitySuffix, + UnstableResp3: o.UnstableResp3, + PushNotificationProcessor: o.PushNotificationProcessor, + MaintNotificationsConfig: o.MaintNotificationsConfig, } } From fc05874a2731716a07cdcf4f5f9d7689f8e22216 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com> Date: Tue, 2 Dec 2025 16:00:40 +0200 Subject: [PATCH 3/4] Update osscluster_lazy_reload_test.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- osscluster_lazy_reload_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osscluster_lazy_reload_test.go b/osscluster_lazy_reload_test.go index f66bb424ae..994fd40e74 100644 --- a/osscluster_lazy_reload_test.go +++ b/osscluster_lazy_reload_test.go @@ -170,7 +170,7 @@ func TestLazyReloadQueueBehavior(t *testing.T) { } }) - t.Run("CascadingSMigratedScenario", func(t *testing.T) { + t.Run("CascadingSMIGRATEDScenario", func(t *testing.T) { // Simulate the real-world scenario: multiple SMIGRATED notifications // arriving in quick succession from different node clients var reloadCount atomic.Int32 From f3bab0dc750256ffc463185ad9720198cdf3ae88 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com> Date: Tue, 2 Dec 2025 16:01:26 +0200 Subject: [PATCH 4/4] Update osscluster.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- osscluster.go | 1 + 1 file changed, 1 insertion(+) diff --git a/osscluster.go b/osscluster.go index 86f5994d7f..19b915c648 100644 --- a/osscluster.go +++ b/osscluster.go @@ -1056,6 +1056,7 @@ func (c *clusterStateHolder) LazyReload() { for { _, err := c.Reload(context.Background()) if err != nil { + atomic.StoreUint32(&c.reloadPending, 0) atomic.StoreUint32(&c.reloading, 0) return }