Auto retry message on connection failure #1856
Closed
83 commits
ad90509
autoretry with hooks
deepakverma 4e9e97b
RequestMessage Implementation
deepakverma 433da7f
Add hook to requestFailed on connection exception
deepakverma 8a533f6
experimenting with the main retry loop to be handling time outs as well
deepakverma ae9e35a
Making requestfailed as internal for now, will expose it later after …
deepakverma 754ba69
improve message timeout while waiting for connection restore to complete
deepakverma 9ff40ef
Remove unused variable
deepakverma fd1f1da
Removing unused commandflags for now
deepakverma 415bcdf
undo csproj change to match upstream
deepakverma 889520c
Creating a connectiontry configurationoption and command flags
deepakverma 2ed46cd
connection,ultipexer changes to handle message retry
deepakverma 97d96cc
Message retry
deepakverma 1f7c27c
ensure no retry is set as default
deepakverma bb1a1d1
whoops, cleaning up stale code and rety should be default as null
deepakverma 9e815d3
typo
deepakverma 78e6910
Merge branch 'main' into autoretry
deepakverma c7c6b8e
Merge branch 'main' into pr/1755
NickCraver d6b4d30
making Retryqueuelength nullable
deepakverma 623d731
dropping onconnectionrestore from retry config option
deepakverma 7cd57b8
renaming Retry => CommandRetry
deepakverma ed1be50
to honor redirent option specified by the user, do not reset noredirect
deepakverma 906289f
incorporating feedback on PR
deepakverma fd3407a
do not retry admin commands
deepakverma ea5091a
unifiying commandretry and commandlfags
deepakverma 97c957a
fixed failing test
deepakverma cc9f6da
IRetry
deepakverma f4f994f
introducing FailedMessage class to make IRetry shouldretry extensible
deepakverma 127c25c
separating out retrymanager
deepakverma 5968359
first attempt to decouple iretrymanager from mux
deepakverma e9f0c0f
moving out timeout check to retrymanager
deepakverma c2cb2e4
separating out Retrymanager and being explicit on policy depends on q…
deepakverma 50802fd
reverting the rename
deepakverma df3e08b
rnaming methods
deepakverma 819eb0b
improving failedmessage
deepakverma c5a89aa
Make messageretrymanager internal for now
deepakverma 4cb0175
making MessageRetryQueueManager as internal for now
deepakverma fa12e0b
1. retryqueuelength
deepakverma 14d54f8
few renames
deepakverma 20f7bf4
cleaning up failedmessage
deepakverma e93d836
starting unit test
deepakverma 2524fc7
using IInternalConnectionmultiplexer
deepakverma 5e5edb9
Handle exception
deepakverma 6631146
handleResult implementation
deepakverma e02e04b
handle timeout on sync for retry
deepakverma 2a9d182
can handle redisexception only and first unit test
deepakverma 6018207
making commandretryqueuemanager testable
deepakverma c5e2080
CommandRetryQueueManager tests
deepakverma a941dbc
mux should retry internal commands but not call shouldretry
deepakverma 9e3d515
test to cover exception being thrown while processing the message
deepakverma 3523397
integration test
deepakverma 83c78bf
integrationTest
deepakverma 3afa54f
command override tests
deepakverma c326927
Simplifying RetryPolicy
deepakverma 87e1fb1
DefaultREtryPolicy
deepakverma b615eec
refactored out failedcommand
deepakverma a067467
updated summary and few renames
deepakverma 50bd6df
code cleanup
deepakverma febffca
this should likely fix the incorrect diff with main
deepakverma bfdfb3b
fixing another diff that should have not been there
deepakverma 83bbd87
no op where it doesn't retry timeouts
deepakverma bceb859
merge
deepakverma 41cc755
Merge branch 'main' of https://github.com/StackExchange/StackExchange…
deepakverma 335bef6
Expose retry policy config as RetryCommandsOnReconnect
philon-msft 06bb176
Minor unit test improvements
philon-msft e0da9c4
Minor syntax cleanup
philon-msft 2d3b860
Merge pull request #2 from philon-msft/master
deepakverma 0a165bd
Merge branch 'autoretry' into autoretry2
deepakverma d29a7fc
moving methods from mux to retrypolicy
deepakverma 7ef0a5b
fix conditions in IsMessageREtriable and make it simpler for reading
deepakverma dc6cfa1
Merge pull request #4 from deepakverma/autoretry2
deepakverma a0c26de
cleaning up debug code
deepakverma 0ab8bf7
Throw in case retrypolicy is not set and pass failing test
deepakverma f68655d
Initial refactor/renaming
c857c4b
Fix ref issue
8189e7a
Move love
570b060
Cleanup
c48ff6d
Cleanup internals
NickCraver 8d9d0fe
Simplify
NickCraver 664ede2
Merge branch 'main' into command-retry
NickCraver 3cc22ea
Revert "Cleanup internals"
NickCraver 62d215a
Add note on why this internal visibility is needed
NickCraver f4a3a54
Merge remote-tracking branch 'origin/main' into command-retry
NickCraver da3e55d
Command Retry: Changes and Simplifications (#1857)
NickCraver
25 changes: 25 additions & 0 deletions
src/StackExchange.Redis/CommandRetry/CommandFailureReason.cs
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| namespace StackExchange.Redis | ||
| { | ||
| /// <summary> | ||
| /// The reason a command failed to send or complete. | ||
| /// </summary> | ||
| public enum CommandFailureReason | ||
| { | ||
| /// <summary> | ||
| /// No open/valid connection was available to send on - we couldn't even write the command. | ||
| /// </summary> | ||
| WriteFailure, | ||
| /// <summary> | ||
| /// The message was sent, but we lost the connection and this command in-flight. | ||
| /// </summary> | ||
| ConnectionFailure, | ||
| /// <summary> | ||
| /// Command has timed out, exceeding the sync or async timeout limits | ||
| /// </summary> | ||
| Timeout, | ||
| /// <summary> | ||
| /// This command failed again, during a retry | ||
| /// </summary> | ||
| RetryFailure, | ||
| } | ||
| } |
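A minimal sketch of how calling code might branch on the failure reason, for example to decide whether a non-idempotent command is safe to re-issue. The enum is re-declared locally so the snippet compiles on its own; `RetryAdvice.MayHaveReachedServer` is a hypothetical helper and not part of this PR, and treating `Timeout` and `RetryFailure` as "possibly sent" is a conservative assumption.

```csharp
using System;

// Local copy of the enum from the diff above, so this sketch is self-contained.
public enum CommandFailureReason
{
    WriteFailure,
    ConnectionFailure,
    Timeout,
    RetryFailure,
}

public static class RetryAdvice
{
    // Hypothetical helper: could the server already have seen this command?
    public static bool MayHaveReachedServer(CommandFailureReason reason) => reason switch
    {
        // The command was never written, so the server cannot have executed it.
        CommandFailureReason.WriteFailure => false,
        // The command was sent and then lost in flight.
        CommandFailureReason.ConnectionFailure => true,
        // Unknown: assume the worst for timeouts and failed retries.
        CommandFailureReason.Timeout => true,
        CommandFailureReason.RetryFailure => true,
        _ => throw new ArgumentOutOfRangeException(nameof(reason)),
    };
}
```

Only `WriteFailure` guarantees the command was not executed, which is why the default policy in this PR retries unsent commands only.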
99 changes: 99 additions & 0 deletions
src/StackExchange.Redis/CommandRetry/CommandRetryPolicy.cs
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| using System; | ||
|
|
||
| namespace StackExchange.Redis | ||
| { | ||
| /// <summary> | ||
| /// Policy that determines which commands should be retried upon restoration of a lost connection. | ||
| /// </summary> | ||
| public abstract class CommandRetryPolicy | ||
| { | ||
| /// <summary> | ||
| /// Creates a policy instance for a specific multiplexer and its commands. | ||
| /// </summary> | ||
| /// <param name="muxer">The multiplexer this policy is for.</param> | ||
| protected CommandRetryPolicy(ConnectionMultiplexer muxer) { } | ||
|
|
||
| /// <summary> | ||
| /// Returns the current length of the retry queue. | ||
| /// </summary> | ||
| public abstract int CurrentQueueLength { get; } | ||
|
|
||
| /// <summary> | ||
| /// Returns whether the current queue is processing (e.g. retrying queued commands). | ||
| /// </summary> | ||
| public abstract bool CurrentlyProcessing { get; } | ||
|
|
||
| /// <summary> | ||
| /// Returns the status of the retry mechanism, e.g. what the queue is doing. | ||
| /// </summary> | ||
| public abstract string StatusDescription { get; } | ||
|
|
||
| /// <summary> | ||
| /// Determines if a message is eligible for retrying at all. | ||
| /// </summary> | ||
| /// <param name="message">The message to check eligibility for.</param> | ||
| /// <returns>True if a message is eligible.</returns> | ||
| internal static bool IsEligible(Message message) | ||
| { | ||
| if ((message.Flags & CommandFlags.NoRetry) != 0 | ||
| || ((message.Flags & CommandFlags.RetryIfNotSent) != 0 && message.Status == CommandStatus.Sent) | ||
| || message.IsAdmin | ||
| || message.IsInternalCall) | ||
| { | ||
| return false; | ||
| } | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Determines if an exception is eligible for retrying at all. | ||
| /// </summary> | ||
| /// <param name="exception">The exception to check eligibility for.</param> | ||
| /// <returns>True if an exception is eligible.</returns> | ||
| internal static bool IsEligible(Exception exception) => exception is RedisException; | ||
|
|
||
| /// <summary> | ||
| /// Tries to queue an eligible command. | ||
| /// Protected because this isn't called directly - eligibility (above) is checked first by the multiplexer. | ||
| /// </summary> | ||
| protected internal abstract bool TryQueue(FailedCommand command); | ||
|
|
||
| /// <summary> | ||
| /// Called when a heartbeat occurs. | ||
| /// </summary> | ||
| public abstract void OnHeartbeat(); | ||
|
|
||
| /// <summary> | ||
| /// Called when a multiplexer reconnects. | ||
| /// </summary> | ||
| public abstract void OnReconnect(); | ||
|
|
||
| /// <summary> | ||
| /// Default policy - retry only commands which fail before being sent to the server (alias for <see cref="IfNotSent"/>). | ||
| /// </summary> | ||
| /// <returns>An instance of a policy that retries only unsent commands.</returns> | ||
| public static Func<ConnectionMultiplexer, CommandRetryPolicy> Default => IfNotSent; | ||
|
|
||
| /// <summary> | ||
| /// Retry all commands. | ||
| /// </summary> | ||
| /// <returns>An instance of a retry policy that retries all commands.</returns> | ||
| public static Func<ConnectionMultiplexer, CommandRetryPolicy> Always | ||
| => muxer => new DefaultCommandRetryPolicy(muxer, command => true); | ||
|
|
||
| /// <summary> | ||
| /// Retry only commands which fail before being sent to the server. | ||
| /// </summary> | ||
| /// <returns>An instance of a policy that retries only unsent commands.</returns> | ||
| public static Func<ConnectionMultiplexer, CommandRetryPolicy> IfNotSent | ||
| => muxer => new DefaultCommandRetryPolicy(muxer, command => command.Status == CommandStatus.WaitingToBeSent); | ||
|
|
||
| /// <summary> | ||
| /// Never retry a command. | ||
| /// </summary> | ||
| /// <returns>An instance of a retry policy that retries no commands.</returns> | ||
| public static Func<ConnectionMultiplexer, CommandRetryPolicy> Never | ||
| => muxer => new NeverCommandRetryPolicy(muxer); | ||
| } | ||
| } | ||
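The eligibility rules in `IsEligible(Message)` above can be sketched in isolation as follows. `DemoFlags`, `DemoStatus`, and `DemoMessage` are hypothetical stand-ins for the library's `CommandFlags`, `CommandStatus`, and internal `Message` types; the flag values are assumptions chosen only for illustration.

```csharp
using System;

[Flags]
public enum DemoFlags { None = 0, NoRetry = 1, RetryIfNotSent = 2 } // hypothetical values

public enum DemoStatus { WaitingToBeSent, Sent }

// Hypothetical stand-in for the internal Message type.
public sealed class DemoMessage
{
    public DemoMessage(DemoFlags flags, DemoStatus status, bool isAdmin = false, bool isInternalCall = false)
    {
        Flags = flags;
        Status = status;
        IsAdmin = isAdmin;
        IsInternalCall = isInternalCall;
    }

    public DemoFlags Flags { get; }
    public DemoStatus Status { get; }
    public bool IsAdmin { get; }
    public bool IsInternalCall { get; }
}

public static class Eligibility
{
    public static bool IsEligible(DemoMessage m)
    {
        // The caller opted out of retries entirely.
        if ((m.Flags & DemoFlags.NoRetry) != 0) return false;
        // The caller only wants retries for commands that never left the client.
        if ((m.Flags & DemoFlags.RetryIfNotSent) != 0 && m.Status == DemoStatus.Sent) return false;
        // Admin and internal commands are not queued for retry.
        if (m.IsAdmin || m.IsInternalCall) return false;
        return true;
    }
}
```

Splitting the single compound condition into early returns does not change the result; it only makes each exclusion rule readable on its own.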
81 changes: 81 additions & 0 deletions
src/StackExchange.Redis/CommandRetry/DefaultCommandRetryPolicy.cs
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,81 @@ | ||
| using System; | ||
|
|
||
| namespace StackExchange.Redis | ||
| { | ||
| /// <summary> | ||
| /// Command retry policy to determine which commands will be retried after a lost connection is restored. | ||
| /// </summary> | ||
| public class DefaultCommandRetryPolicy : CommandRetryPolicy | ||
| { | ||
| private MessageRetryQueue RetryQueue { get; } | ||
|
|
||
| private readonly Func<FailedCommand, bool> _shouldRetry; | ||
|
|
||
| /// <summary> | ||
| /// Creates a <see cref="DefaultCommandRetryPolicy"/> for the given <see cref="ConnectionMultiplexer"/>. | ||
| /// </summary> | ||
| /// <param name="muxer">The <see cref="ConnectionMultiplexer"/> to handle retries for.</param> | ||
| /// <param name="shouldRetry">Whether a command should be retried.</param> | ||
| protected internal DefaultCommandRetryPolicy(ConnectionMultiplexer muxer, Func<FailedCommand, bool> shouldRetry) : base(muxer) | ||
| { | ||
| _shouldRetry = shouldRetry; | ||
| var messageRetryHelper = new MessageRetryHelper(muxer); | ||
| RetryQueue = new MessageRetryQueue(messageRetryHelper); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Gets the current length of the retry queue. | ||
| /// </summary> | ||
| public override int CurrentQueueLength => RetryQueue.CurrentRetryQueueLength; | ||
|
|
||
| /// <summary> | ||
| /// Returns whether the current queue is processing (e.g. retrying queued commands). | ||
| /// </summary> | ||
| public override bool CurrentlyProcessing => RetryQueue.IsRunning; | ||
|
|
||
| /// <summary> | ||
| /// Returns the status of the retry mechanism, e.g. what the queue is doing. | ||
| /// </summary> | ||
| public override string StatusDescription => RetryQueue.StatusDescription; | ||
|
|
||
| /// <summary> | ||
| /// Tries to queue a message for retry if possible. | ||
| /// </summary> | ||
| /// <param name="command">The command to try queueing (contains the message and exception).</param> | ||
| /// <returns>True if the message was queued.</returns> | ||
| /// <remarks>Note that this is internal only - external callers cannot override it to bypass the CanRetry checks.</remarks> | ||
| protected internal override bool TryQueue(FailedCommand command) | ||
| { | ||
| // Sanity check if we should be trying this one | ||
| if (!_shouldRetry.Invoke(command)) | ||
| { | ||
| return false; | ||
| } | ||
|
|
||
| if (RetryQueue.TryHandleFailedCommand(command.Message)) | ||
| { | ||
| // if this message is a new message set the writetime | ||
| if (command.Message.GetWriteTime() == 0) | ||
| { | ||
| command.Message.SetEnqueued(null); | ||
| } | ||
|
|
||
| command.Message.ResetStatusToWaitingToBeSent(); | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| return false; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Called on heartbeat, evaluating if anything in the queue has timed out and needs pruning. | ||
| /// </summary> | ||
| public override void OnHeartbeat() => RetryQueue.CheckRetryQueueForTimeouts(); | ||
|
|
||
| /// <summary> | ||
| /// Called on a multiplexer reconnect, to start sending anything in the queue. | ||
| /// </summary> | ||
| public override void OnReconnect() => RetryQueue.StartRetryQueueProcessor(); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| using System; | ||
|
|
||
| namespace StackExchange.Redis | ||
| { | ||
| /// <summary> | ||
| /// Represents a command that failed, along with the reason and the exception that caused the failure. | ||
| /// </summary> | ||
| public sealed class FailedCommand | ||
| { | ||
| /// <summary> | ||
| /// The original/inner message that failed. | ||
| /// </summary> | ||
| internal Message Message; | ||
|
|
||
| /// <summary> | ||
| /// Status of the command. | ||
| /// </summary> | ||
| public CommandStatus Status => Message.Status; | ||
|
|
||
| /// <summary> | ||
| /// The redis command sent. | ||
| /// </summary> | ||
| public string CommandAndKey => Message.CommandAndKey; | ||
|
|
||
| /// <summary> | ||
| /// The reason this command failed, e.g. no connection, timeout, etc. | ||
| /// </summary> | ||
| public CommandFailureReason FailureReason { get; } | ||
|
|
||
| /// <summary> | ||
| /// The exception that happened to create this failed command. | ||
| /// </summary> | ||
| public Exception Exception { get; } | ||
|
|
||
| internal static FailedCommand FromWriteFail(Message message, Exception exception) => | ||
| new FailedCommand(message, CommandFailureReason.WriteFailure, exception); | ||
|
|
||
| internal FailedCommand(Message message, CommandFailureReason reason, Exception exception) | ||
| { | ||
| Message = message; | ||
| FailureReason = reason; | ||
| Exception = exception; | ||
| } | ||
| } | ||
| } |
14 changes: 14 additions & 0 deletions
src/StackExchange.Redis/CommandRetry/IMessageRetryHelper.cs
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| using System; | ||
| using System.Threading.Tasks; | ||
|
|
||
| namespace StackExchange.Redis | ||
| { | ||
| internal interface IMessageRetryHelper | ||
| { | ||
| RedisTimeoutException GetTimeoutException(Message message); | ||
| bool HasTimedOut(Message message); | ||
| bool IsEndpointAvailable(Message message); | ||
| void SetExceptionAndComplete(Message message, Exception ex = null); | ||
| Task<bool> TryResendAsync(Message message); | ||
| } | ||
| } |
71 changes: 71 additions & 0 deletions
src/StackExchange.Redis/CommandRetry/MessageRetryHelper.cs
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| using System; | ||
| using System.Text; | ||
| using System.Threading.Tasks; | ||
|
|
||
| namespace StackExchange.Redis | ||
| { | ||
| internal class MessageRetryHelper : IMessageRetryHelper | ||
| { | ||
| private readonly IInternalConnectionMultiplexer multiplexer; | ||
|
|
||
| public MessageRetryHelper(IInternalConnectionMultiplexer multiplexer) | ||
| { | ||
| this.multiplexer = multiplexer; | ||
| } | ||
|
|
||
| public bool HasTimedOut(Message message) | ||
| { | ||
| var timeoutMilliseconds = message.ResultBoxIsAsync ? multiplexer.AsyncTimeoutMilliseconds : multiplexer.TimeoutMilliseconds; | ||
| int millisecondsTaken = unchecked(Environment.TickCount - message.GetWriteTime()); | ||
| return millisecondsTaken >= timeoutMilliseconds; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Gets the timeout exception for a message. | ||
| /// </summary> | ||
| /// <param name="message">The message to get a timeout exception for.</param> | ||
| /// <returns>A timeout exception describing the command and the configured timeout.</returns> | ||
| /// <remarks> | ||
| /// Not using ExceptionFactory.Timeout as it can cause a deadlock while trying to lock the written-awaiting-response queue for GetHeadMessages. | ||
| /// </remarks> | ||
| public RedisTimeoutException GetTimeoutException(Message message) | ||
| { | ||
| var sb = new StringBuilder(); | ||
| sb.Append("Timeout while waiting for connection restore ").Append(message.Command).Append(" (").Append(Format.ToString(multiplexer.TimeoutMilliseconds)).Append("ms)"); | ||
| var ex = new RedisTimeoutException(sb.ToString(), message.Status); | ||
| return ex; | ||
| } | ||
|
|
||
| public bool IsEndpointAvailable(Message message) => multiplexer.SelectServer(message) != null; | ||
|
|
||
| /// <summary> | ||
| /// Tries to re-issue a <see cref="Message"/>. | ||
| /// </summary> | ||
| /// <param name="message">The message to re-send.</param> | ||
| /// <returns>Whether a server was available to attempt the write; a failed write completes the message with an exception.</returns> | ||
| public async Task<bool> TryResendAsync(Message message) | ||
| { | ||
| // Use a specific server if one was specified originally, otherwise auto-select | ||
| // This is important for things like REPLICAOF we really don't want going to another location | ||
| var server = message.SpecificServer ?? multiplexer.SelectServer(message); | ||
| if (server != null) | ||
| { | ||
| var result = await server.TryWriteAsync(message).ForAwait(); | ||
|
|
||
| if (result != WriteResult.Success) | ||
| { | ||
| var ex = multiplexer.GetException(result, message, server); | ||
| SetExceptionAndComplete(message, ex); | ||
| } | ||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| public void SetExceptionAndComplete(Message message, Exception ex = null) | ||
| { | ||
| var inner = new RedisConnectionException(ConnectionFailureType.UnableToConnect, "Failed while retrying on connection restore: " + ex?.Message, ex); | ||
| message.SetExceptionAndComplete(inner, null, CommandFailureReason.RetryFailure); | ||
| } | ||
| } | ||
| } |
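`HasTimedOut` above relies on unchecked subtraction of `Environment.TickCount` readings, which stays correct when the counter wraps from `int.MaxValue` to `int.MinValue`. The sketch below isolates that arithmetic; `TickMath` is a hypothetical helper written for illustration and is not part of this PR.

```csharp
using System;

public static class TickMath
{
    // Elapsed milliseconds between two Environment.TickCount-style readings.
    // Unchecked subtraction wraps modulo 2^32, so the result is still correct
    // when the counter has rolled over between the two readings.
    public static int Elapsed(int nowTicks, int startTicks) => unchecked(nowTicks - startTicks);

    // Mirrors the comparison in HasTimedOut: timed out once elapsed >= timeout.
    public static bool HasTimedOut(int nowTicks, int startTicks, int timeoutMs)
        => Elapsed(nowTicks, startTicks) >= timeoutMs;
}
```

For example, a start reading of `int.MaxValue - 10` and a current reading of `int.MinValue + 20` yields an elapsed time of 31 ms, even though the current reading is numerically smaller than the start.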
For exception messages, probably want to
CurrentlyProcessing(bool) for exception messages