Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/StackExchange.Redis/CommandRetry/CommandFailureReason.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace StackExchange.Redis
{
/// <summary>
/// The reason a command failed to send or complete.
/// </summary>
public enum CommandFailureReason
{
/// <summary>
/// No open/valid connection was avaialble to send on - we couldn't even write the command.
/// </summary>
WriteFailure,
/// <summary>
/// The message was sent, but we lost the connection and this command in-flight.
/// </summary>
ConnectionFailure,
/// <summary>
/// Command has timed out, exceeding the sync or async timeout limits
/// </summary>
Timeout,
/// <summary>
/// This command failed again, during a retry
/// </summary>
RetryFailure,
}
}
63 changes: 53 additions & 10 deletions src/StackExchange.Redis/CommandRetry/CommandRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,53 @@ public abstract class CommandRetryPolicy
/// Creates a policy instance for a specific multiplexer and its commands.
/// </summary>
/// <param name="muxer">The muleiplexer this policy is for.</param>
protected CommandRetryPolicy(ConnectionMultiplexer muxer)
{
}

internal abstract bool TryQueue(Message message, Exception ex);
protected CommandRetryPolicy(ConnectionMultiplexer muxer) { }

/// <summary>
/// Returns the current length of the retry queue.
/// </summary>
public abstract int CurrentQueueLength { get; }

/// <summary>
/// Determines whether a failed command should be retried.
/// Returns whether the current queue is processing (e.g. retrying queued commands).
/// </summary>
/// <param name="commandStatus">Current state of the command.</param>
/// <returns>True to retry the command, otherwise false.</returns>
public abstract bool ShouldRetry(CommandStatus commandStatus);
public abstract bool CurrentlyProcessing { get; }

/// <summary>
/// Returns the status of the retry mechanism, e.g. what the queue is doing.
/// </summary>
public abstract string StatusDescription { get; }

/// <summary>
/// Determines if a message is eligible for retrying at all.
/// </summary>
/// <param name="message">The message to check eligibility for.</param>
/// <returns>True if a message is eligible.</returns>
internal static bool IsEligible(Message message)
{
if ((message.Flags & CommandFlags.NoRetry) != 0
|| ((message.Flags & CommandFlags.RetryIfNotSent) != 0 && message.Status == CommandStatus.Sent)
|| message.IsAdmin
|| message.IsInternalCall)
{
return false;
}

return true;
}

/// <summary>
/// Determines if an xception is eligible for retrying at all.
/// </summary>
/// <param name="exception">The exception to check eligibility for.</param>
/// <returns>True if an exception is eligible.</returns>
internal static bool IsEligible(Exception exception) => exception is RedisException;

/// <summary>
/// Tries to queue an eligible command.
/// Protected because this isn't called directly - eligibility (above) is checked first by the multiplexer.
/// </summary>
protected internal abstract bool TryQueue(FailedCommand command);

/// <summary>
/// Called when a heartbeat occurs.
Expand All @@ -39,6 +69,12 @@ protected CommandRetryPolicy(ConnectionMultiplexer muxer)
/// </summary>
public abstract void OnReconnect();

/// <summary>
/// Default policy - retry only commands which fail before being sent to the server (alias for <see cref="IfNotSent"/>).
/// </summary>
/// <returns>An instance of a policy that retries only unsent commands.</returns>
public static Func<ConnectionMultiplexer, CommandRetryPolicy> Default => IfNotSent;

/// <summary>
/// Retry all commands.
/// </summary>
Expand All @@ -51,6 +87,13 @@ public static Func<ConnectionMultiplexer, CommandRetryPolicy> Always
/// </summary>
/// <returns>An instance of a policy that retries only unsent commands.</returns>
public static Func<ConnectionMultiplexer, CommandRetryPolicy> IfNotSent
=> mutex => new DefaultCommandRetryPolicy(mutex, commandStatus => commandStatus == CommandStatus.WaitingToBeSent);
=> mutex => new DefaultCommandRetryPolicy(mutex, command => command.Status == CommandStatus.WaitingToBeSent);

