diff --git a/src/StackExchange.Redis/AssemblyInfoHack.cs b/src/StackExchange.Redis/AssemblyInfoHack.cs index 2936ed56c..ac5ce696c 100644 --- a/src/StackExchange.Redis/AssemblyInfoHack.cs +++ b/src/StackExchange.Redis/AssemblyInfoHack.cs @@ -6,5 +6,6 @@ [assembly: InternalsVisibleTo("StackExchange.Redis.Server, PublicKey=00240000048000009400000006020000002400005253413100040000010001007791a689e9d8950b44a9a8886baad2ea180e7a8a854f158c9b98345ca5009cdd2362c84f368f1c3658c132b3c0f74e44ff16aeb2e5b353b6e0fe02f923a050470caeac2bde47a2238a9c7125ed7dab14f486a5a64558df96640933b9f2b6db188fc4a820f96dce963b662fa8864adbff38e5b4542343f162ecdc6dad16912fff")] [assembly: InternalsVisibleTo("StackExchange.Redis.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007791a689e9d8950b44a9a8886baad2ea180e7a8a854f158c9b98345ca5009cdd2362c84f368f1c3658c132b3c0f74e44ff16aeb2e5b353b6e0fe02f923a050470caeac2bde47a2238a9c7125ed7dab14f486a5a64558df96640933b9f2b6db188fc4a820f96dce963b662fa8864adbff38e5b4542343f162ecdc6dad16912fff")] [assembly: InternalsVisibleTo("NRediSearch.Test, PublicKey=00240000048000009400000006020000002400005253413100040000010001007791a689e9d8950b44a9a8886baad2ea180e7a8a854f158c9b98345ca5009cdd2362c84f368f1c3658c132b3c0f74e44ff16aeb2e5b353b6e0fe02f923a050470caeac2bde47a2238a9c7125ed7dab14f486a5a64558df96640933b9f2b6db188fc4a820f96dce963b662fa8864adbff38e5b4542343f162ecdc6dad16912fff")] - +// For mocking in tests +[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2, PublicKey=0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7")] [assembly: CLSCompliant(true)] diff --git a/src/StackExchange.Redis/CommandRetry/CommandFailureReason.cs 
b/src/StackExchange.Redis/CommandRetry/CommandFailureReason.cs new file mode 100644 index 000000000..b72846995 --- /dev/null +++ b/src/StackExchange.Redis/CommandRetry/CommandFailureReason.cs @@ -0,0 +1,25 @@ +namespace StackExchange.Redis +{ + /// + /// The reason a command failed to send or complete. + /// + public enum CommandFailureReason + { + /// + /// No open/valid connection was avaialble to send on - we couldn't even write the command. + /// + WriteFailure, + /// + /// The message was sent, but we lost the connection and this command in-flight. + /// + ConnectionFailure, + /// + /// Command has timed out, exceeding the sync or async timeout limits + /// + Timeout, + /// + /// This command failed again, during a retry + /// + RetryFailure, + } +} diff --git a/src/StackExchange.Redis/CommandRetry/CommandRetryPolicy.cs b/src/StackExchange.Redis/CommandRetry/CommandRetryPolicy.cs new file mode 100644 index 000000000..a46f313fd --- /dev/null +++ b/src/StackExchange.Redis/CommandRetry/CommandRetryPolicy.cs @@ -0,0 +1,99 @@ +using System; + +namespace StackExchange.Redis +{ + /// + /// Policy that determines which commands should be retried upon restoration of a lost connection. + /// + public abstract class CommandRetryPolicy + { + /// + /// Creates a policy instance for a specific multiplexer and its commands. + /// + /// The muleiplexer this policy is for. + protected CommandRetryPolicy(ConnectionMultiplexer muxer) { } + + /// + /// Returns the current length of the retry queue. + /// + public abstract int CurrentQueueLength { get; } + + /// + /// Returns whether the current queue is processing (e.g. retrying queued commands). + /// + public abstract bool CurrentlyProcessing { get; } + + /// + /// Returns the status of the retry mechanism, e.g. what the queue is doing. + /// + public abstract string StatusDescription { get; } + + /// + /// Determines if a message is eligible for retrying at all. + /// + /// The message to check eligibility for. 
+ /// True if a message is eligible. + internal static bool IsEligible(Message message) + { + if ((message.Flags & CommandFlags.NoRetry) != 0 + || ((message.Flags & CommandFlags.RetryIfNotSent) != 0 && message.Status == CommandStatus.Sent) + || message.IsAdmin + || message.IsInternalCall) + { + return false; + } + + return true; + } + + /// + /// Determines if an xception is eligible for retrying at all. + /// + /// The exception to check eligibility for. + /// True if an exception is eligible. + internal static bool IsEligible(Exception exception) => exception is RedisException; + + /// + /// Tries to queue an eligible command. + /// Protected because this isn't called directly - eligibility (above) is checked first by the multiplexer. + /// + protected internal abstract bool TryQueue(FailedCommand command); + + /// + /// Called when a heartbeat occurs. + /// + public abstract void OnHeartbeat(); + + /// + /// Called when a multiplexer reconnects. + /// + public abstract void OnReconnect(); + + /// + /// Default policy - retry only commands which fail before being sent to the server (alias for ). + /// + /// An instance of a policy that retries only unsent commands. + public static Func Default => IfNotSent; + + /// + /// Retry all commands. + /// + /// An instance of a retry policy that retries all commands. + public static Func Always + => mutex => new DefaultCommandRetryPolicy(mutex, commandStatus => true); + + /// + /// Retry only commands which fail before being sent to the server. + /// + /// An instance of a policy that retries only unsent commands. + public static Func IfNotSent + => mutex => new DefaultCommandRetryPolicy(mutex, command => command.Status == CommandStatus.WaitingToBeSent); + + /// + /// Never retry a command. + /// + /// An instance of a retry policy that retries no commands. 
+ public static Func Never + => mutex => new NeverCommandRetryPolicy(mutex); + } +} diff --git a/src/StackExchange.Redis/CommandRetry/DefaultCommandRetryPolicy.cs b/src/StackExchange.Redis/CommandRetry/DefaultCommandRetryPolicy.cs new file mode 100644 index 000000000..53bc5c238 --- /dev/null +++ b/src/StackExchange.Redis/CommandRetry/DefaultCommandRetryPolicy.cs @@ -0,0 +1,81 @@ +using System; + +namespace StackExchange.Redis +{ + /// + /// Command retry policy to determine which commands will be retried after a lost connection is retored + /// + public class DefaultCommandRetryPolicy : CommandRetryPolicy + { + private MessageRetryQueue RetryQueue { get; } + + private readonly Func _shouldRetry; + + /// + /// Creates a for the given . + /// + /// The to handle retries for. + /// Whether a command should be retried. + protected internal DefaultCommandRetryPolicy(ConnectionMultiplexer muxer, Func shouldRetry) : base(muxer) + { + _shouldRetry = shouldRetry; + var messageRetryHelper = new MessageRetryHelper(muxer); + RetryQueue = new MessageRetryQueue(messageRetryHelper); + } + + /// + /// Gets the current length of the retry queue. + /// + public override int CurrentQueueLength => RetryQueue.CurrentRetryQueueLength; + + /// + /// Returns whether the current queue is processing (e.g. retrying queued commands). + /// + public override bool CurrentlyProcessing => RetryQueue.IsRunning; + + /// + /// Returns whether the current queue is processing (e.g. retrying queued commands). + /// + public override string StatusDescription => RetryQueue.StatusDescription; + + /// + /// Tries to queue a message for retry if possible. + /// + /// The command to tru queueing (contains the message and exception). + /// True if the message was queued. + /// Note that this is internal only - external callers cannot override it to bypass the CanRetry checks. 
+ protected internal override bool TryQueue(FailedCommand command) + { + // Sanity check if we should be trying this one + if (!_shouldRetry.Invoke(command)) + { + return false; + } + + if (RetryQueue.TryHandleFailedCommand(command.Message)) + { + // if this message is a new message set the writetime + if (command.Message.GetWriteTime() == 0) + { + command.Message.SetEnqueued(null); + } + + command.Message.ResetStatusToWaitingToBeSent(); + + return true; + } + + return false; + } + + /// + /// Called on heartbeat, evaluating if anything in queue has timed out and need pruning. + /// + public override void OnHeartbeat() => RetryQueue.CheckRetryQueueForTimeouts(); + + /// + /// Called on a multiplexer reconnect, to start sending anything in the queue. + /// + public override void OnReconnect() => RetryQueue.StartRetryQueueProcessor(); + } +} diff --git a/src/StackExchange.Redis/CommandRetry/FailedCommand.cs b/src/StackExchange.Redis/CommandRetry/FailedCommand.cs new file mode 100644 index 000000000..41d812ff6 --- /dev/null +++ b/src/StackExchange.Redis/CommandRetry/FailedCommand.cs @@ -0,0 +1,45 @@ +using System; + +namespace StackExchange.Redis +{ + /// + /// Command retry policy to act as a no-op for all commands. + /// + public sealed class FailedCommand + { + /// + /// The original/inner message that failed. + /// + internal Message Message; + + /// + /// Status of the command. + /// + public CommandStatus Status => Message.Status; + + /// + /// The redis command sent. + /// + public string CommandAndKey => Message.CommandAndKey; + + /// + /// The reason this command failed, e.g. no connection, timeout, etc. + /// + public CommandFailureReason FailureReason { get; } + + /// + /// The exception that happened to create this failed command. 
+ /// + public Exception Exception { get; } + + internal static FailedCommand FromWriteFail(Message message, Exception exception) => + new FailedCommand(message, CommandFailureReason.WriteFailure, exception); + + internal FailedCommand(Message message, CommandFailureReason reason, Exception exception) + { + Message = message; + FailureReason = reason; + Exception = exception; + } + } +} diff --git a/src/StackExchange.Redis/CommandRetry/IMessageRetryHelper.cs b/src/StackExchange.Redis/CommandRetry/IMessageRetryHelper.cs new file mode 100644 index 000000000..c5fe5e7ca --- /dev/null +++ b/src/StackExchange.Redis/CommandRetry/IMessageRetryHelper.cs @@ -0,0 +1,14 @@ +using System; +using System.Threading.Tasks; + +namespace StackExchange.Redis +{ + internal interface IMessageRetryHelper + { + RedisTimeoutException GetTimeoutException(Message message); + bool HasTimedOut(Message message); + bool IsEndpointAvailable(Message message); + void SetExceptionAndComplete(Message message, Exception ex = null); + Task TryResendAsync(Message message); + } +} \ No newline at end of file diff --git a/src/StackExchange.Redis/CommandRetry/MessageRetryHelper.cs b/src/StackExchange.Redis/CommandRetry/MessageRetryHelper.cs new file mode 100644 index 000000000..a36b1a07b --- /dev/null +++ b/src/StackExchange.Redis/CommandRetry/MessageRetryHelper.cs @@ -0,0 +1,71 @@ +using System; +using System.Text; +using System.Threading.Tasks; + +namespace StackExchange.Redis +{ + internal class MessageRetryHelper : IMessageRetryHelper + { + private readonly IInternalConnectionMultiplexer multiplexer; + + public MessageRetryHelper(IInternalConnectionMultiplexer multiplexer) + { + this.multiplexer = multiplexer; + } + + public bool HasTimedOut(Message message) + { + var timeoutMilliseconds = message.ResultBoxIsAsync ? 
multiplexer.AsyncTimeoutMilliseconds : multiplexer.TimeoutMilliseconds; + int millisecondsTaken = unchecked(Environment.TickCount - message.GetWriteTime()); + return millisecondsTaken >= timeoutMilliseconds; + } + + /// + /// Gets the timeout exception for a message. + /// + /// The messae to get a message for + /// + /// + /// Not using ExceptionFactory.Timeout as it can cause deadlock while trying to lock writtenawaiting response queue for GetHeadMessages. + /// + public RedisTimeoutException GetTimeoutException(Message message) + { + var sb = new StringBuilder(); + sb.Append("Timeout while waiting for connectionrestore ").Append(message.Command).Append(" (").Append(Format.ToString(multiplexer.TimeoutMilliseconds)).Append("ms)"); + var ex = new RedisTimeoutException(sb.ToString(), message.Status); + return ex; + } + + public bool IsEndpointAvailable(Message message) => multiplexer.SelectServer(message) != null; + + /// + /// Tries to re-issue a . + /// + /// The message to re-send. + /// Whether the write was successful. + public async Task TryResendAsync(Message message) + { + // Use a specific server if one was specified originally, otherwise auto-select + // This is important for things like REPLICAOF we really don't want going to another location + var server = message.SpecificServer ?? 
multiplexer.SelectServer(message); + if (server != null) + { + var result = await server.TryWriteAsync(message).ForAwait(); + + if (result != WriteResult.Success) + { + var ex = multiplexer.GetException(result, message, server); + SetExceptionAndComplete(message, ex); + } + return true; + } + return false; + } + + public void SetExceptionAndComplete(Message message, Exception ex = null) + { + var inner = new RedisConnectionException(ConnectionFailureType.UnableToConnect, "Failed while retrying on connection restore: " + ex.Message, ex); + message.SetExceptionAndComplete(inner, null, CommandFailureReason.RetryFailure); + } + } +} diff --git a/src/StackExchange.Redis/CommandRetry/MessageRetryQueue.cs b/src/StackExchange.Redis/CommandRetry/MessageRetryQueue.cs new file mode 100644 index 000000000..954bd80eb --- /dev/null +++ b/src/StackExchange.Redis/CommandRetry/MessageRetryQueue.cs @@ -0,0 +1,214 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace StackExchange.Redis +{ + internal class MessageRetryQueue : IDisposable + { + private readonly Queue _queue = new Queue(); + private readonly IMessageRetryHelper _messageRetryHelper; + private readonly int? _maxRetryQueueLength; + private int _isRunning = 0; + private BacklogStatus _backlogStatus = BacklogStatus.Inactive; + + internal enum BacklogStatus : byte + { + Inactive, + Activating, + Starting, + Started, + CheckingForWork, + CheckingForTimeout, + RecordingTimeout, + WritingMessage, + Flushing, + MarkingInactive, + RecordingWriteFailure, + RecordingFault, + SettingIdle, + Faulted, + } + + internal MessageRetryQueue(IMessageRetryHelper messageRetryHelper, int? 
maxRetryQueueLength = null) + { + _maxRetryQueueLength = maxRetryQueueLength; + _messageRetryHelper = messageRetryHelper; + } + + public int CurrentRetryQueueLength => _queue.Count; + public bool IsRunning => _isRunning == 0; + public string StatusDescription => _backlogStatus.ToString(); + + internal bool TryHandleFailedCommand(Message message) + { + bool wasEmpty; + lock (_queue) + { + int count = _queue.Count; + if (_maxRetryQueueLength.HasValue && count >= _maxRetryQueueLength) + { + return false; + } + wasEmpty = count == 0; + _queue.Enqueue(message); + } + if (wasEmpty) + { + StartRetryQueueProcessor(); + } + return true; + } + + internal void StartRetryQueueProcessor() + { + bool startProcessor = false; + lock (_queue) + { + startProcessor = _queue.Count > 0; + } + if (!startProcessor) + { + return; + } + + if (Interlocked.CompareExchange(ref _isRunning, 1, 0) == 0) + { + _backlogStatus = BacklogStatus.Activating; + // We're explicitly using a thread here because this needs to not be subject to thread pol starvation. + // In the problematic case of sync-of-async thread pool starvation from sync awaiters especially, + // We need to start this queue and flush is out to recover. 
+ var thread = new Thread(s => ((MessageRetryQueue)s).ProcessRetryQueueAsync().RedisFireAndForget()) + { + IsBackground = true, // don't keep process alive + Name = "Redis-MessageRetryQueue" // help anyone looking at thread-dumps + }; + thread.Start(this); + } + } + + internal async Task ProcessRetryQueueAsync() + { + _backlogStatus = BacklogStatus.Starting; + // TODO: Look at exclusive write locks + + try + { + _backlogStatus = BacklogStatus.Started; + while (true) + { + _backlogStatus = BacklogStatus.CheckingForWork; + Message message = null; + Exception failedEndpointException = null; + + lock (_queue) + { + if (_queue.Count == 0) break; // all done + message = _queue.Peek(); + try + { + if (!_messageRetryHelper.IsEndpointAvailable(message)) + { + break; + } + } + catch (Exception ex) + { + failedEndpointException = ex; + } + message = _queue.Dequeue(); + } + + if (failedEndpointException != null) + { + _messageRetryHelper.SetExceptionAndComplete(message, failedEndpointException); + continue; + } + + try + { + _backlogStatus = BacklogStatus.CheckingForTimeout; + if (_messageRetryHelper.HasTimedOut(message)) + { + _backlogStatus = BacklogStatus.RecordingTimeout; + var ex = _messageRetryHelper.GetTimeoutException(message); + _messageRetryHelper.SetExceptionAndComplete(message, ex); + } + else + { + _backlogStatus = BacklogStatus.WritingMessage; + if (!await _messageRetryHelper.TryResendAsync(message)) + { + // this should never happen but just to be safe if connection got dropped again + _messageRetryHelper.SetExceptionAndComplete(message); + } + } + } + catch (Exception ex) + { + _backlogStatus = BacklogStatus.RecordingFault; + _messageRetryHelper.SetExceptionAndComplete(message, ex); + } + } + } + catch + { + _backlogStatus = BacklogStatus.Faulted; + } + finally + { + Interlocked.CompareExchange(ref _isRunning, 0, 1); + _backlogStatus = BacklogStatus.Inactive; + } + } + + internal void CheckRetryQueueForTimeouts() // check the head of the backlog queue, 
consuming anything that looks dead + { + lock (_queue) + { + while (_queue.Count != 0) + { + var message = _queue.Peek(); + if (!_messageRetryHelper.HasTimedOut(message)) + { + break; // not a timeout - we can stop looking + } + _queue.Dequeue(); + RedisTimeoutException ex = _messageRetryHelper.GetTimeoutException(message); + _messageRetryHelper.SetExceptionAndComplete(message, ex); + } + } + } + + private void DrainQueue(Exception ex) + { + Message message; + lock (_queue) + { + while (_queue.Count != 0) + { + message = _queue.Dequeue(); + _messageRetryHelper.SetExceptionAndComplete(message, ex); + } + } + } + + private bool _disposedValue = false; + + protected virtual void Dispose(bool disposing) + { + if (!_disposedValue) + { + _disposedValue = true; + + if (disposing) + { + DrainQueue(new Exception($"{nameof(MessageRetryQueue)} disposed")); + } + } + } + + public void Dispose() => Dispose(true); + } +} diff --git a/src/StackExchange.Redis/CommandRetry/NeverCommandRetryPolicy.cs b/src/StackExchange.Redis/CommandRetry/NeverCommandRetryPolicy.cs new file mode 100644 index 000000000..b37365b9d --- /dev/null +++ b/src/StackExchange.Redis/CommandRetry/NeverCommandRetryPolicy.cs @@ -0,0 +1,44 @@ +namespace StackExchange.Redis +{ + /// + /// Command retry policy to act as a no-op for all commands. + /// + public sealed class NeverCommandRetryPolicy : CommandRetryPolicy + { + /// + /// Creates a for the given . + /// + /// The to handle retries for. + internal NeverCommandRetryPolicy(ConnectionMultiplexer muxer) : base(muxer) { } + + /// + /// Gets the current length of the retry queue, always 0. + /// + public override int CurrentQueueLength => 0; + + /// + /// Returns whether the current queue is processing (e.g. retrying queued commands). + /// + public override bool CurrentlyProcessing => false; + + /// + /// Returns idle, since this queue never does anything + /// + public override string StatusDescription => "Idle"; + + /// + /// Doesn't queue anything. 
+ /// + protected internal override bool TryQueue(FailedCommand command) => false; + + /// + /// Called on heartbeat, evaluating if anything in queue has timed out and need pruning. + /// + public override void OnHeartbeat() { } + + /// + /// Called on a multiplexer reconnect, to start sending anything in the queue. + /// + public override void OnReconnect() { } + } +} diff --git a/src/StackExchange.Redis/ConfigurationOptions.cs b/src/StackExchange.Redis/ConfigurationOptions.cs index ad343e598..232bd59c9 100644 --- a/src/StackExchange.Redis/ConfigurationOptions.cs +++ b/src/StackExchange.Redis/ConfigurationOptions.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.ComponentModel; -using System.IO; using System.Linq; using System.Net; using System.Net.Security; @@ -13,6 +12,7 @@ namespace StackExchange.Redis { + /// /// The options relevant to a set of redis connections /// @@ -58,6 +58,15 @@ internal static SslProtocols ParseSslProtocols(string key, string value) return tmp; } + internal static Func ParseCommandRetryPolicy(string key, string value) => + value.ToLower() switch + { + "never" => Redis.CommandRetryPolicy.Never, + "always" => Redis.CommandRetryPolicy.Always, + "ifnotsent" => Redis.CommandRetryPolicy.IfNotSent, + _ => throw new ArgumentOutOfRangeException(key, $"Keyword '{key}' can be empty, None, Always or IfNotSent; the value '{value}' is not recognized."), + }; + internal static void Unknown(string key) { throw new ArgumentException($"Keyword '{key}' is not supported.", key); @@ -90,7 +99,9 @@ internal const string TieBreaker = "tiebreaker", Version = "version", WriteBuffer = "writeBuffer", - CheckCertificateRevocation = "checkCertificateRevocation"; + CheckCertificateRevocation = "checkCertificateRevocation", + CommandRetryPolicy = "commandRetryPolicy", + CommandRetryQueueLength = "commandRetryQueueLength"; private static readonly Dictionary normalizedOptions = new[] @@ -120,7 +131,9 @@ internal const string TieBreaker, 
Version, WriteBuffer, - CheckCertificateRevocation + CheckCertificateRevocation, + CommandRetryPolicy, + CommandRetryQueueLength, }.ToDictionary(x => x, StringComparer.OrdinalIgnoreCase); public static string TryNormalize(string value) @@ -141,7 +154,7 @@ public static string TryNormalize(string value) private Version defaultVersion; - private int? keepAlive, asyncTimeout, syncTimeout, connectTimeout, responseTimeout, writeBuffer, connectRetry, configCheckSeconds; + private int? keepAlive, asyncTimeout, syncTimeout, connectTimeout, responseTimeout, writeBuffer, connectRetry, configCheckSeconds, retryQueueLength; private Proxy? proxy; @@ -332,10 +345,15 @@ public bool PreserveAsyncOrder public Proxy Proxy { get { return proxy.GetValueOrDefault(); } set { proxy = value; } } /// - /// The retry policy to be used for connection reconnects + /// The retry policy to be used for connection reconnects. /// public IReconnectRetryPolicy ReconnectRetryPolicy { get { return reconnectRetryPolicy ??= new LinearRetry(ConnectTimeout); } set { reconnectRetryPolicy = value; } } + /// + /// The retry policy to be used for command retries. By default, unsent commands will be retried. + /// + public Func CommandRetryPolicyGenerator { get; set; } = CommandRetryPolicy.Default; + /// /// Indicates whether endpoints should be resolved via DNS before connecting. /// If enabled the ConnectionMultiplexer will not re-resolve DNS @@ -401,6 +419,11 @@ public bool PreserveAsyncOrder /// public int ConfigCheckSeconds { get { return configCheckSeconds.GetValueOrDefault(60); } set { configCheckSeconds = value; } } + /// + /// If retry policy is specified, Retry Queue max length, by default there's no queue limit + /// + public int? 
CommandRetryQueueMaxLength { get; set; } + /// /// Parse the configuration from a comma-delimited configuration string /// @@ -466,6 +489,8 @@ public ConfigurationOptions Clone() ReconnectRetryPolicy = reconnectRetryPolicy, SslProtocols = SslProtocols, checkCertificateRevocation = checkCertificateRevocation, + CommandRetryPolicyGenerator = CommandRetryPolicyGenerator, + CommandRetryQueueMaxLength = CommandRetryQueueMaxLength, }; foreach (var item in EndPoints) options.EndPoints.Add(item); @@ -550,6 +575,10 @@ public string ToString(bool includePassword) Append(sb, OptionKeys.ConfigCheckSeconds, configCheckSeconds); Append(sb, OptionKeys.ResponseTimeout, responseTimeout); Append(sb, OptionKeys.DefaultDatabase, DefaultDatabase); + // TODO: How should we handle this? The analog is SocketManager which is left out, but it'd be nice to have this + // if it's one of the built-in ones at least? + //Append(sb, OptionKeys.CommandRetryPolicy, CommandRetryPolicyGenerator); + Append(sb, OptionKeys.CommandRetryQueueLength, retryQueueLength); commandMap?.AppendDeltas(sb); return sb.ToString(); } @@ -627,7 +656,7 @@ private static void Append(StringBuilder sb, string prefix, object value) private void Clear() { ClientName = ServiceName = User = Password = tieBreaker = sslHost = configChannel = null; - keepAlive = syncTimeout = asyncTimeout = connectTimeout = writeBuffer = connectRetry = configCheckSeconds = DefaultDatabase = null; + keepAlive = syncTimeout = asyncTimeout = connectTimeout = writeBuffer = connectRetry = configCheckSeconds = DefaultDatabase = retryQueueLength = null; allowAdmin = abortOnConnectFail = highPrioritySocketThreads = resolveDns = ssl = null; SslProtocols = null; defaultVersion = null; @@ -638,6 +667,7 @@ private void Clear() CertificateValidation = null; ChannelPrefix = default(RedisChannel); SocketManager = null; + CommandRetryPolicyGenerator = CommandRetryPolicy.Default; } object ICloneable.Clone() => Clone(); @@ -758,6 +788,12 @@ private void 
DoParse(string configuration, bool ignoreUnknown) case OptionKeys.SslProtocols: SslProtocols = OptionKeys.ParseSslProtocols(key, value); break; + case OptionKeys.CommandRetryPolicy: + CommandRetryPolicyGenerator = OptionKeys.ParseCommandRetryPolicy(key, value); + break; + case OptionKeys.CommandRetryQueueLength: + CommandRetryQueueMaxLength = OptionKeys.ParseInt32(key, value, minValue: 0); + break; default: if (!string.IsNullOrEmpty(key) && key[0] == '$') { diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.Retry.cs b/src/StackExchange.Redis/ConnectionMultiplexer.Retry.cs new file mode 100644 index 000000000..7c11ec83e --- /dev/null +++ b/src/StackExchange.Redis/ConnectionMultiplexer.Retry.cs @@ -0,0 +1,30 @@ +using System; + +namespace StackExchange.Redis +{ + public partial class ConnectionMultiplexer + { + internal CommandRetryPolicy CommandRetryPolicy { get; } + + bool IInternalConnectionMultiplexer.RetryQueueIfEligible(Message message, CommandFailureReason reason, Exception exception) + => RetryQueueIfEligible(message, reason, exception); + + /// + /// Tries too queue a command for retry, if it's eligible and the policy says yes. + /// Only called internally from the library, so that base checks cannot be bypassed. + /// + /// The message that failed. + /// The failure reason, e.g. no connection available. + /// The exception throw on the first attempt. + /// True if the command was queued, false otherwise. + internal bool RetryQueueIfEligible(Message message, CommandFailureReason reason, Exception exception) + { + // If we pass the base sanity checks, *then* allocate a FailedCommand for final checks. 
+ var policy = CommandRetryPolicy; + return policy != null + && CommandRetryPolicy.IsEligible(message) + && CommandRetryPolicy.IsEligible(exception) + && policy.TryQueue(new FailedCommand(message, reason, exception)); + } + } +} diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs old mode 100644 new mode 100755 index a68499d4d..4b0d5ae58 --- a/src/StackExchange.Redis/ConnectionMultiplexer.cs +++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs @@ -96,6 +96,11 @@ bool IInternalConnectionMultiplexer.IgnoreConnect set => IgnoreConnect = value; } + int IInternalConnectionMultiplexer.AsyncTimeoutMilliseconds + { + get => AsyncTimeoutMilliseconds; + } + /// /// For debugging: when not enabled, servers cannot connect /// @@ -545,6 +550,7 @@ private static void WriteNormalizingLineEndings(string source, StreamWriter writ } } + /// /// Raised whenever a physical connection fails /// @@ -933,7 +939,6 @@ internal static ConfigurationOptions PrepareConfig(object configuration, bool se } config.SetDefaultPorts(); - return config; } @@ -1044,7 +1049,6 @@ public static ConnectionMultiplexer Connect(ConfigurationOptions configuration, { return SentinelMasterConnect(configuration, log); } - return ConnectImpl(PrepareConfig(configuration), log); } @@ -1305,6 +1309,7 @@ private ConnectionMultiplexer(ConfigurationOptions configuration) ConfigurationChangedChannel = Encoding.UTF8.GetBytes(configChannel); } lastHeartbeatTicks = Environment.TickCount; + CommandRetryPolicy = RawConfig.CommandRetryPolicyGenerator?.Invoke(this); } partial void OnCreateReaderWriter(ConfigurationOptions configuration); @@ -1851,7 +1856,7 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP case ServerType.Cluster: server.ClearUnselectable(UnselectableFlags.ServerType); if (server.IsReplica) - { + { server.ClearUnselectable(UnselectableFlags.RedundantMaster); } else @@ -2009,7 +2014,7 @@ private async Task 
GetEndpointsFromClusterNodes(ServerEndPoi { serverEndpoint.UpdateNodeRelations(clusterConfig); } - + } return clusterEndpoints; } @@ -2198,6 +2203,8 @@ internal void UpdateClusterRange(ClusterConfiguration configuration) private IDisposable pulse; + ServerEndPoint IInternalConnectionMultiplexer.SelectServer(Message message) => SelectServer(message); + internal ServerEndPoint SelectServer(Message message) { if (message == null) return null; @@ -2233,6 +2240,9 @@ private bool PrepareToPushMessageToBridge(Message message, ResultProcessor } break; } + + // If we were given a specific server, remember it for retry purposes + message.SpecificServer = server; if (!server.IsConnected) { // well, that's no use! @@ -2779,23 +2789,32 @@ internal Task ExecuteAsyncImpl(Message message, ResultProcessor process if (result != WriteResult.Success) { var ex = GetException(result, message, server); - ThrowFailed(tcs, ex); + if (!RetryQueueIfEligible(message, CommandFailureReason.WriteFailure, ex) == true) + { + ThrowFailed(tcs, ex); + } } return tcs.Task; } } - - private static async Task ExecuteAsyncImpl_Awaited(ConnectionMultiplexer @this, ValueTask write, TaskCompletionSource tcs, Message message, ServerEndPoint server) + + private static async Task ExecuteAsyncImpl_Awaited(ConnectionMultiplexer muxer, ValueTask write, TaskCompletionSource tcs, Message message, ServerEndPoint server) { var result = await write.ForAwait(); if (result != WriteResult.Success) { - var ex = @this.GetException(result, message, server); - ThrowFailed(tcs, ex); + var ex = muxer.GetException(result, message, server); + if (!muxer.RetryQueueIfEligible(message, CommandFailureReason.WriteFailure, ex) == true) + { + ThrowFailed(tcs, ex); + } } return tcs == null ? 
default(T) : await tcs.Task.ForAwait(); } + Exception IInternalConnectionMultiplexer.GetException(WriteResult result, Message message, ServerEndPoint server) + => GetException(result, message, server); + internal Exception GetException(WriteResult result, Message message, ServerEndPoint server) => result switch { WriteResult.Success => null, @@ -2847,7 +2866,11 @@ internal T ExecuteSyncImpl(Message message, ResultProcessor processor, Ser #pragma warning restore CS0618 if (result != WriteResult.Success) { - throw GetException(result, message, server); + var exResult = GetException(result, message, server); + if (!RetryQueueIfEligible(message, CommandFailureReason.WriteFailure, exResult) == true) + { + throw exResult; + } } if (Monitor.Wait(source, TimeoutMilliseconds)) diff --git a/src/StackExchange.Redis/Enums/CommandFlags.cs b/src/StackExchange.Redis/Enums/CommandFlags.cs index 286e19cd6..147c3d931 100644 --- a/src/StackExchange.Redis/Enums/CommandFlags.cs +++ b/src/StackExchange.Redis/Enums/CommandFlags.cs @@ -43,27 +43,27 @@ public enum CommandFlags /// This operation should be performed on the replica if it is available, but will be performed on /// a master if no replicas are available. Suitable for read operations only. /// - [Obsolete("Starting with Redis version 5, Redis has moved to 'replica' terminology. Please use " + nameof(PreferReplica) + " instead.")] - [Browsable(false), EditorBrowsable(EditorBrowsableState.Never)] - PreferSlave = 8, + PreferReplica = 8, // note: we're using a 2-bit set here, which [Flags] formatting hates; position is doing the best we can for reasonable outcomes here /// /// This operation should be performed on the replica if it is available, but will be performed on /// a master if no replicas are available. Suitable for read operations only. 
/// - PreferReplica = 8, // note: we're using a 2-bit set here, which [Flags] formatting hates; position is doing the best we can for reasonable outcomes here + [Obsolete("Starting with Redis version 5, Redis has moved to 'replica' terminology. Please use " + nameof(PreferReplica) + " instead.")] + [Browsable(false), EditorBrowsable(EditorBrowsableState.Never)] + PreferSlave = 8, /// /// This operation should only be performed on a replica. Suitable for read operations only. /// - DemandReplica = 12, // note: we're using a 2-bit set here, which [Flags] formatting hates; position is doing the best we can for reasonable outcomes here + [Obsolete("Starting with Redis version 5, Redis has moved to 'replica' terminology. Please use " + nameof(DemandReplica) + " instead.")] + [Browsable(false), EditorBrowsable(EditorBrowsableState.Never)] + DemandSlave = 12, /// /// This operation should only be performed on a replica. Suitable for read operations only. /// - [Obsolete("Starting with Redis version 5, Redis has moved to 'replica' terminology. 
Please use " + nameof(DemandReplica) + " instead.")] - [Browsable(false), EditorBrowsable(EditorBrowsableState.Never)] - DemandSlave = 12, + DemandReplica = 12, // note: we're using a 2-bit set here, which [Flags] formatting hates; position is doing the best we can for reasonable outcomes here // 16: reserved for additional "demand/prefer" options @@ -84,5 +84,20 @@ public enum CommandFlags NoScriptCache = 512, // 1024: used for timed-out; never user-specified, so not visible on the public API + + /// + /// This operation will not be retried (default) + /// + NoRetry = 2048, + + /// + /// This operation will be retried if it failed before being sent to the server + /// + RetryIfNotSent = 4096, + + /// + /// This operation will always be retried + /// + AlwaysRetry = 8192 } } diff --git a/src/StackExchange.Redis/Enums/CommandStatus.cs b/src/StackExchange.Redis/Enums/CommandStatus.cs index 550e2e8aa..2fb72b984 100644 --- a/src/StackExchange.Redis/Enums/CommandStatus.cs +++ b/src/StackExchange.Redis/Enums/CommandStatus.cs @@ -1,7 +1,7 @@ namespace StackExchange.Redis { /// - /// track status of a command while communicating with Redis + /// Track status of a command while communicating with Redis /// public enum CommandStatus { diff --git a/src/StackExchange.Redis/ExceptionFactory.cs b/src/StackExchange.Redis/ExceptionFactory.cs index 661ce29b8..cb40d1e4d 100644 --- a/src/StackExchange.Redis/ExceptionFactory.cs +++ b/src/StackExchange.Redis/ExceptionFactory.cs @@ -332,6 +332,14 @@ ServerEndPoint server if (toRead >= 0) Add(data, sb, "Inbound-Pipe-Bytes", "in-pipe", toRead.ToString()); if (toWrite >= 0) Add(data, sb, "Outbound-Pipe-Bytes", "out-pipe", toWrite.ToString()); + var retryPolicy = multiplexer.CommandRetryPolicy; + if (retryPolicy != null) + { + Add(data, sb, "RetryPolicy-Queue-Length", "rp-ql", retryPolicy.CurrentQueueLength.ToString()); + Add(data, sb, "RetryPolicy-Processing", "rp-p", retryPolicy.CurrentlyProcessing ? 
"1" : "0"); + Add(data, sb, "RetryPolicy-Status", "rp-s", retryPolicy.StatusDescription); + } + if (multiplexer.StormLogThreshold >= 0 && qs >= multiplexer.StormLogThreshold && Interlocked.CompareExchange(ref multiplexer.haveStormLog, 1, 0) == 0) { var log = server.GetStormLog(message.Command); @@ -343,6 +351,12 @@ ServerEndPoint server Add(data, sb, "Multiplexer-Connects", "mc", $"{multiplexer._connectAttemptCount}/{multiplexer._connectCompletedCount}/{multiplexer._connectionCloseCount}"); Add(data, sb, "Manager", "mgr", multiplexer.SocketManager?.GetState()); + // If we can, add some thread pool stats to help debugging +#if NETCOREAPP3_1_OR_GREATER + Add(data, sb, "ThreadPool-Work-Item-Pending", "tp-wip", ThreadPool.PendingWorkItemCount.ToString()); + Add(data, sb, "ThreadPool-Thread-Count", "tp-tc", ThreadPool.ThreadCount.ToString()); +#endif + Add(data, sb, "Client-Name", "clientName", multiplexer.ClientName); if (message != null) { diff --git a/src/StackExchange.Redis/Interfaces/IConnectionMultiplexer.cs b/src/StackExchange.Redis/Interfaces/IConnectionMultiplexer.cs index f4770e4f8..a4ad2cbab 100644 --- a/src/StackExchange.Redis/Interfaces/IConnectionMultiplexer.cs +++ b/src/StackExchange.Redis/Interfaces/IConnectionMultiplexer.cs @@ -12,7 +12,15 @@ internal interface IInternalConnectionMultiplexer : IConnectionMultiplexer bool IgnoreConnect { get; set; } + internal int AsyncTimeoutMilliseconds { get; } + ReadOnlySpan GetServerSnapshot(); + + ServerEndPoint SelectServer(Message message); + + Exception GetException(WriteResult result, Message message, ServerEndPoint server); + + bool RetryQueueIfEligible(Message message, CommandFailureReason reason, Exception exception); } /// diff --git a/src/StackExchange.Redis/Message.cs b/src/StackExchange.Redis/Message.cs index 78e931ce6..78a13f8e9 100644 --- a/src/StackExchange.Redis/Message.cs +++ b/src/StackExchange.Redis/Message.cs @@ -54,6 +54,8 @@ internal abstract class Message : ICompletable { public readonly int 
Db; + internal void ResetStatusToWaitingToBeSent() => Status = CommandStatus.WaitingToBeSent; + #if DEBUG internal int QueuePosition { get; private set; } internal PhysicalConnection.WriteStatus ConnectionWriteState { get; private set; } @@ -91,7 +93,10 @@ internal void SetBacklogState(int position, PhysicalConnection physical) #pragma warning restore CS0618 | CommandFlags.FireAndForget | CommandFlags.NoRedirect - | CommandFlags.NoScriptCache; + | CommandFlags.NoScriptCache + | CommandFlags.AlwaysRetry + | CommandFlags.NoRetry + | CommandFlags.RetryIfNotSent; private IResultBox resultBox; private ResultProcessor resultProcessor; @@ -100,6 +105,7 @@ internal void SetBacklogState(int position, PhysicalConnection physical) private ProfiledCommand performance; internal DateTime CreatedDateTime; internal long CreatedTimestamp; + internal ServerEndPoint SpecificServer; protected Message(int db, CommandFlags flags, RedisCommand command) { @@ -482,6 +488,7 @@ internal bool ResultBoxIsAsync } } + public object AsyncTimeoutMilliseconds { get; internal set; } internal static Message Create(int db, CommandFlags flags, RedisCommand command, in RedisKey key, RedisKey[] keys) => keys.Length switch { 0 => new CommandKeyMessage(db, flags, command, key), @@ -619,8 +626,26 @@ internal void Fail(ConnectionFailureType failure, Exception innerException, stri resultProcessor?.ConnectionFail(this, failure, innerException, annotation); } - internal virtual void SetExceptionAndComplete(Exception exception, PhysicalBridge bridge) + internal virtual void SetExceptionAndComplete(Exception exception, PhysicalBridge bridge, CommandFailureReason reason) { + if ((reason == CommandFailureReason.WriteFailure || reason == CommandFailureReason.ConnectionFailure) + && exception is RedisConnectionException + && bridge != null) + { + try + { + if (bridge.Multiplexer.RetryQueueIfEligible(this, reason, exception)) + { + // We're retrying - ABANDON SHIP + return; + } + } + catch (Exception e) + { + 
exception.Data.Add("OnConnectionRestoreRetryManagerError", e.ToString()); + } + } + resultBox?.SetException(exception); Complete(); } @@ -696,11 +721,9 @@ internal void SetRequestSent() [MethodImpl(MethodImplOptions.AggressiveInlining)] internal void SetWriteTime() { - if ((Flags & NeedsAsyncTimeoutCheckFlag) != 0) - { - _writeTickCount = Environment.TickCount; // note this might be reset if we resend a message, cluster-moved etc; I'm OK with that - } + _writeTickCount = Environment.TickCount; // note this might be reset if we resend a message, cluster-moved etc; I'm OK with that } + private int _writeTickCount; public int GetWriteTime() => Volatile.Read(ref _writeTickCount); diff --git a/src/StackExchange.Redis/PhysicalBridge.cs b/src/StackExchange.Redis/PhysicalBridge.cs index 561bd6e9b..9504c50da 100644 --- a/src/StackExchange.Redis/PhysicalBridge.cs +++ b/src/StackExchange.Redis/PhysicalBridge.cs @@ -450,7 +450,7 @@ private void AbandonPendingBacklog(Exception ex) while (_backlog.TryDequeue(out Message next)) { Multiplexer?.OnMessageFaulted(next, ex); - next.SetExceptionAndComplete(ex, this); + next.SetExceptionAndComplete(ex, this, CommandFailureReason.ConnectionFailure); } } internal void OnFullyEstablished(PhysicalConnection connection, string source) @@ -468,6 +468,8 @@ internal void OnFullyEstablished(PhysicalConnection connection, string source) bool createWorker = !_backlog.IsEmpty; if (createWorker) StartBacklogProcessor(); + Multiplexer.CommandRetryPolicy?.OnReconnect(); + if (ConnectionType == ConnectionType.Interactive) ServerEndPoint.CheckInfoReplication(); } else @@ -484,8 +486,8 @@ internal void OnHeartbeat(bool ifConnectedOnly) bool runThisTime = false; try { + Multiplexer.CommandRetryPolicy?.OnHeartbeat(); CheckBacklogForTimeouts(); - runThisTime = !isDisposed && Interlocked.CompareExchange(ref beating, 1, 0) == 0; if (!runThisTime) return; @@ -816,7 +818,7 @@ private void CheckBacklogForTimeouts() // check the head of the backlog queue, c // 
Tell the message it has failed // Note: Attempting to *avoid* reentrancy/deadlock issues by not holding the lock while completing messages. var ex = Multiplexer.GetException(WriteResult.TimeoutBeforeWrite, message, ServerEndPoint); - message.SetExceptionAndComplete(ex, this); + message.SetExceptionAndComplete(ex, this, CommandFailureReason.Timeout); } } internal enum BacklogStatus : byte @@ -896,7 +898,7 @@ private async Task ProcessBacklogAsync() if (maxFlush >= 0) ex.Data["Redis-MaxFlush"] = maxFlush.ToString() + "ms, " + (physical?.MaxFlushBytes ?? -1).ToString(); if (_maxLockDuration >= 0) ex.Data["Redis-MaxLockDuration"] = _maxLockDuration; #endif - message.SetExceptionAndComplete(ex, this); + message.SetExceptionAndComplete(ex, this, CommandFailureReason.Timeout); } else { @@ -1125,7 +1127,7 @@ private async ValueTask CompleteWriteAndReleaseLockAsync(LockToken private WriteResult HandleWriteException(Message message, Exception ex) { var inner = new RedisConnectionException(ConnectionFailureType.InternalFailure, "Failed to write", ex); - message.SetExceptionAndComplete(inner, this); + message.SetExceptionAndComplete(inner, this, CommandFailureReason.WriteFailure); return WriteResult.WriteFailure; } diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs index bbbccf4e3..f77996092 100644 --- a/src/StackExchange.Redis/PhysicalConnection.cs +++ b/src/StackExchange.Redis/PhysicalConnection.cs @@ -430,7 +430,7 @@ void add(string lk, string sk, string v) bridge.Trace("Failing: " + next); bridge.Multiplexer?.OnMessageFaulted(next, ex, origin); } - next.SetExceptionAndComplete(ex, bridge); + next.SetExceptionAndComplete(ex, bridge, CommandFailureReason.ConnectionFailure); } } } @@ -612,7 +612,7 @@ internal void OnBridgeHeartbeat() ? 
$"Timeout awaiting response (outbound={sentDelta >> 10}KiB, inbound={receivedDelta >> 10}KiB, {elapsed}ms elapsed, timeout is {timeout}ms)" : $"Timeout awaiting response ({elapsed}ms elapsed, timeout is {timeout}ms)", msg, server); bridge.Multiplexer?.OnMessageFaulted(msg, timeoutEx); - msg.SetExceptionAndComplete(timeoutEx, bridge); // tell the message that it is doomed + msg.SetExceptionAndComplete(timeoutEx, bridge, CommandFailureReason.Timeout); // tell the message that it is doomed bridge.Multiplexer.OnAsyncTimeout(); } // note: it is important that we **do not** remove the message unless we're tearing down the socket; that diff --git a/src/StackExchange.Redis/RedisTransaction.cs b/src/StackExchange.Redis/RedisTransaction.cs index 8bbee7d52..24ce47f7a 100644 --- a/src/StackExchange.Redis/RedisTransaction.cs +++ b/src/StackExchange.Redis/RedisTransaction.cs @@ -197,17 +197,17 @@ public TransactionMessage(int db, CommandFlags flags, List cond this.conditions = (conditions == null || conditions.Count == 0) ? 
Array.Empty(): conditions.ToArray(); } - internal override void SetExceptionAndComplete(Exception exception, PhysicalBridge bridge) + internal override void SetExceptionAndComplete(Exception exception, PhysicalBridge bridge, CommandFailureReason reason) { var inner = InnerOperations; if (inner != null) { for(int i = 0; i < inner.Length;i++) { - inner[i]?.Wrapped?.SetExceptionAndComplete(exception, bridge); + inner[i]?.Wrapped?.SetExceptionAndComplete(exception, bridge, reason); } } - base.SetExceptionAndComplete(exception, bridge); + base.SetExceptionAndComplete(exception, bridge, reason); } public bool IsAborted => command != RedisCommand.EXEC; diff --git a/tests/StackExchange.Redis.Tests/AsyncTests.cs b/tests/StackExchange.Redis.Tests/AsyncTests.cs index 5367ef0b6..9698b539d 100644 --- a/tests/StackExchange.Redis.Tests/AsyncTests.cs +++ b/tests/StackExchange.Redis.Tests/AsyncTests.cs @@ -19,7 +19,7 @@ public void AsyncTasksReportFailureIfServerUnavailable() { SetExpectedAmbientFailureCount(-1); // this will get messy - using (var conn = Create(allowAdmin: true)) + using (var conn = Create(allowAdmin: true, retryPolicy: CommandRetryPolicy.Never)) { var server = conn.GetServer(TestConfig.Current.MasterServer, TestConfig.Current.MasterPort); diff --git a/tests/StackExchange.Redis.Tests/CommandRetry/CommandRetryPolicyTests.cs b/tests/StackExchange.Redis.Tests/CommandRetry/CommandRetryPolicyTests.cs new file mode 100644 index 000000000..5075c6fb5 --- /dev/null +++ b/tests/StackExchange.Redis.Tests/CommandRetry/CommandRetryPolicyTests.cs @@ -0,0 +1,154 @@ +using System; +using Xunit; +using Xunit.Abstractions; + +namespace StackExchange.Redis.Tests.CommandRetry +{ + public class CommandRetryPolicyTests : TestBase + { + public CommandRetryPolicyTests(ITestOutputHelper output) : base(output) { } + + [Theory] + [InlineData(CommandFlags.AlwaysRetry, true)] + [InlineData(CommandFlags.NoRetry, false)] + [InlineData(CommandFlags.RetryIfNotSent, true)] + public void 
ValidateOverrideFlag(CommandFlags flag, bool shouldRetry) + { + using var muxer = Create(retryPolicy: mux => new DefaultCommandRetryPolicy(mux, c => true)); + var message = Message.Create(0, flag, RedisCommand.GET); + message.ResetStatusToWaitingToBeSent(); + var ex = new RedisConnectionException(ConnectionFailureType.SocketClosed, "test"); + Assert.Equal(shouldRetry, muxer.RetryQueueIfEligible(message, CommandFailureReason.WriteFailure, ex)); + } + + [Theory] + [InlineData(CommandFlags.AlwaysRetry, false)] + [InlineData(CommandFlags.NoRetry, false)] + [InlineData(CommandFlags.RetryIfNotSent, false)] + public void ValidateOverrideFlagWithIsAdmin(CommandFlags flag, bool shouldRetry) + { + using var muxer = Create(retryPolicy: mux => new DefaultCommandRetryPolicy(mux, c => true)); + var message = Message.Create(0, flag, RedisCommand.FLUSHDB); + message.ResetStatusToWaitingToBeSent(); + var ex = new RedisConnectionException(ConnectionFailureType.SocketClosed, "test"); + Assert.Equal(shouldRetry, muxer.RetryQueueIfEligible(message, CommandFailureReason.WriteFailure, ex)); + } + + [Theory] + [InlineData(true, false)] + [InlineData(false, true)] + public void ValidateRetryIfNotSentOverrideFlag(bool alreadySent, bool shouldRetry) + { + using var muxer = Create(retryPolicy: mux => new DefaultCommandRetryPolicy(mux, c => true)); + var message = Message.Create(0, CommandFlags.RetryIfNotSent, RedisCommand.GET); + if (alreadySent) + { + message.SetRequestSent(); + } + else + { + message.ResetStatusToWaitingToBeSent(); + } + var ex = new RedisConnectionException(ConnectionFailureType.SocketClosed, "test"); + Assert.Equal(shouldRetry, muxer.RetryQueueIfEligible(message, CommandFailureReason.WriteFailure, ex)); + } + + [Fact] + public void DefaultPolicy() + { + Assert.Equal(CommandRetryPolicy.IfNotSent, CommandRetryPolicy.Default); + } + + [Fact] + public void NeverPolicy() + { + var policy = CommandRetryPolicy.Never(null); + + var message = new TestMessage(CommandFlags.None, 
RedisCommand.GET); + var failedCommand = new FailedCommand(message, CommandFailureReason.WriteFailure, new RedisException("test")); + + Assert.False(policy.TryQueue(failedCommand)); + message.SetRequestSent(); + Assert.False(policy.TryQueue(failedCommand)); + } + + [Fact] + public void IfNotSentPolicy() + { + var policy = CommandRetryPolicy.IfNotSent(null); + + var message = new TestMessage(CommandFlags.None, RedisCommand.GET); + var failedCommand = new FailedCommand(message, CommandFailureReason.WriteFailure, new RedisException("test")); + + Assert.True(policy.TryQueue(failedCommand)); + message.SetRequestSent(); + Assert.False(policy.TryQueue(failedCommand)); + // Just for good measure... + message.ResetStatusToWaitingToBeSent(); + Assert.True(policy.TryQueue(failedCommand)); + } + + [Fact] + public void MessageExclusions() + { + // Base eligibility + var message = new TestMessage(CommandFlags.None, RedisCommand.GET); + Assert.True(CommandRetryPolicy.IsEligible(message)); + + // NoRetry excludes the command + message = new TestMessage(CommandFlags.NoRetry, RedisCommand.GET); + Assert.False(CommandRetryPolicy.IsEligible(message)); + + // RetryIfNotSent should work + message = new TestMessage(CommandFlags.RetryIfNotSent, RedisCommand.GET); + Assert.True(CommandRetryPolicy.IsEligible(message)); + //...unless it's sent + message.SetRequestSent(); + Assert.False(CommandRetryPolicy.IsEligible(message)); + + // Admin commands are ineligible + message = new TestMessage(CommandFlags.None, RedisCommand.KEYS); + Assert.True(message.IsAdmin); + Assert.False(CommandRetryPolicy.IsEligible(message)); + + // Internal is ineligible + message = new TestMessage(CommandFlags.None, RedisCommand.GET); + message.SetInternalCall(); + Assert.False(CommandRetryPolicy.IsEligible(message)); + } + + [Fact] + public void ExceptionExclusions() + { + // Sanity checking RedisException - all we look for + var ex = new RedisException("Boom"); + Assert.True(CommandRetryPolicy.IsEligible(ex)); + + 
// Other exceptions don't qualify + var oex = new Exception("test"); + Assert.False(CommandRetryPolicy.IsEligible(oex)); + } + + [Fact] + public void EndToEndExclusions() + { + using var muxer = Create(retryPolicy: CommandRetryPolicy.Always); + var policy = (muxer as ConnectionMultiplexer).CommandRetryPolicy; + + var ex = new RedisException("test"); + var message = new TestMessage(CommandFlags.None, RedisCommand.GET); + Assert.True(muxer.RetryQueueIfEligible(message, CommandFailureReason.WriteFailure, ex)); + + message = new TestMessage(CommandFlags.NoRetry, RedisCommand.GET); + Assert.False(muxer.RetryQueueIfEligible(message, CommandFailureReason.WriteFailure, ex)); + } + + private class TestMessage : Message + { + public TestMessage(CommandFlags flags, RedisCommand command) : base(0, flags, command) { } + + public override int ArgCount => 0; + protected override void WriteImpl(PhysicalConnection physical) { } + } + } +} diff --git a/tests/StackExchange.Redis.Tests/CommandRetry/MessageRetryQueueIntegrationTests.cs b/tests/StackExchange.Redis.Tests/CommandRetry/MessageRetryQueueIntegrationTests.cs new file mode 100644 index 000000000..8f73875b3 --- /dev/null +++ b/tests/StackExchange.Redis.Tests/CommandRetry/MessageRetryQueueIntegrationTests.cs @@ -0,0 +1,106 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace StackExchange.Redis.Tests.CommandRetry +{ + public class MessageRetryQueueIntegrationTests + { + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task RetryAsyncMessageIntegration(bool retryPolicySet) + { + ConfigurationOptions configAdmin = new ConfigurationOptions(); + configAdmin.EndPoints.Add("127.0.0.1"); + configAdmin.AbortOnConnectFail = false; + configAdmin.AllowAdmin = true; + configAdmin.CommandRetryPolicyGenerator = retryPolicySet ? 
CommandRetryPolicy.Always : null; + + ConfigurationOptions configClient = new ConfigurationOptions(); + configClient.EndPoints.Add("127.0.0.1"); + configAdmin.AbortOnConnectFail = false; + configClient.CommandRetryPolicyGenerator = retryPolicySet ? CommandRetryPolicy.Always : null; + + using (var adminMuxer = ConnectionMultiplexer.Connect(configAdmin)) + using (var clientmuxer = ConnectionMultiplexer.Connect(configClient)) + { + var conn = clientmuxer.GetDatabase(); + const string keyname = "testretrypolicy"; + long count = 0; + + var runLoad = Task.Run(() => + { + try + { + using (var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10))) + { + var cancellationToken = cancellationTokenSource.Token; + int parallelTaskCount = 200; + + while (true) + { + Task[] tasks = new Task[parallelTaskCount]; + for (int i = 0; i < parallelTaskCount; i++) + { + tasks[i] = conn.StringSetBitAsync(keyname, count, true); + Interlocked.Increment(ref count); + } + Task.WaitAll(tasks, cancellationToken); + } + } + } + catch (OperationCanceledException) + { + return true; + } + catch (Exception ex) + { + Assert.False(retryPolicySet, ex.ToString()); + return false; + } + }); + + // let the load warmup at least n times before connection blip + await Task.Delay(2000); + + // connection blip + KillClient(adminMuxer, clientmuxer); + + // wait for load to stop + var isLoadSucceed = await runLoad; + + // Assert load completed based on policy + Assert.Equal(retryPolicySet, isLoadSucceed); + + // Assert for retrypolicy data was set correctly after retry + if (retryPolicySet) + { + Assert.Equal(Interlocked.Read(ref count), await conn.StringBitCountAsync(keyname)); + } + + // cleanup + await adminMuxer.GetDatabase().KeyDeleteAsync(keyname); + } + } + + private void KillClient(IInternalConnectionMultiplexer adminMuxer, IInternalConnectionMultiplexer clientmuxer) + { + string clientname = "retrypolicy"; + var clientServer = 
clientmuxer.GetServer(clientmuxer.GetEndPoints()[0]); + clientServer.Execute("client", "setname", clientname); + + var adminServer = adminMuxer.GetServer(adminMuxer.GetEndPoints()[0]); + var clientsToKill = adminServer.ClientList().Where(c => c.Name.Equals(clientname)); + Assert.NotNull(clientsToKill); + foreach (var client in clientsToKill) + { + Assert.Equal(clientname, client.Name); + adminMuxer.GetServer(adminMuxer.GetEndPoints()[0]).ClientKill(client.Address); + } + } + + } +} diff --git a/tests/StackExchange.Redis.Tests/CommandRetry/MessageRetryQueueTests.cs b/tests/StackExchange.Redis.Tests/CommandRetry/MessageRetryQueueTests.cs new file mode 100644 index 000000000..5ac845ac5 --- /dev/null +++ b/tests/StackExchange.Redis.Tests/CommandRetry/MessageRetryQueueTests.cs @@ -0,0 +1,151 @@ +using System; +using System.Threading.Tasks; +using Moq; +using Xunit; +using Xunit.Abstractions; + +namespace StackExchange.Redis.Tests.CommandRetry +{ + [Collection(SharedConnectionFixture.Key)] + public class MessageRetryQueueTests : TestBase + { + public MessageRetryQueueTests(ITestOutputHelper output, SharedConnectionFixture fixture) : base(output, fixture) { } + + private void GetMock(out Moq.Mock mockmessageRetryHelper, out MessageRetryQueue messageRetryQueue, out Message message) + { + message = Message.Create(0, CommandFlags.None, RedisCommand.SET); + mockmessageRetryHelper = new Mock(); + messageRetryQueue = new MessageRetryQueue(mockmessageRetryHelper.Object); + } + + [Fact] + public void ValidateMaxQueueLengthFails() + { + var message = Message.Create(0, CommandFlags.None, RedisCommand.SET); + var mockmessageRetryHelper = new Mock(); + var messageRetryQueue = new MessageRetryQueue(mockmessageRetryHelper.Object, maxRetryQueueLength: 0); + + var isEnqueuedWithZeroMaxLength = messageRetryQueue.TryHandleFailedCommand(message); + messageRetryQueue.TryHandleFailedCommand(message); + Assert.False(isEnqueuedWithZeroMaxLength); + } + + [Fact] + public async Task 
RetryMessageSucceeds() + { + using (var muxer = Create(allowAdmin: true, retryPolicy: CommandRetryPolicy.Always)) + { + var conn = muxer.GetDatabase(); + var duration = await conn.PingAsync().ForAwait(); + Log("Ping took: " + duration); + Assert.True(duration.TotalMilliseconds > 0); + } + } + + [Fact] + public async Task TryHandleFailedMessageSucceedsOnEndPointAvailable() + { + GetMock(out var messageRetryHelper, out var messageRetryQueue, out var message); + messageRetryHelper.Setup(failedCommand => failedCommand.IsEndpointAvailable(message)).Returns(true); + + messageRetryQueue.TryHandleFailedCommand(message); + await messageRetryQueue.ProcessRetryQueueAsync(); + Assert.Equal(0, messageRetryQueue.CurrentRetryQueueLength); + + messageRetryHelper.Verify(failedCommand => failedCommand.TryResendAsync(message), Times.Once); + } + + [Fact] + public void TryHandleFailedMessageWaitOnEndPointUnAvailable() + { + GetMock(out var messageRetryHelper, out var messageRetryQueue, out var message); + messageRetryHelper.Setup(mockfailedCommand => mockfailedCommand.IsEndpointAvailable(message)).Returns(false); + + messageRetryQueue.TryHandleFailedCommand(message); + + Assert.Equal(1, messageRetryQueue.CurrentRetryQueueLength); + messageRetryHelper.Verify(failedCommand => failedCommand.TryResendAsync(message), Times.Never); + } + + [Fact] + public void TryHandleFailedMessageTimedoutEndPointAvailable() + { + GetMock(out var messageRetryHelper, out var messageRetryQueue, out var message); + + var timeout = new RedisTimeoutException("timedout", CommandStatus.Unknown); + messageRetryHelper.Setup(mockfailedCommand => mockfailedCommand.IsEndpointAvailable(message)).Returns(true); + messageRetryHelper.Setup(failedCommand => failedCommand.HasTimedOut(message)).Returns(true); + messageRetryHelper.Setup(failedCommand => failedCommand.GetTimeoutException(message)).Returns(timeout); + + messageRetryQueue.TryHandleFailedCommand(message); + + messageRetryQueue.CheckRetryQueueForTimeouts(); + 
Assert.Equal(0, messageRetryQueue.CurrentRetryQueueLength); + + messageRetryHelper.Verify(failedCommand => failedCommand.TryResendAsync(message), Times.Never); + messageRetryHelper.Verify(failedCommand => failedCommand.SetExceptionAndComplete(message, timeout), Times.Once); + } + + [Fact] + public void TryHandleFailedMessageGetEndpointThrows() + { + GetMock(out var messageRetryHelper, out var messageRetryQueue, out var message); + var ex = new Exception("failedendpoint"); + messageRetryHelper.Setup(mockfailedCommand => mockfailedCommand.IsEndpointAvailable(message)).Throws(ex); + + messageRetryQueue.TryHandleFailedCommand(message); + Assert.Equal(1, messageRetryQueue.CurrentRetryQueueLength); + + messageRetryHelper.Verify(failedCommand => failedCommand.TryResendAsync(message), Times.Never); + messageRetryHelper.Verify(failedCommand => failedCommand.SetExceptionAndComplete(message, ex), Times.Once); + } + + [Fact] + public void TryHandleFailedMessageDrainsQueue() + { + GetMock(out var messageRetryHelper, out var messageRetryQueue, out var message); + messageRetryHelper.Setup(mockfailedCommand => mockfailedCommand.IsEndpointAvailable(message)).Returns(false); + + messageRetryQueue.TryHandleFailedCommand(message); + messageRetryQueue.Dispose(); + + Assert.Equal(0, messageRetryQueue.CurrentRetryQueueLength); + messageRetryHelper.Verify(failedCommand => failedCommand.TryResendAsync(message), Times.Never); + messageRetryHelper.Verify(failedCommand => failedCommand.SetExceptionAndComplete(message, It.IsAny()), Times.Once); + } + + [Theory] + [InlineData(true, 0)] + [InlineData(false, 1)] + public void CheckRetryForTimeoutTimesout(bool hasTimedout, int queueLength) + { + GetMock(out var messageRetryHelper, out var messageRetryQueue, out var message); + messageRetryHelper.Setup(mockfailedCommand => mockfailedCommand.IsEndpointAvailable(message)).Returns(false); + messageRetryHelper.Setup(mockfailedCommand => mockfailedCommand.HasTimedOut(message)).Returns(hasTimedout); + + 
messageRetryQueue.TryHandleFailedCommand(message); + messageRetryQueue.CheckRetryQueueForTimeouts(); + + Assert.Equal(queueLength, messageRetryQueue.CurrentRetryQueueLength); + messageRetryHelper.Verify(failedCommand => failedCommand.TryResendAsync(message), Times.Never); + messageRetryHelper.Verify(failedCommand => failedCommand.SetExceptionAndComplete(message, It.IsAny()), Times.Exactly(hasTimedout ? 1 : 0)); + } + + [Fact] + public async void TryHandleFailedMessageTimeoutThrow() + { + GetMock(out var messageRetryHelper, out var messageRetryQueue, out var message); + var ex = new Exception(); + messageRetryHelper.Setup(mockfailedCommand => mockfailedCommand.IsEndpointAvailable(message)).Returns(true); + messageRetryHelper.Setup(mockfailedCommand => mockfailedCommand.HasTimedOut(message)).Throws(ex); + + messageRetryQueue.TryHandleFailedCommand(message); + + await messageRetryQueue.ProcessRetryQueueAsync(); + Assert.Equal(0, messageRetryQueue.CurrentRetryQueueLength); + + messageRetryHelper.Verify(failedCommand => failedCommand.TryResendAsync(message), Times.Never); + messageRetryHelper.Verify(failedCommand => failedCommand.SetExceptionAndComplete(message, It.IsAny()), Times.Once); + } + } +} diff --git a/tests/StackExchange.Redis.Tests/Config.cs b/tests/StackExchange.Redis.Tests/Config.cs index 553f3d6f7..09b4b9578 100644 --- a/tests/StackExchange.Redis.Tests/Config.cs +++ b/tests/StackExchange.Redis.Tests/Config.cs @@ -158,6 +158,40 @@ public void CanParseAndFormatUnixDomainSocket() #endif } + [Fact] + public void CanParseCommandRetryPolicy() + { + // By default, we should retry if not sent + var defaultConfig = ConfigurationOptions.Parse("127.0.0.1:6379"); + Assert.Equal(CommandRetryPolicy.Default, defaultConfig.CommandRetryPolicyGenerator); + Assert.Equal(CommandRetryPolicy.IfNotSent, defaultConfig.CommandRetryPolicyGenerator); + Assert.Null(defaultConfig.CommandRetryQueueMaxLength); + + var clonedDefaultConfig = defaultConfig.Clone(); + 
Assert.Equal(CommandRetryPolicy.Default, clonedDefaultConfig.CommandRetryPolicyGenerator); + + var alwaysConfig = ConfigurationOptions.Parse("127.0.0.1:6379,commandRetryPolicy=Always,commandRetryQueueLength=12000"); + Assert.Equal(CommandRetryPolicy.Always, alwaysConfig.CommandRetryPolicyGenerator); + Assert.Equal(12000, alwaysConfig.CommandRetryQueueMaxLength); + + var alwaysConfigClone = alwaysConfig.Clone(); + Assert.Equal(CommandRetryPolicy.Always, alwaysConfigClone.CommandRetryPolicyGenerator); + Assert.Equal(12000, alwaysConfig.CommandRetryQueueMaxLength); + + var customPolicyOptions = ConfigurationOptions.Parse("127.0.0.1:6379"); + // Good luck debugging this one, ops team. + Func customPolicyGen = muxer => new DefaultCommandRetryPolicy(muxer, commandStatus => Environment.TickCount % 2 == 0); + customPolicyOptions.CommandRetryPolicyGenerator = customPolicyGen; + Assert.Equal(customPolicyGen, customPolicyOptions.CommandRetryPolicyGenerator); + + // Ensure clones carry correctly + var customClone = customPolicyOptions.Clone(); + Assert.Equal(customPolicyGen, customClone.CommandRetryPolicyGenerator); + + var ex = Assert.Throws(() => ConfigurationOptions.Parse("127.0.0.1:6379,commandRetryPolicy=blah")); + Assert.StartsWith("Keyword 'commandRetryPolicy' can be empty, None, Always or IfNotSent; the value 'blah' is not recognized.", ex.Message); + } + [Fact] public void TalkToNonsenseServer() { diff --git a/tests/StackExchange.Redis.Tests/ConnectionFailedErrors.cs b/tests/StackExchange.Redis.Tests/ConnectionFailedErrors.cs index 8fe8d47e6..4464bb477 100644 --- a/tests/StackExchange.Redis.Tests/ConnectionFailedErrors.cs +++ b/tests/StackExchange.Redis.Tests/ConnectionFailedErrors.cs @@ -105,6 +105,7 @@ void innerScenario() options.Password = ""; options.AbortOnConnectFail = false; options.ConnectTimeout = 1000; + options.CommandRetryPolicyGenerator = CommandRetryPolicy.Never; var outer = Assert.Throws(() => { using (var muxer = ConnectionMultiplexer.Connect(options))
diff --git a/tests/StackExchange.Redis.Tests/SharedConnectionFixture.cs b/tests/StackExchange.Redis.Tests/SharedConnectionFixture.cs index bf22489dd..73b537370 100644 --- a/tests/StackExchange.Redis.Tests/SharedConnectionFixture.cs +++ b/tests/StackExchange.Redis.Tests/SharedConnectionFixture.cs @@ -73,6 +73,8 @@ public bool IgnoreConnect public bool IncludeDetailInExceptions { get => _inner.IncludeDetailInExceptions; set => _inner.IncludeDetailInExceptions = value; } public int StormLogThreshold { get => _inner.StormLogThreshold; set => _inner.StormLogThreshold = value; } + int IInternalConnectionMultiplexer.AsyncTimeoutMilliseconds => throw new NotImplementedException(); + public event EventHandler ErrorMessage { add @@ -288,6 +290,12 @@ public void WaitAll(params Task[] tasks) public void ExportConfiguration(Stream destination, ExportOptions options = ExportOptions.All) => _inner.ExportConfiguration(destination, options); + ServerEndPoint IInternalConnectionMultiplexer.SelectServer(Message message) => _inner.SelectServer(message); + Exception IInternalConnectionMultiplexer.GetException(WriteResult result, Message message, ServerEndPoint server) + => _inner.GetException(result, message, server); + + bool IInternalConnectionMultiplexer.RetryQueueIfEligible(Message message, CommandFailureReason reason, Exception exception) + => _inner.RetryQueueIfEligible(message, reason, exception); } public void Dispose() => _actualConnection.Dispose(); diff --git a/tests/StackExchange.Redis.Tests/TestBase.cs b/tests/StackExchange.Redis.Tests/TestBase.cs index c31a72178..458728334 100644 --- a/tests/StackExchange.Redis.Tests/TestBase.cs +++ b/tests/StackExchange.Redis.Tests/TestBase.cs @@ -230,6 +230,7 @@ internal virtual IInternalConnectionMultiplexer Create( string channelPrefix = null, Proxy? proxy = null, string configuration = null, bool logTransactionData = true, bool shared = true, int? 
defaultDatabase = null, + Func retryPolicy = null, [CallerMemberName] string caller = null) { if (Output == null) @@ -255,7 +256,7 @@ internal virtual IInternalConnectionMultiplexer Create( checkConnect, failMessage, channelPrefix, proxy, configuration ?? GetConfiguration(), - logTransactionData, defaultDatabase, caller); + logTransactionData, defaultDatabase, retryPolicy, caller); muxer.InternalError += OnInternalError; muxer.ConnectionFailed += OnConnectionFailed; return muxer; @@ -270,7 +271,7 @@ public static ConnectionMultiplexer CreateDefault( string channelPrefix = null, Proxy? proxy = null, string configuration = null, bool logTransactionData = true, int? defaultDatabase = null, - + Func retryPolicy = null, [CallerMemberName] string caller = null) { StringWriter localLog = null; @@ -306,6 +307,7 @@ public static ConnectionMultiplexer CreateDefault( if (connectTimeout != null) config.ConnectTimeout = connectTimeout.Value; if (proxy != null) config.Proxy = proxy.Value; if (defaultDatabase != null) config.DefaultDatabase = defaultDatabase.Value; + if (retryPolicy != null) config.CommandRetryPolicyGenerator = retryPolicy; var watch = Stopwatch.StartNew(); var task = ConnectionMultiplexer.ConnectAsync(config, log); if (!task.Wait(config.ConnectTimeout >= (int.MaxValue / 2) ? int.MaxValue : config.ConnectTimeout * 2))