diff --git a/src/StackExchange.Redis/CommandRetry/DefaultRetry.cs b/src/StackExchange.Redis/CommandRetry/DefaultRetry.cs
deleted file mode 100644
index 0dd74eb03..000000000
--- a/src/StackExchange.Redis/CommandRetry/DefaultRetry.cs
+++ /dev/null
@@ -1,61 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace StackExchange.Redis
-{
- ///
- /// Command Policy to have all commands being retried for connection exception
- ///
- public class AlwaysRetryOnConnectionException : ICommandRetryPolicy
- {
- ///
- ///
- ///
- ///
- ///
- public bool ShouldRetryOnConnectionException(CommandStatus commandStatus) => true;
- }
-
- ///
- /// Command Policy to have only commands that are not yet sent being retried for connection exception
- ///
- public class RetryIfNotSentOnConnectionException : ICommandRetryPolicy
- {
- ///
- ///
- ///
- ///
- ///
- public bool ShouldRetryOnConnectionException(CommandStatus commandStatus)
- {
- return commandStatus == CommandStatus.WaitingToBeSent;
- }
- }
-
- ///
- /// Command Policy to choose which commands will be retried on a connection exception
- ///
- public class CommandRetryPolicy
- {
- ///
- /// Command Policy to have all commands being retried for connection exception
- ///
- ///
- public ICommandRetryPolicy AlwaysRetryOnConnectionException()
- {
- return new AlwaysRetryOnConnectionException();
- }
-
- ///
- /// Command Policy to have only commands that are not yet sent being retried for connection exception
- ///
- ///
- public ICommandRetryPolicy RetryIfNotSentOnConnectionException()
- {
- return new RetryIfNotSentOnConnectionException();
- }
- }
-}
diff --git a/src/StackExchange.Redis/CommandRetry/ICommandRetry.cs b/src/StackExchange.Redis/CommandRetry/ICommandRetry.cs
deleted file mode 100644
index e9a7f8504..000000000
--- a/src/StackExchange.Redis/CommandRetry/ICommandRetry.cs
+++ /dev/null
@@ -1,22 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace StackExchange.Redis
-{
- ///
- /// interface to implement command retry policy
- ///
- public interface ICommandRetryPolicy
- {
- ///
- /// Called when a message failed due to connection error
- ///
- /// current state of the command
- ///
- public bool ShouldRetryOnConnectionException(CommandStatus commandStatus);
-
- }
-}
diff --git a/src/StackExchange.Redis/CommandRetry/IRetryOnReconnectPolicy.cs b/src/StackExchange.Redis/CommandRetry/IRetryOnReconnectPolicy.cs
new file mode 100644
index 000000000..5abd25dd0
--- /dev/null
+++ b/src/StackExchange.Redis/CommandRetry/IRetryOnReconnectPolicy.cs
@@ -0,0 +1,15 @@
+namespace StackExchange.Redis
+{
+ ///
+ /// Interface for a policy that determines which commands should be retried upon restoration of a lost connection
+ ///
+ public interface IRetryOnReconnectPolicy
+ {
+ ///
+ /// Determines whether a failed command should be retried
+ ///
+ /// Current state of the command
+ /// True to retry the command, otherwise false
+ public bool ShouldRetry(CommandStatus commandStatus);
+ }
+}
diff --git a/src/StackExchange.Redis/CommandRetry/MessageRetryQueue.cs b/src/StackExchange.Redis/CommandRetry/MessageRetryQueue.cs
index 215f04e92..de70561ab 100644
--- a/src/StackExchange.Redis/CommandRetry/MessageRetryQueue.cs
+++ b/src/StackExchange.Redis/CommandRetry/MessageRetryQueue.cs
@@ -1,42 +1,40 @@
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
-using System.Text;
using System.Threading.Tasks;
-using StackExchange.Redis;
namespace StackExchange.Redis
{
internal class MessageRetryQueue : IDisposable
- {
- readonly Queue queue = new Queue();
- readonly IMessageRetryHelper messageRetryHelper;
- int? maxRetryQueueLength;
- bool runRetryLoopAsync;
+ {
+ private readonly Queue _queue = new Queue();
+ private readonly IMessageRetryHelper _messageRetryHelper;
+ private readonly int? _maxRetryQueueLength;
+ private readonly bool _runRetryLoopAsync;
internal MessageRetryQueue(IMessageRetryHelper messageRetryHelper, int? maxRetryQueueLength = null, bool runRetryLoopAsync = true)
{
- this.maxRetryQueueLength = maxRetryQueueLength;
- this.runRetryLoopAsync = runRetryLoopAsync;
- this.messageRetryHelper = messageRetryHelper;
+ _maxRetryQueueLength = maxRetryQueueLength;
+ _runRetryLoopAsync = runRetryLoopAsync;
+ _messageRetryHelper = messageRetryHelper;
}
- public int RetryQueueLength => queue.Count;
+ public int RetryQueueLength => _queue.Count;
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal bool TryHandleFailedCommand(Message message)
{
bool wasEmpty;
- lock (queue)
+ lock (_queue)
{
- int count = queue.Count;
- if (maxRetryQueueLength.HasValue && count >= maxRetryQueueLength)
+ int count = _queue.Count;
+ if (_maxRetryQueueLength.HasValue && count >= _maxRetryQueueLength)
{
return false;
}
wasEmpty = count == 0;
- queue.Enqueue(message);
+ _queue.Enqueue(message);
}
if (wasEmpty) StartRetryQueueProcessor();
return true;
@@ -46,13 +44,13 @@ internal bool TryHandleFailedCommand(Message message)
internal void StartRetryQueueProcessor()
{
bool startProcessor = false;
- lock (queue)
+ lock (_queue)
{
- startProcessor = queue.Count > 0;
+ startProcessor = _queue.Count > 0;
}
if (startProcessor)
{
- if (runRetryLoopAsync)
+ if (_runRetryLoopAsync)
{
var task = Task.Run(ProcessRetryQueueAsync);
if (task.IsFaulted)
@@ -67,75 +65,73 @@ internal void StartRetryQueueProcessor()
private async Task ProcessRetryQueueAsync()
{
- Message message = null;
while (true)
{
- message = null;
- Exception failedEndpointex = null;
- lock (queue)
+ Message message = null;
+ Exception failedEndpointException = null;
+
+ lock (_queue)
{
- if (queue.Count == 0) break; // all done
- message = queue.Peek();
+ if (_queue.Count == 0) break; // all done
+ message = _queue.Peek();
try
{
- if (!messageRetryHelper.IsEndpointAvailable(message))
+ if (!_messageRetryHelper.IsEndpointAvailable(message))
{
break;
}
}
catch (Exception ex)
{
- failedEndpointex = ex;
+ failedEndpointException = ex;
}
- message = queue.Dequeue();
+ message = _queue.Dequeue();
}
- if (failedEndpointex != null)
+ if (failedEndpointException != null)
{
- messageRetryHelper.SetExceptionAndComplete(message, failedEndpointex);
+ _messageRetryHelper.SetExceptionAndComplete(message, failedEndpointException);
continue;
}
try
{
- if (messageRetryHelper.HasTimedOut(message))
+ if (_messageRetryHelper.HasTimedOut(message))
{
- RedisTimeoutException ex = messageRetryHelper.GetTimeoutException(message);
- messageRetryHelper.SetExceptionAndComplete(message,ex);
+ var ex = _messageRetryHelper.GetTimeoutException(message);
+ _messageRetryHelper.SetExceptionAndComplete(message, ex);
}
else
{
- if (!await messageRetryHelper.TryResendAsync(message))
+ if (!await _messageRetryHelper.TryResendAsync(message))
{
// this should never happen but just to be safe if connection got dropped again
- messageRetryHelper.SetExceptionAndComplete(message);
+ _messageRetryHelper.SetExceptionAndComplete(message);
}
}
}
catch (Exception ex)
{
- messageRetryHelper.SetExceptionAndComplete(message, ex);
+ _messageRetryHelper.SetExceptionAndComplete(message, ex);
}
}
}
-
-
internal void CheckRetryQueueForTimeouts() // check the head of the backlog queue, consuming anything that looks dead
{
- lock (queue)
+ lock (_queue)
{
var now = Environment.TickCount;
- while (queue.Count != 0)
+ while (_queue.Count != 0)
{
- var message = queue.Peek();
- if (!messageRetryHelper.HasTimedOut(message))
+ var message = _queue.Peek();
+ if (!_messageRetryHelper.HasTimedOut(message))
{
break; // not a timeout - we can stop looking
}
- queue.Dequeue();
- RedisTimeoutException ex = messageRetryHelper.GetTimeoutException(message);
- messageRetryHelper.SetExceptionAndComplete(message,ex);
+ _queue.Dequeue();
+ RedisTimeoutException ex = _messageRetryHelper.GetTimeoutException(message);
+ _messageRetryHelper.SetExceptionAndComplete(message, ex);
}
}
}
@@ -143,27 +139,28 @@ internal void CheckRetryQueueForTimeouts() // check the head of the backlog queu
private void DrainQueue(Exception ex)
{
Message message;
- lock (queue)
+ lock (_queue)
{
- while (queue.Count != 0)
+ while (_queue.Count != 0)
{
- message = queue.Dequeue();
- messageRetryHelper.SetExceptionAndComplete(message, ex);
+ message = _queue.Dequeue();
+ _messageRetryHelper.SetExceptionAndComplete(message, ex);
}
}
}
- private bool disposedValue = false;
+ private bool _disposedValue = false;
protected virtual void Dispose(bool disposing)
{
- if (!disposedValue)
+ if (!_disposedValue)
{
+ _disposedValue = true;
+
if (disposing)
{
- DrainQueue(new Exception("RetryQueue disposed"));
+ DrainQueue(new Exception($"{nameof(MessageRetryQueue)} disposed"));
}
- disposedValue = true;
}
}
diff --git a/src/StackExchange.Redis/CommandRetry/RetryOnReconnect.cs b/src/StackExchange.Redis/CommandRetry/RetryOnReconnect.cs
new file mode 100644
index 000000000..505149ce5
--- /dev/null
+++ b/src/StackExchange.Redis/CommandRetry/RetryOnReconnect.cs
@@ -0,0 +1,39 @@
+using System;
+
+namespace StackExchange.Redis
+{
+ ///
+ /// Command retry policy to determine which commands will be retried after a lost connection is retored
+ ///
+ public class RetryOnReconnect : IRetryOnReconnectPolicy
+ {
+ private readonly Func _shouldRetry;
+
+ internal RetryOnReconnect(Func shouldRetry)
+ {
+ _shouldRetry = shouldRetry;
+ }
+
+ ///
+ /// Retry all commands
+ ///
+ /// An instance of a retry policy that retries all commands
+ public static IRetryOnReconnectPolicy Always
+ => new RetryOnReconnect(commandStatus => true);
+
+ ///
+ /// Retry only commands which fail before being sent to the server
+ ///
+ /// An instance of a policy that retries only unsent commands
+ public static IRetryOnReconnectPolicy IfNotSent
+ => new RetryOnReconnect(commandStatus => commandStatus == CommandStatus.WaitingToBeSent);
+
+ ///
+ /// Determines whether to retry a command upon restoration of a lost connection
+ ///
+ /// Status of the command
+ /// True to retry the command, otherwise false
+ public bool ShouldRetry(CommandStatus commandStatus)
+ => _shouldRetry.Invoke(commandStatus);
+ }
+}
diff --git a/src/StackExchange.Redis/ConfigurationOptions.cs b/src/StackExchange.Redis/ConfigurationOptions.cs
index efc24bf59..6d54db8b8 100644
--- a/src/StackExchange.Redis/ConfigurationOptions.cs
+++ b/src/StackExchange.Redis/ConfigurationOptions.cs
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.ComponentModel;
-using System.IO;
using System.Linq;
using System.Net;
using System.Net.Security;
@@ -59,18 +58,18 @@ internal static SslProtocols ParseSslProtocols(string key, string value)
return tmp;
}
- internal static ICommandRetryPolicy ParseCommandRetryPolicy(string key, string value)
+ internal static IRetryOnReconnectPolicy ParseRetryCommandsOnReconnect(string key, string value)
{
switch (value.ToLower())
{
case "noretry":
return null;
case "alwaysretry":
- return new CommandRetryPolicy().AlwaysRetryOnConnectionException();
- case "retryifnotyetsent":
- return new CommandRetryPolicy().RetryIfNotSentOnConnectionException();
+ return RetryOnReconnect.Always;
+ case "retryifnotsent":
+ return RetryOnReconnect.IfNotSent;
default:
- throw new ArgumentOutOfRangeException(key, $"Keyword '{key}' can be NoRetry, AlwaysRetry or RetryIfNotYetSent ; the value '{value}' is not recognised.");
+ throw new ArgumentOutOfRangeException(key, $"Keyword '{key}' can be NoRetry, AlwaysRetry or RetryIfNotSent; the value '{value}' is not recognised.");
}
}
@@ -107,7 +106,7 @@ internal const string
Version = "version",
WriteBuffer = "writeBuffer",
CheckCertificateRevocation = "checkCertificateRevocation",
- CommandRetryPolicy = "CommandRetryPolicy",
+ RetryCommandsOnReconnect = "RetryCommandsOnReconnect",
RetryQueueLength = "RetryQueueLength";
@@ -139,7 +138,7 @@ internal const string
Version,
WriteBuffer,
CheckCertificateRevocation,
- CommandRetryPolicy,
+ RetryCommandsOnReconnect,
RetryQueueLength,
}.ToDictionary(x => x, StringComparer.OrdinalIgnoreCase);
@@ -359,7 +358,7 @@ public bool PreserveAsyncOrder
///
/// The retry policy to be used for command retries during connection reconnects
///
- public ICommandRetryPolicy CommandRetryPolicy { get; set; }
+ public IRetryOnReconnectPolicy RetryCommandsOnReconnect { get; set; }
///
/// Indicates whether endpoints should be resolved via DNS before connecting.
@@ -496,7 +495,7 @@ public ConfigurationOptions Clone()
ReconnectRetryPolicy = reconnectRetryPolicy,
SslProtocols = SslProtocols,
checkCertificateRevocation = checkCertificateRevocation,
- CommandRetryPolicy = CommandRetryPolicy,
+ RetryCommandsOnReconnect = RetryCommandsOnReconnect,
RetryQueueMaxLength = RetryQueueMaxLength,
};
foreach (var item in EndPoints)
@@ -582,7 +581,7 @@ public string ToString(bool includePassword)
Append(sb, OptionKeys.ConfigCheckSeconds, configCheckSeconds);
Append(sb, OptionKeys.ResponseTimeout, responseTimeout);
Append(sb, OptionKeys.DefaultDatabase, DefaultDatabase);
- Append(sb, OptionKeys.CommandRetryPolicy, CommandRetryPolicy);
+ Append(sb, OptionKeys.RetryCommandsOnReconnect, RetryCommandsOnReconnect);
Append(sb, OptionKeys.RetryQueueLength, retryQueueLength);
commandMap?.AppendDeltas(sb);
return sb.ToString();
@@ -672,7 +671,7 @@ private void Clear()
CertificateValidation = null;
ChannelPrefix = default(RedisChannel);
SocketManager = null;
- CommandRetryPolicy = null;
+ RetryCommandsOnReconnect = null;
}
object ICloneable.Clone() => Clone();
@@ -793,8 +792,8 @@ private void DoParse(string configuration, bool ignoreUnknown)
case OptionKeys.SslProtocols:
SslProtocols = OptionKeys.ParseSslProtocols(key, value);
break;
- case OptionKeys.CommandRetryPolicy:
- CommandRetryPolicy = OptionKeys.ParseCommandRetryPolicy(key, value);
+ case OptionKeys.RetryCommandsOnReconnect:
+ RetryCommandsOnReconnect = OptionKeys.ParseRetryCommandsOnReconnect(key, value);
break;
case OptionKeys.RetryQueueLength:
RetryQueueMaxLength = OptionKeys.ParseInt32(key, value, minValue: 0);
diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs
index 8d907b1b3..35a4276b4 100755
--- a/src/StackExchange.Redis/ConnectionMultiplexer.cs
+++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs
@@ -2804,22 +2804,30 @@ internal Task ExecuteAsyncImpl(Message message, ResultProcessor process
internal bool TryMessageForRetry(Message message, Exception ex)
{
- if (RawConfig.CommandRetryPolicy != null && !message.IsAdmin)
+ if (message.IsAdmin || !message.ShouldRetry())
{
- if (!(ex is RedisConnectionException)) return false;
- if (!message.ShouldRetry()) return false;
- var shouldRetry = message.IsInternalCall ? true : RawConfig.CommandRetryPolicy.ShouldRetryOnConnectionException(message.Status);
- if (shouldRetry&& RetryQueueManager.TryHandleFailedCommand(message))
+ return false;
+ }
+
+ bool shouldRetry = false;
+ if (RawConfig.RetryCommandsOnReconnect != null && ex is RedisConnectionException)
+ {
+ shouldRetry = message.IsInternalCall || RawConfig.RetryCommandsOnReconnect.ShouldRetry(message.Status);
+ }
+
+ if (shouldRetry && RetryQueueManager.TryHandleFailedCommand(message))
+ {
+ // if this message is a new message set the writetime
+ if (message.GetWriteTime() == 0)
{
- // if this message is a new message set the writetime
- if (message.GetWriteTime() == 0)
- {
- message.SetEnqueued(null);
- }
- message.ResetStatusToWaitingToBeSent();
- return true;
+ message.SetEnqueued(null);
}
+
+ message.ResetStatusToWaitingToBeSent();
+
+ return true;
}
+
return false;
}
diff --git a/src/StackExchange.Redis/Enums/CommandFlags.cs b/src/StackExchange.Redis/Enums/CommandFlags.cs
index ee52d8d81..147c3d931 100644
--- a/src/StackExchange.Redis/Enums/CommandFlags.cs
+++ b/src/StackExchange.Redis/Enums/CommandFlags.cs
@@ -43,27 +43,27 @@ public enum CommandFlags
/// This operation should be performed on the replica if it is available, but will be performed on
/// a master if no replicas are available. Suitable for read operations only.
///
- [Obsolete("Starting with Redis version 5, Redis has moved to 'replica' terminology. Please use " + nameof(PreferReplica) + " instead.")]
- [Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
- PreferSlave = 8,
+ PreferReplica = 8, // note: we're using a 2-bit set here, which [Flags] formatting hates; position is doing the best we can for reasonable outcomes here
///
/// This operation should be performed on the replica if it is available, but will be performed on
/// a master if no replicas are available. Suitable for read operations only.
///
- PreferReplica = 8, // note: we're using a 2-bit set here, which [Flags] formatting hates; position is doing the best we can for reasonable outcomes here
+ [Obsolete("Starting with Redis version 5, Redis has moved to 'replica' terminology. Please use " + nameof(PreferReplica) + " instead.")]
+ [Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
+ PreferSlave = 8,
///
/// This operation should only be performed on a replica. Suitable for read operations only.
///
- DemandReplica = 12, // note: we're using a 2-bit set here, which [Flags] formatting hates; position is doing the best we can for reasonable outcomes here
+ [Obsolete("Starting with Redis version 5, Redis has moved to 'replica' terminology. Please use " + nameof(DemandReplica) + " instead.")]
+ [Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
+ DemandSlave = 12,
///
/// This operation should only be performed on a replica. Suitable for read operations only.
///
- [Obsolete("Starting with Redis version 5, Redis has moved to 'replica' terminology. Please use " + nameof(DemandReplica) + " instead.")]
- [Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
- DemandSlave = 12,
+ DemandReplica = 12, // note: we're using a 2-bit set here, which [Flags] formatting hates; position is doing the best we can for reasonable outcomes here
// 16: reserved for additional "demand/prefer" options
@@ -86,17 +86,17 @@ public enum CommandFlags
// 1024: used for timed-out; never user-specified, so not visible on the public API
///
- /// It's the default option and indicates command is never retried on connection restore
+ /// This operation will not be retried (default)
///
NoRetry = 2048,
///
- /// Indicates that on connection failure this operation will be retried if it was not yet sent
+ /// This operation will be retried if it failed before being sent to the server
///
- RetryIfNotYetSent = 4096,
+ RetryIfNotSent = 4096,
///
- /// Indicates always retry command on connection restore
+ /// This operation will always be retried
///
AlwaysRetry = 8192
}
diff --git a/src/StackExchange.Redis/Message.cs b/src/StackExchange.Redis/Message.cs
index 8429985de..9f549bba3 100644
--- a/src/StackExchange.Redis/Message.cs
+++ b/src/StackExchange.Redis/Message.cs
@@ -96,7 +96,7 @@ internal void SetBacklogState(int position, PhysicalConnection physical)
| CommandFlags.NoScriptCache
| CommandFlags.AlwaysRetry
| CommandFlags.NoRetry
- | CommandFlags.RetryIfNotYetSent;
+ | CommandFlags.RetryIfNotSent;
private IResultBox resultBox;
private ResultProcessor resultProcessor;
@@ -667,13 +667,13 @@ internal bool TrySetResult(T value)
///
- /// returns true if message should be retried based on command flag
+ /// Determines whether the message should be retried
///
- ///
+ /// True if the message should be retried based on the command flag, otherwise false
internal bool ShouldRetry()
{
if ((Flags & CommandFlags.NoRetry) != 0) return false;
- if ((Flags & CommandFlags.RetryIfNotYetSent) != 0 && Status == CommandStatus.Sent) return false;
+ if ((Flags & CommandFlags.RetryIfNotSent) != 0 && Status == CommandStatus.Sent) return false;
return true;
}
diff --git a/tests/StackExchange.Redis.Tests/CommandRetry/CommandRetryQueueManagerIntegrationTests.cs b/tests/StackExchange.Redis.Tests/CommandRetry/CommandRetryQueueManagerIntegrationTests.cs
index ce94b836a..ed43765e8 100644
--- a/tests/StackExchange.Redis.Tests/CommandRetry/CommandRetryQueueManagerIntegrationTests.cs
+++ b/tests/StackExchange.Redis.Tests/CommandRetry/CommandRetryQueueManagerIntegrationTests.cs
@@ -1,4 +1,5 @@
-using System.Linq;
+using System;
+using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
@@ -13,77 +14,77 @@ public class MessageRetryQueueIntegrationTests
[InlineData(false)]
public async Task RetryAsyncMessageIntegration(bool retryPolicySet)
{
- string keyname = "testretrypolicy";
ConfigurationOptions configAdmin = new ConfigurationOptions();
configAdmin.EndPoints.Add("127.0.0.1");
+ configAdmin.AbortOnConnectFail = false;
configAdmin.AllowAdmin = true;
+ ConfigurationOptions configClient = new ConfigurationOptions();
+ configClient.EndPoints.Add("127.0.0.1");
+ configAdmin.AbortOnConnectFail = false;
+ configClient.RetryCommandsOnReconnect = retryPolicySet ? RetryOnReconnect.Always : null;
+
using (var adminMuxer = ConnectionMultiplexer.Connect(configAdmin))
+ using (var clientmuxer = ConnectionMultiplexer.Connect(configClient))
{
- ConfigurationOptions configClient = new ConfigurationOptions();
- configClient.EndPoints.Add("127.0.0.1");
- configClient.CommandRetryPolicy = null;
- if (retryPolicySet) configClient.CommandRetryPolicy = new CommandRetryPolicy().AlwaysRetryOnConnectionException();
- using (var clientmuxer = ConnectionMultiplexer.Connect(configClient))
+ var conn = clientmuxer.GetDatabase();
+ const string keyname = "testretrypolicy";
+ long count = 0;
+
+ var runLoad = Task.Run(() =>
{
- var conn = clientmuxer.GetDatabase();
- bool stop = false;
- long count = 0;
- var runLoad = Task.Run(async () =>
+ try
{
- try
+ using (var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10)))
{
- int paralleltasks = 200;
+ var cancellationToken = cancellationTokenSource.Token;
+ int parallelTaskCount = 200;
+
while (true)
- {
- Task[] tasks = new Task[paralleltasks];
- for (int i = 0; i < paralleltasks; i++)
+ {
+ Task[] tasks = new Task[parallelTaskCount];
+ for (int i = 0; i < parallelTaskCount; i++)
{
tasks[i] = conn.StringSetBitAsync(keyname, count, true);
Interlocked.Increment(ref count);
}
- await Task.WhenAll(tasks);
- if (stop) break;
+ Task.WaitAll(tasks, cancellationToken);
}
}
- catch
- {
- return false;
- }
+ }
+ catch (OperationCanceledException)
+ {
return true;
- });
-
-
- // let the load warmup atleast n times before connection blip
- await Task.Delay(2000);
-
- // connection blip
- KillClient(adminMuxer, clientmuxer);
-
- // let the load run atleast n times during connection blip
- await Task.Delay(10000);
-
- stop = true;
+ }
+ catch (Exception ex)
+ {
+ Assert.False(retryPolicySet, ex.ToString());
+ return false;
+ }
+ });
- // wait for load to stop
- var isLoadSucceed = await runLoad;
+ // let the load warmup at least n times before connection blip
+ await Task.Delay(2000);
+ // connection blip
+ KillClient(adminMuxer, clientmuxer);
- // Assert load completed based on policy
- Assert.Equal(retryPolicySet, isLoadSucceed);
+ // wait for load to stop
+ var isLoadSucceed = await runLoad;
- // Assert for retrypolicy data was set correctly after retry
- if (retryPolicySet)
- {
- Assert.Equal(Interlocked.Read(ref count), await conn.StringBitCountAsync(keyname));
- }
+ // Assert load completed based on policy
+ Assert.Equal(retryPolicySet, isLoadSucceed);
- // cleanup
- await adminMuxer.GetDatabase().KeyDeleteAsync(keyname);
+ // Assert for retrypolicy data was set correctly after retry
+ if (retryPolicySet)
+ {
+ Assert.Equal(Interlocked.Read(ref count), await conn.StringBitCountAsync(keyname));
}
+
+ // cleanup
+ await adminMuxer.GetDatabase().KeyDeleteAsync(keyname);
}
}
-
private void KillClient(IInternalConnectionMultiplexer adminMuxer, IInternalConnectionMultiplexer clientmuxer)
{
string clientname = "retrypolicy";
diff --git a/tests/StackExchange.Redis.Tests/CommandRetry/CommandRetryQueueManagerTests.cs b/tests/StackExchange.Redis.Tests/CommandRetry/CommandRetryQueueManagerTests.cs
index c5d4dfe99..1992b12a5 100644
--- a/tests/StackExchange.Redis.Tests/CommandRetry/CommandRetryQueueManagerTests.cs
+++ b/tests/StackExchange.Redis.Tests/CommandRetry/CommandRetryQueueManagerTests.cs
@@ -24,7 +24,7 @@ public void ValidateMaxQueueLengthFails()
var message = Message.Create(0, CommandFlags.None, RedisCommand.SET);
var mux = new SharedConnectionFixture().Connection;
var mockmessageRetryHelper = new Mock();
- var messageRetryQueue = new MessageRetryQueue(mockmessageRetryHelper.Object, maxRetryQueueLength:0, runRetryLoopAsync: false);
+ var messageRetryQueue = new MessageRetryQueue(mockmessageRetryHelper.Object, maxRetryQueueLength: 0, runRetryLoopAsync: false);
var isEnqueuedWithZeroMaxLength = messageRetryQueue.TryHandleFailedCommand(message);
messageRetryQueue.TryHandleFailedCommand(message);
@@ -34,7 +34,7 @@ public void ValidateMaxQueueLengthFails()
[Fact]
public async void RetryMessageSucceeds()
{
- using (var muxer = Create(allowAdmin:true,retryPolicy: new CommandRetryPolicy().AlwaysRetryOnConnectionException()))
+ using (var muxer = Create(allowAdmin: true, retryPolicy: RetryOnReconnect.Always))
{
var conn = muxer.GetDatabase();
var duration = await conn.PingAsync().ForAwait();
@@ -82,7 +82,7 @@ public void TryHandleFailedMessageTimedoutEndPointAvailable()
Assert.True(messageRetryQueue.RetryQueueLength == 0);
messageRetryHelper.Verify(failedCommand => failedCommand.TryResendAsync(message), Times.Never);
- messageRetryHelper.Verify(failedCommand => failedCommand.SetExceptionAndComplete(message,timeout), Times.Once);
+ messageRetryHelper.Verify(failedCommand => failedCommand.SetExceptionAndComplete(message, timeout), Times.Once);
}
[Fact]
@@ -96,7 +96,7 @@ public void TryHandleFailedMessageGetEndpointThrows()
Assert.True(messageRetryQueue.RetryQueueLength == 0);
messageRetryHelper.Verify(failedCommand => failedCommand.TryResendAsync(message), Times.Never);
- messageRetryHelper.Verify(failedCommand => failedCommand.SetExceptionAndComplete(message,ex), Times.Once);
+ messageRetryHelper.Verify(failedCommand => failedCommand.SetExceptionAndComplete(message, ex), Times.Once);
}
[Fact]
@@ -127,7 +127,7 @@ public void CheckRetryForTimeoutTimesout(bool hasTimedout, int queueLength)
Assert.Equal(queueLength, messageRetryQueue.RetryQueueLength);
messageRetryHelper.Verify(failedCommand => failedCommand.TryResendAsync(message), Times.Never);
- messageRetryHelper.Verify(failedCommand => failedCommand.SetExceptionAndComplete(message,It.IsAny()), Times.Exactly(hasTimedout ? 1 : 0));
+ messageRetryHelper.Verify(failedCommand => failedCommand.SetExceptionAndComplete(message, It.IsAny()), Times.Exactly(hasTimedout ? 1 : 0));
}
[Fact]
diff --git a/tests/StackExchange.Redis.Tests/CommandRetry/ConnectionMultiplexerRetryTests.cs b/tests/StackExchange.Redis.Tests/CommandRetry/ConnectionMultiplexerRetryTests.cs
index aa875eb44..5aae0450d 100644
--- a/tests/StackExchange.Redis.Tests/CommandRetry/ConnectionMultiplexerRetryTests.cs
+++ b/tests/StackExchange.Redis.Tests/CommandRetry/ConnectionMultiplexerRetryTests.cs
@@ -7,7 +7,7 @@ public class ConnectionMultiplexerRetryTests
[Theory]
[InlineData(CommandFlags.AlwaysRetry, true)]
[InlineData(CommandFlags.NoRetry, false)]
- [InlineData(CommandFlags.RetryIfNotYetSent, true)]
+ [InlineData(CommandFlags.RetryIfNotSent, true)]
public void ValidateOverrideFlag(CommandFlags flag, bool shouldRetry)
{
var message = Message.Create(0, flag, RedisCommand.GET);
@@ -19,7 +19,7 @@ public void ValidateOverrideFlag(CommandFlags flag, bool shouldRetry)
[InlineData(false, true)]
public void ValidateRetryIfNotSentOverrideFlag(bool alreadySent, bool shouldRetry)
{
- var message = Message.Create(0, CommandFlags.RetryIfNotYetSent, RedisCommand.GET);
+ var message = Message.Create(0, CommandFlags.RetryIfNotSent, RedisCommand.GET);
if (alreadySent)
{
message.SetRequestSent();
diff --git a/tests/StackExchange.Redis.Tests/TestBase.cs b/tests/StackExchange.Redis.Tests/TestBase.cs
index 5dcef2af7..6477d4e8c 100644
--- a/tests/StackExchange.Redis.Tests/TestBase.cs
+++ b/tests/StackExchange.Redis.Tests/TestBase.cs
@@ -229,7 +229,7 @@ internal virtual IInternalConnectionMultiplexer Create(
bool checkConnect = true, string failMessage = null,
string channelPrefix = null, Proxy? proxy = null,
string configuration = null, bool logTransactionData = true,
- bool shared = true, int? defaultDatabase = null, ICommandRetryPolicy retryPolicy = null,
+ bool shared = true, int? defaultDatabase = null, IRetryOnReconnectPolicy retryPolicy = null,
[CallerMemberName] string caller = null)
{
if (Output == null)
@@ -270,7 +270,7 @@ public static ConnectionMultiplexer CreateDefault(
string channelPrefix = null, Proxy? proxy = null,
string configuration = null, bool logTransactionData = true,
int? defaultDatabase = null,
- ICommandRetryPolicy retryPolicy = null,
+ IRetryOnReconnectPolicy retryPolicy = null,
[CallerMemberName] string caller = null)
{
StringWriter localLog = null;
@@ -306,7 +306,7 @@ public static ConnectionMultiplexer CreateDefault(
if (connectTimeout != null) config.ConnectTimeout = connectTimeout.Value;
if (proxy != null) config.Proxy = proxy.Value;
if (defaultDatabase != null) config.DefaultDatabase = defaultDatabase.Value;
- if (retryPolicy != null) config.CommandRetryPolicy = retryPolicy;
+ if (retryPolicy != null) config.RetryCommandsOnReconnect = retryPolicy;
var watch = Stopwatch.StartNew();
var task = ConnectionMultiplexer.ConnectAsync(config, log);
if (!task.Wait(config.ConnectTimeout >= (int.MaxValue / 2) ? int.MaxValue : config.ConnectTimeout * 2))