Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 0 additions & 61 deletions src/StackExchange.Redis/CommandRetry/DefaultRetry.cs

This file was deleted.

22 changes: 0 additions & 22 deletions src/StackExchange.Redis/CommandRetry/ICommandRetry.cs

This file was deleted.

15 changes: 15 additions & 0 deletions src/StackExchange.Redis/CommandRetry/IRetryOnReconnectPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace StackExchange.Redis
{
/// <summary>
/// Interface for a policy that determines which commands should be retried upon restoration of a lost connection
/// </summary>
public interface IRetryOnReconnectPolicy
{
/// <summary>
/// Determines whether a failed command should be retried
/// </summary>
/// <param name="commandStatus">Current state of the command</param>
/// <returns>True to retry the command, otherwise false</returns>
public bool ShouldRetry(CommandStatus commandStatus);
}
}
101 changes: 49 additions & 52 deletions src/StackExchange.Redis/CommandRetry/MessageRetryQueue.cs
Original file line number Diff line number Diff line change
@@ -1,42 +1,40 @@
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;
using StackExchange.Redis;

namespace StackExchange.Redis
{

internal class MessageRetryQueue : IDisposable
{
readonly Queue<Message> queue = new Queue<Message>();
readonly IMessageRetryHelper messageRetryHelper;
int? maxRetryQueueLength;
bool runRetryLoopAsync;
{
private readonly Queue<Message> _queue = new Queue<Message>();
private readonly IMessageRetryHelper _messageRetryHelper;
private readonly int? _maxRetryQueueLength;
private readonly bool _runRetryLoopAsync;

internal MessageRetryQueue(IMessageRetryHelper messageRetryHelper, int? maxRetryQueueLength = null, bool runRetryLoopAsync = true)
{
this.maxRetryQueueLength = maxRetryQueueLength;
this.runRetryLoopAsync = runRetryLoopAsync;
this.messageRetryHelper = messageRetryHelper;
_maxRetryQueueLength = maxRetryQueueLength;
_runRetryLoopAsync = runRetryLoopAsync;
_messageRetryHelper = messageRetryHelper;
}

public int RetryQueueLength => queue.Count;
public int RetryQueueLength => _queue.Count;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal bool TryHandleFailedCommand(Message message)
{
bool wasEmpty;
lock (queue)
lock (_queue)
{
int count = queue.Count;
if (maxRetryQueueLength.HasValue && count >= maxRetryQueueLength)
int count = _queue.Count;
if (_maxRetryQueueLength.HasValue && count >= _maxRetryQueueLength)
{
return false;
}
wasEmpty = count == 0;
queue.Enqueue(message);
_queue.Enqueue(message);
}
if (wasEmpty) StartRetryQueueProcessor();
return true;
Expand All @@ -46,13 +44,13 @@ internal bool TryHandleFailedCommand(Message message)
internal void StartRetryQueueProcessor()
{
bool startProcessor = false;
lock (queue)
lock (_queue)
{
startProcessor = queue.Count > 0;
startProcessor = _queue.Count > 0;
}
if (startProcessor)
{
if (runRetryLoopAsync)
if (_runRetryLoopAsync)
{
var task = Task.Run(ProcessRetryQueueAsync);
if (task.IsFaulted)
Expand All @@ -67,103 +65,102 @@ internal void StartRetryQueueProcessor()

private async Task ProcessRetryQueueAsync()
{
Message message = null;
while (true)
{
message = null;
Exception failedEndpointex = null;
lock (queue)
Message message = null;
Exception failedEndpointException = null;

lock (_queue)
{
if (queue.Count == 0) break; // all done
message = queue.Peek();
if (_queue.Count == 0) break; // all done
message = _queue.Peek();
try
{
if (!messageRetryHelper.IsEndpointAvailable(message))
if (!_messageRetryHelper.IsEndpointAvailable(message))
{
break;
}
}
catch (Exception ex)
{
failedEndpointex = ex;
failedEndpointException = ex;
}
message = queue.Dequeue();
message = _queue.Dequeue();
}

if (failedEndpointex != null)
if (failedEndpointException != null)
{
messageRetryHelper.SetExceptionAndComplete(message, failedEndpointex);
_messageRetryHelper.SetExceptionAndComplete(message, failedEndpointException);
continue;
}

try
{
if (messageRetryHelper.HasTimedOut(message))
if (_messageRetryHelper.HasTimedOut(message))
{
RedisTimeoutException ex = messageRetryHelper.GetTimeoutException(message);
messageRetryHelper.SetExceptionAndComplete(message,ex);
var ex = _messageRetryHelper.GetTimeoutException(message);
_messageRetryHelper.SetExceptionAndComplete(message, ex);
}
else
{
if (!await messageRetryHelper.TryResendAsync(message))
if (!await _messageRetryHelper.TryResendAsync(message))
{
// this should never happen but just to be safe if connection got dropped again
messageRetryHelper.SetExceptionAndComplete(message);
_messageRetryHelper.SetExceptionAndComplete(message);
}
}
}
catch (Exception ex)
{
messageRetryHelper.SetExceptionAndComplete(message, ex);
_messageRetryHelper.SetExceptionAndComplete(message, ex);
}
}
}



internal void CheckRetryQueueForTimeouts() // check the head of the backlog queue, consuming anything that looks dead
{
lock (queue)
lock (_queue)
{
var now = Environment.TickCount;
while (queue.Count != 0)
while (_queue.Count != 0)
{
var message = queue.Peek();
if (!messageRetryHelper.HasTimedOut(message))
var message = _queue.Peek();
if (!_messageRetryHelper.HasTimedOut(message))
{
break; // not a timeout - we can stop looking
}
queue.Dequeue();
RedisTimeoutException ex = messageRetryHelper.GetTimeoutException(message);
messageRetryHelper.SetExceptionAndComplete(message,ex);
_queue.Dequeue();
RedisTimeoutException ex = _messageRetryHelper.GetTimeoutException(message);
_messageRetryHelper.SetExceptionAndComplete(message, ex);
}
}
}

private void DrainQueue(Exception ex)
{
Message message;
lock (queue)
lock (_queue)
{
while (queue.Count != 0)
while (_queue.Count != 0)
{
message = queue.Dequeue();
messageRetryHelper.SetExceptionAndComplete(message, ex);
message = _queue.Dequeue();
_messageRetryHelper.SetExceptionAndComplete(message, ex);
}
}
}

private bool disposedValue = false;
private bool _disposedValue = false;

protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
if (!_disposedValue)
{
_disposedValue = true;

if (disposing)
{
DrainQueue(new Exception("RetryQueue disposed"));
DrainQueue(new Exception($"{nameof(MessageRetryQueue)} disposed"));
}
disposedValue = true;
}
}

Expand Down
39 changes: 39 additions & 0 deletions src/StackExchange.Redis/CommandRetry/RetryOnReconnect.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System;

namespace StackExchange.Redis
{
/// <summary>
/// Command retry policy to determine which commands will be retried after a lost connection is retored
/// </summary>
public class RetryOnReconnect : IRetryOnReconnectPolicy
{
private readonly Func<CommandStatus, bool> _shouldRetry;

internal RetryOnReconnect(Func<CommandStatus, bool> shouldRetry)
{
_shouldRetry = shouldRetry;
}

/// <summary>
/// Retry all commands
/// </summary>
/// <returns>An instance of a retry policy that retries all commands</returns>
public static IRetryOnReconnectPolicy Always
=> new RetryOnReconnect(commandStatus => true);

/// <summary>
/// Retry only commands which fail before being sent to the server
/// </summary>
/// <returns>An instance of a policy that retries only unsent commands</returns>
public static IRetryOnReconnectPolicy IfNotSent
=> new RetryOnReconnect(commandStatus => commandStatus == CommandStatus.WaitingToBeSent);

/// <summary>
/// Determines whether to retry a command upon restoration of a lost connection
/// </summary>
/// <param name="commandStatus">Status of the command</param>
/// <returns>True to retry the command, otherwise false</returns>
public bool ShouldRetry(CommandStatus commandStatus)
=> _shouldRetry.Invoke(commandStatus);
}
}
Loading