Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 16 additions & 57 deletions src/StackExchange.Redis/ConnectionMultiplexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2303,53 +2303,16 @@ internal void OnManagedConnectionFailed(object sender, ConnectionFailedEventArgs
{
SwitchMaster(e.EndPoint, connection);
}, null, TimeSpan.FromSeconds(0), TimeSpan.FromSeconds(1));

//connection.sentinelMasterReconnectTimer.AutoReset = true;

//connection.sentinelMasterReconnectTimer.Start();
}
}

internal EndPoint GetConfiguredMasterForService(string serviceName, int timeoutmillis = -1)
{
Task<EndPoint>[] sentinelMasters = GetServerSnapshot().ToArray()
.Where(s => s.ServerType == ServerType.Sentinel)
.Select(s => GetServer(s.EndPoint).SentinelGetMasterAddressByNameAsync(serviceName))
.ToArray();

Task<Task<EndPoint>> firstCompleteRequest = WaitFirstNonNullIgnoreErrorsAsync(sentinelMasters);
if (!firstCompleteRequest.Wait(timeoutmillis))
throw new TimeoutException("Timeout resolving master for service");
if (firstCompleteRequest.Result?.Result == null)
throw new Exception("Unable to determine master");

return firstCompleteRequest.Result.Result;
}

private static async Task<Task<T>> WaitFirstNonNullIgnoreErrorsAsync<T>(Task<T>[] tasks)
{
if (tasks == null) throw new ArgumentNullException("tasks");
if (tasks.Length == 0) return null;
var typeNullable = (Nullable.GetUnderlyingType(typeof(T)) != null);
var taskList = tasks.Cast<Task>().ToList();

try
{
while (taskList.Count > 0)
{
var allTasksAwaitingAny = Task.WhenAny(taskList).ObserveErrors();
var result = await allTasksAwaitingAny.ForAwait();
taskList.Remove((Task<T>)result);
if (((Task<T>)result).IsFaulted) continue;
if ((!typeNullable) || ((Task<T>)result).Result != null)
return (Task<T>)result;
}
}
catch
{ }

return null;
}
internal EndPoint GetConfiguredMasterForService(string serviceName) =>
GetServerSnapshot()
.ToArray()
.Where(s => s.ServerType == ServerType.Sentinel)
.AsParallel()
.Select(s => GetServer(s.EndPoint).SentinelGetMasterAddressByName(serviceName))
.First(r => r != null);

internal EndPoint currentSentinelMasterEndPoint;

Expand Down Expand Up @@ -2419,26 +2382,22 @@ private T Retry<T>(int times, int interval, Func<T> func, string message)
throw new NullReferenceException(message);
}

internal void UpdateSentinelAddressList(string serviceName, int timeoutmillis = 500)
internal void UpdateSentinelAddressList(string serviceName)
{
Task<EndPoint[]>[] sentinels = GetServerSnapshot().ToArray()
.Where(s => s.ServerType == ServerType.Sentinel)
.Select(s => GetServer(s.EndPoint).SentinelGetSentinelAddresses(serviceName))
.ToArray();

Task<Task<EndPoint[]>> firstCompleteRequest = WaitFirstNonNullIgnoreErrorsAsync(sentinels);
var firstCompleteRequest = GetServerSnapshot()
.ToArray()
.Where(s => s.ServerType == ServerType.Sentinel)
.AsParallel()
.Select(s => GetServer(s.EndPoint).SentinelGetSentinelAddresses(serviceName))
.First(r => r != null);

// Ignore errors, as having an updated sentinel list is
// not essential
if (firstCompleteRequest.Result?.Result == null)
return;
if (!firstCompleteRequest.Wait(timeoutmillis))
return;
if (firstCompleteRequest.Result.Result == null)
if (firstCompleteRequest == null)
return;

bool hasNew = false;
foreach (EndPoint newSentinel in firstCompleteRequest.Result.Result.Where(x => !RawConfig.EndPoints.Contains(x)))
foreach (EndPoint newSentinel in firstCompleteRequest.Where(x => !RawConfig.EndPoints.Contains(x)))
{
hasNew = true;
RawConfig.EndPoints.Add(newSentinel);
Expand Down
13 changes: 11 additions & 2 deletions src/StackExchange.Redis/Interfaces/IServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -756,10 +756,19 @@ public partial interface IServer : IRedis
/// Returns the ip and port numbers of all known Sentinels
/// for the given service name.
/// </summary>
/// <param name="serviveName">the sentinel service name</param>
/// <param name="serviceName">the sentinel service name</param>
/// <param name="flags"></param>
/// <returns>a list of the sentinel ips and ports</returns>
Task<EndPoint[]> SentinelGetSentinelAddresses(string serviveName, CommandFlags flags = CommandFlags.None);
EndPoint[] SentinelGetSentinelAddresses(string serviceName, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Returns the ip and port numbers of all known Sentinels
/// for the given service name.
/// </summary>
/// <param name="serviceName">the sentinel service name</param>
/// <param name="flags"></param>
/// <returns>a list of the sentinel ips and ports</returns>
Task<EndPoint[]> SentinelGetSentinelAddressesAsync(string serviceName, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Show the state and info of the specified master.
Expand Down
10 changes: 8 additions & 2 deletions src/StackExchange.Redis/RedisServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -807,10 +807,16 @@ public Task<EndPoint> SentinelGetMasterAddressByNameAsync(string serviceName, Co
return ExecuteAsync(msg, ResultProcessor.SentinelMasterEndpoint);
}

public Task<EndPoint[]> SentinelGetSentinelAddresses(string serviceName, CommandFlags flags = CommandFlags.None)
public EndPoint[] SentinelGetSentinelAddresses(string serviceName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.SENTINELS, (RedisValue)serviceName);
return ExecuteAsync(msg, ResultProcessor.SentinelAddressesEndPoints);
return ExecuteSync(msg, ResultProcessor.SentinelAddressesEndPoints);
}

public Task<EndPoint[]> SentinelGetSentinelAddressesAsync(string serviceName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.SENTINELS, (RedisValue)serviceName);
return ExecuteAsync(msg, ResultProcessor.SentinelAddressesEndPoints);
}

public KeyValuePair<string, string>[] SentinelMaster(string serviceName, CommandFlags flags = CommandFlags.None)
Expand Down
6 changes: 3 additions & 3 deletions tests/StackExchange.Redis.Tests/Sentinel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -562,15 +562,15 @@ await UntilCondition(TimeSpan.FromSeconds(20), () =>
[Fact]
public async Task SentinelGetSentinelAddressesTest()
{
var addresses = await SentinelServerA.SentinelGetSentinelAddresses(ServiceName).ForAwait();
var addresses = await SentinelServerA.SentinelGetSentinelAddressesAsync(ServiceName).ForAwait();
Assert.Contains(SentinelServerB.EndPoint, addresses);
Assert.Contains(SentinelServerC.EndPoint, addresses);

addresses = await SentinelServerB.SentinelGetSentinelAddresses(ServiceName).ForAwait();
addresses = await SentinelServerB.SentinelGetSentinelAddressesAsync(ServiceName).ForAwait();
Assert.Contains(SentinelServerA.EndPoint, addresses);
Assert.Contains(SentinelServerC.EndPoint, addresses);

addresses = await SentinelServerC.SentinelGetSentinelAddresses(ServiceName).ForAwait();
addresses = await SentinelServerC.SentinelGetSentinelAddressesAsync(ServiceName).ForAwait();
Assert.Contains(SentinelServerA.EndPoint, addresses);
Assert.Contains(SentinelServerB.EndPoint, addresses);
}
Expand Down