Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 27 additions & 15 deletions src/StackExchange.Redis/ConnectionMultiplexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ private static void WriteNormalizingLineEndings(string source, StreamWriter writ

/// <summary>
/// Raised when nodes are explicitly requested to reconfigure via broadcast.
/// This usually means primary/replica role changes.
/// This usually means primary/replica changes.
/// </summary>
public event EventHandler<EndPointEventArgs> ConfigurationChangedBroadcast;

Expand Down Expand Up @@ -1418,7 +1418,10 @@ internal long LastHeartbeatSecondsAgo
/// <param name="asyncState">The async state object to pass to the created <see cref="RedisSubscriber"/>.</param>
public ISubscriber GetSubscriber(object asyncState = null)
{
if (RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("The pub/sub API is not available via twemproxy");
if (!RawConfig.Proxy.SupportsPubSub())
{
throw new NotSupportedException($"The pub/sub API is not available via {RawConfig.Proxy}");
}
return new RedisSubscriber(this, asyncState);
}

Expand All @@ -1436,9 +1439,9 @@ internal int ApplyDefaultDatabase(int db)
throw new ArgumentOutOfRangeException(nameof(db));
}

if (db != 0 && RawConfig.Proxy == Proxy.Twemproxy)
if (db != 0 && !RawConfig.Proxy.SupportsDatabases())
{
throw new NotSupportedException("twemproxy only supports database 0");
throw new NotSupportedException($"{RawConfig.Proxy} only supports database 0");
}

return db;
Expand Down Expand Up @@ -1506,7 +1509,10 @@ public IDatabase GetDatabase(int db = -1, object asyncState = null)
public IServer GetServer(EndPoint endpoint, object asyncState = null)
{
if (endpoint == null) throw new ArgumentNullException(nameof(endpoint));
if (RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("The server API is not available via twemproxy");
if (!RawConfig.Proxy.SupportsServerApi())
{
throw new NotSupportedException($"The server API is not available via {RawConfig.Proxy}");
}
var server = (ServerEndPoint)servers[endpoint];
if (server == null) throw new ArgumentException("The specified endpoint is not defined", nameof(endpoint));
return new RedisServer(this, server, asyncState);
Expand Down Expand Up @@ -1882,18 +1888,24 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
ServerSelectionStrategy.ServerType = ServerType.Standalone;
}

var preferred = NominatePreferredMaster(log, servers, useTieBreakers, masters);
foreach (var master in masters)
// If multiple primaries are detected, nominate the preferred one
// ...but not if the type of server we're connected to supports and expects multiple primaries
// ...for those cases, we want to allow sending to any primary endpoint.
if (ServerSelectionStrategy.ServerType.HasSinglePrimary())
{
if (master == preferred || master.IsReplica)
{
log?.WriteLine($"{Format.ToString(master)}: Clearing as RedundantMaster");
master.ClearUnselectable(UnselectableFlags.RedundantMaster);
}
else
var preferred = NominatePreferredMaster(log, servers, useTieBreakers, masters);
foreach (var master in masters)
{
log?.WriteLine($"{Format.ToString(master)}: Setting as RedundantMaster");
master.SetUnselectable(UnselectableFlags.RedundantMaster);
if (master == preferred || master.IsReplica)
{
log?.WriteLine($"{Format.ToString(master)}: Clearing as RedundantMaster");
master.ClearUnselectable(UnselectableFlags.RedundantMaster);
}
else
{
log?.WriteLine($"{Format.ToString(master)}: Setting as RedundantMaster");
master.SetUnselectable(UnselectableFlags.RedundantMaster);
}
}
}
}
Expand Down
30 changes: 30 additions & 0 deletions src/StackExchange.Redis/Enums/Proxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,34 @@ public enum Proxy
/// </summary>
Twemproxy,
}

internal static class ProxyExtensions
{
/// <summary>
/// Whether a proxy supports databases (e.g. database > 0).
/// </summary>
public static bool SupportsDatabases(this Proxy proxy) => proxy switch
{
Proxy.Twemproxy => false,
_ => true
};

/// <summary>
/// Whether a proxy supports pub/sub.
/// </summary>
public static bool SupportsPubSub(this Proxy proxy) => proxy switch
{
Proxy.Twemproxy => false,
_ => true
};

/// <summary>
/// Whether a proxy supports the <c>ConnectionMultiplexer.GetServer</c>.
/// </summary>
public static bool SupportsServerApi(this Proxy proxy) => proxy switch
{
Proxy.Twemproxy => false,
_ => true
};
}
}
20 changes: 20 additions & 0 deletions src/StackExchange.Redis/Enums/ServerType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,24 @@ public enum ServerType
/// </summary>
Twemproxy,
}

internal static class ServerTypeExtensions
{
/// <summary>
/// Whether a server type can have only a single primary, meaning an election if multiple are found.
/// </summary>
public static bool HasSinglePrimary(this ServerType type) => type switch
{
_ => true
};

/// <summary>
/// Whether a server type supports <see cref="ServerEndPoint.AutoConfigureAsync(PhysicalConnection, ConnectionMultiplexer.LogProxy)"/>.
/// </summary>
public static bool SupportsAutoConfigure(this ServerType type) => type switch
{
ServerType.Twemproxy => false,
_ => true
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public interface IConnectionMultiplexer : IDisposable

/// <summary>
/// Raised when nodes are explicitly requested to reconfigure via broadcast.
/// This usually means primary/replica role changes.
/// This usually means primary/replica changes.
/// </summary>
event EventHandler<EndPointEventArgs> ConfigurationChangedBroadcast;

Expand Down
6 changes: 3 additions & 3 deletions src/StackExchange.Redis/ServerEndPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,10 @@ internal void AddScript(string script, byte[] hash)

internal async Task AutoConfigureAsync(PhysicalConnection connection, LogProxy log = null)
{
if (serverType == ServerType.Twemproxy)
if (!serverType.SupportsAutoConfigure())
{
// don't try to detect configuration; all the config commands are disabled, and
// the fallback master/replica detection won't help
// Don't try to detect configuration.
// All the config commands are disabled and the fallback primary/replica detection won't help
return;
}

Expand Down