/// <summary>
/// Never retry a command.
/// </summary>
/// <returns>An instance of a retry policy that retries no commands.</returns>
public static Func<ConnectionMultiplexer, CommandRetryPolicy> Never
=> mutex => new NeverCommandRetryPolicy(mutex);
}
}
62 changes: 22 additions & 40 deletions src/StackExchange.Redis/CommandRetry/DefaultCommandRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,83 +9,65 @@ public class DefaultCommandRetryPolicy : CommandRetryPolicy
{
private MessageRetryQueue RetryQueue { get; }

private readonly Func<CommandStatus, bool> _shouldRetry;
private readonly Func<FailedCommand, bool> _shouldRetry;

/// <summary>
/// Creates a <see cref="DefaultCommandRetryPolicy"/> for the given <see cref="ConnectionMultiplexer"/>.
/// </summary>
/// <param name="muxer">The <see cref="ConnectionMultiplexer"/> to handle retries for.</param>
/// <param name="shouldRetry">Whether a command should be retried.</param>
internal DefaultCommandRetryPolicy(ConnectionMultiplexer muxer, Func<CommandStatus, bool> shouldRetry) : base(muxer)
protected internal DefaultCommandRetryPolicy(ConnectionMultiplexer muxer, Func<FailedCommand, bool> shouldRetry) : base(muxer)
{
_shouldRetry = shouldRetry;
var messageRetryHelper = new MessageRetryHelper(muxer);
RetryQueue = new MessageRetryQueue(messageRetryHelper);
}

/// <summary>
/// Gets whether this message is eligible for retrying according to this policy.
/// Gets the current length of the retry queue.
/// </summary>
/// <param name="message">The message to retry.</param>
/// <param name="ex">The exception from the initial send.</param>
/// <returns>Whether the given message/exception combination is eligible for retry.</returns>
internal bool CanRetry(Message message, Exception ex)
{
if ((message.Flags & CommandFlags.NoRetry) != 0
|| ((message.Flags & CommandFlags.RetryIfNotSent) != 0 && message.Status == CommandStatus.Sent)
|| message.IsAdmin
|| message.IsInternalCall
|| !(ex is RedisException)
|| !ShouldRetry(message.Status))
{
return false;
}
public override int CurrentQueueLength => RetryQueue.CurrentRetryQueueLength;

return true;
}
/// <summary>
/// Returns whether the current queue is processing (e.g. retrying queued commands).
/// </summary>
public override bool CurrentlyProcessing => RetryQueue.IsRunning;

/// <summary>
/// Returns whether the current queue is processing (e.g. retrying queued commands).
/// </summary>
public override string StatusDescription => RetryQueue.StatusDescription;

/// <summary>
/// Tries to queue a message for retry if possible.
/// </summary>
/// <param name="message">The message to attempt to retry.</param>
/// <param name="ex">The exception, what happened when this message was originally tried.</param>
/// <param name="command">The command to tru queueing (contains the message and exception).</param>
/// <returns>True if the message was queued.</returns>
internal override bool TryQueue(Message message, Exception ex)
/// <remarks>Note that this is internal only - external callers cannot override it to bypass the CanRetry checks.</remarks>
protected internal override bool TryQueue(FailedCommand command)
{
if (!CanRetry(message, ex))
// Sanity check if we should be trying this one
if (!_shouldRetry.Invoke(command))
{
return false;
}

if (RetryQueue.TryHandleFailedCommand(message))
if (RetryQueue.TryHandleFailedCommand(command.Message))
{
// if this message is a new message set the writetime
if (message.GetWriteTime() == 0)
if (command.Message.GetWriteTime() == 0)
{
message.SetEnqueued(null);
command.Message.SetEnqueued(null);
}

message.ResetStatusToWaitingToBeSent();
command.Message.ResetStatusToWaitingToBeSent();

return true;
}

return false;
}

/// <summary>
/// Gets the current length of the retry queue.
/// </summary>
public override int CurrentQueueLength => RetryQueue.CurrentRetryQueueLength;

/// <summary>
/// Determines whether to retry a command upon restoration of a lost connection.
/// </summary>
/// <param name="commandStatus">Status of the command.</param>
/// <returns>True to retry the command, otherwise false.</returns>
public override bool ShouldRetry(CommandStatus commandStatus)
=> _shouldRetry.Invoke(commandStatus);

/// <summary>
/// Called on heartbeat, evaluating if anything in queue has timed out and need pruning.
/// </summary>
Expand Down
45 changes: 45 additions & 0 deletions src/StackExchange.Redis/CommandRetry/FailedCommand.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using System;

namespace StackExchange.Redis
{
/// <summary>
/// Command retry policy to act as a no-op for all commands.
/// </summary>
public sealed class FailedCommand
{
/// <summary>
/// The original/inner message that failed.
/// </summary>
internal Message Message;

/// <summary>
/// Status of the command.
/// </summary>
public CommandStatus Status => Message.Status;

/// <summary>
/// The redis command sent.
/// </summary>
public string CommandAndKey => Message.CommandAndKey;

/// <summary>
/// The reason this command failed, e.g. no connection, timeout, etc.
/// </summary>
public CommandFailureReason FailureReason { get; }

/// <summary>
/// The exception that happened to create this failed command.
/// </summary>
public Exception Exception { get; }

internal static FailedCommand FromWriteFail(Message message, Exception exception) =>
new FailedCommand(message, CommandFailureReason.WriteFailure, exception);

internal FailedCommand(Message message, CommandFailureReason reason, Exception exception)
{
Message = message;
FailureReason = reason;
Exception = exception;
}
}
}
8 changes: 5 additions & 3 deletions src/StackExchange.Redis/CommandRetry/MessageRetryHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ public RedisTimeoutException GetTimeoutException(Message message)
/// <returns>Whether the write was successful.</returns>
public async Task<bool> TryResendAsync(Message message)
{
var server = multiplexer.SelectServer(message);
// Use a specific server if one was specified originally, otherwise auto-select
// This is important for things like REPLICAOF we really don't want going to another location
var server = message.SpecificServer ?? multiplexer.SelectServer(message);
if (server != null)
{
var result = await server.TryWriteAsync(message).ForAwait();
Expand All @@ -62,8 +64,8 @@ public async Task<bool> TryResendAsync(Message message)

public void SetExceptionAndComplete(Message message, Exception ex = null)
{
var inner = new RedisConnectionException(ConnectionFailureType.UnableToConnect, "Failed while retrying on connection restore", ex);
message.SetExceptionAndComplete(inner, null, onConnectionRestoreRetry: false);
var inner = new RedisConnectionException(ConnectionFailureType.UnableToConnect, "Failed while retrying on connection restore: " + ex.Message, ex);
message.SetExceptionAndComplete(inner, null, CommandFailureReason.RetryFailure);
}
}
}
Loading