diff --git a/src/StackExchange.Redis/CommandMap.cs b/src/StackExchange.Redis/CommandMap.cs
index 9e0c53ec9..6a9500067 100644
--- a/src/StackExchange.Redis/CommandMap.cs
+++ b/src/StackExchange.Redis/CommandMap.cs
@@ -21,7 +21,7 @@ internal CommandMap(CommandBytes[] map)
public static CommandMap Default { get; } = CreateImpl(null, null);
///
- /// The commands available to https://github.com/twitter/twemproxy
+ /// The commands available to twemproxy
///
/// https://github.com/twitter/twemproxy/blob/master/notes/redis.md
public static CommandMap Twemproxy { get; } = CreateImpl(null, exclusions: new HashSet
@@ -53,6 +53,49 @@ internal CommandMap(CommandBytes[] map)
RedisCommand.SHUTDOWN, RedisCommand.SLAVEOF, RedisCommand.SLOWLOG, RedisCommand.SYNC, RedisCommand.TIME
});
+ ///
+ /// The commands available to redis-cluster-proxy
+ ///
+ /// https://github.com/RedisLabs/redis-cluster-proxy/
+ public static CommandMap RedisClusterProxy { get; } = CreateImpl(null, exclusions: new HashSet
+ {
+ // via "proxy command unsupported"
+ // RedisCommand.ACL,
+ RedisCommand.ASKING,
+ RedisCommand.CLIENT,
+ RedisCommand.CLUSTER,
+ RedisCommand.CONFIG,
+ RedisCommand.DEBUG,
+ // RedisCommand.HELLO,
+ RedisCommand.INFO,
+ RedisCommand.LATENCY,
+ RedisCommand.MEMORY,
+ RedisCommand.MIGRATE,
+ // RedisCommand.MODULE,
+ RedisCommand.MONITOR,
+ // RedisCommand.PFDEBUG,
+ // RedisCommand.PFSELFTEST,
+ RedisCommand.PSUBSCRIBE,
+ // RedisCommand.PSYNC,
+ RedisCommand.PUBLISH,
+ RedisCommand.PUBSUB,
+ RedisCommand.PUNSUBSCRIBE,
+ RedisCommand.READONLY,
+ RedisCommand.READWRITE,
+ // RedisCommand.REPLCONF,
+ // RedisCommand.REPLICAOF,
+ // RedisCommand.ROLE,
+ RedisCommand.SCRIPT,
+ RedisCommand.SHUTDOWN,
+ RedisCommand.SLAVEOF,
+ RedisCommand.SLOWLOG,
+ RedisCommand.SUBSCRIBE,
+ RedisCommand.SYNC,
+ RedisCommand.TIME,
+ RedisCommand.UNSUBSCRIBE,
+ // RedisCommand.WAIT,
+ });
+
///
/// The commands available to http://www.ideawu.com/ssdb/
///
diff --git a/src/StackExchange.Redis/ConfigurationOptions.cs b/src/StackExchange.Redis/ConfigurationOptions.cs
index c1c5dc499..5edfed5da 100644
--- a/src/StackExchange.Redis/ConfigurationOptions.cs
+++ b/src/StackExchange.Redis/ConfigurationOptions.cs
@@ -251,6 +251,8 @@ public CommandMap CommandMap
{
case Proxy.Twemproxy:
return CommandMap.Twemproxy;
+ case Proxy.RedisClusterProxy:
+ return CommandMap.RedisClusterProxy;
default:
return CommandMap.Default;
}
@@ -482,7 +484,14 @@ public ConfigurationOptions Clone()
///
public void SetDefaultPorts()
{
- EndPoints.SetDefaultPorts(Ssl ? 6380 : 6379);
+ if (Proxy == Proxy.RedisClusterProxy && !Ssl)
+ {
+ EndPoints.SetDefaultPorts(7777); // default port for redis-cluster-proxy
+ }
+ else
+ {
+ EndPoints.SetDefaultPorts(Ssl ? 6380 : 6379);
+ }
}
///
diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs
index f8e4dd7d4..06f94a2c5 100644
--- a/src/StackExchange.Redis/ConnectionMultiplexer.cs
+++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs
@@ -1248,7 +1248,12 @@ internal long LastHeartbeatSecondsAgo
/// The async state object to pass to the created .
public ISubscriber GetSubscriber(object asyncState = null)
{
- if (RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("The pub/sub API is not available via twemproxy");
+ switch (RawConfig.Proxy)
+ {
+ case Proxy.Twemproxy:
+ case Proxy.RedisClusterProxy:
+ throw new NotSupportedException($"The pub/sub API is not available via {RawConfig.Proxy}");
+ }
return new RedisSubscriber(this, asyncState);
}
@@ -1263,7 +1268,15 @@ public IDatabase GetDatabase(int db = -1, object asyncState = null)
db = RawConfig.DefaultDatabase ?? 0;
if (db < 0) throw new ArgumentOutOfRangeException(nameof(db));
- if (db != 0 && RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("Twemproxy only supports database 0");
+ if (db != 0)
+ {
+ switch (RawConfig.Proxy)
+ {
+ case Proxy.Twemproxy:
+ case Proxy.RedisClusterProxy:
+ throw new NotSupportedException($"{RawConfig.Proxy} only supports database 0");
+ }
+ }
// if there's no async-state, and the DB is suitable, we can hand out a re-used instance
return (asyncState == null && db <= MaxCachedDatabaseInstance)
@@ -1318,7 +1331,11 @@ public IDatabase GetDatabase(int db = -1, object asyncState = null)
public IServer GetServer(EndPoint endpoint, object asyncState = null)
{
if (endpoint == null) throw new ArgumentNullException(nameof(endpoint));
- if (RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("The server API is not available via twemproxy");
+ switch (RawConfig.Proxy)
+ {
+ case Proxy.Twemproxy:
+ throw new NotSupportedException($"The server API is not available via {RawConfig.Proxy}");
+ }
var server = (ServerEndPoint)servers[endpoint];
if (server == null) throw new ArgumentException("The specified endpoint is not defined", nameof(endpoint));
return new RedisServer(this, server, asyncState);
@@ -1634,6 +1651,7 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP
{
case ServerType.Twemproxy:
case ServerType.Standalone:
+ case ServerType.RedisClusterProxy:
standaloneCount++;
break;
case ServerType.Sentinel:
@@ -1657,6 +1675,7 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP
switch (server.ServerType)
{
case ServerType.Twemproxy:
+ case ServerType.RedisClusterProxy:
case ServerType.Sentinel:
case ServerType.Standalone:
case ServerType.Cluster:
@@ -1701,17 +1720,24 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP
if (clusterCount == 0)
{
// set the serverSelectionStrategy
- if (RawConfig.Proxy == Proxy.Twemproxy)
- {
- ServerSelectionStrategy.ServerType = ServerType.Twemproxy;
- }
- else if (standaloneCount == 0 && sentinelCount > 0)
- {
- ServerSelectionStrategy.ServerType = ServerType.Sentinel;
- }
- else
+ switch (RawConfig.Proxy)
{
- ServerSelectionStrategy.ServerType = ServerType.Standalone;
+ case Proxy.Twemproxy:
+ ServerSelectionStrategy.ServerType = ServerType.Twemproxy;
+ break;
+ case Proxy.RedisClusterProxy:
+ ServerSelectionStrategy.ServerType = ServerType.RedisClusterProxy;
+ break;
+ default:
+ if (standaloneCount == 0 && sentinelCount > 0)
+ {
+ ServerSelectionStrategy.ServerType = ServerType.Sentinel;
+ }
+ else
+ {
+ ServerSelectionStrategy.ServerType = ServerType.Standalone;
+ }
+ break;
}
var preferred = await NominatePreferredMaster(log, servers, useTieBreakers, tieBreakers, masters).ObserveErrors().ForAwait();
foreach (var master in masters)
diff --git a/src/StackExchange.Redis/Enums/Proxy.cs b/src/StackExchange.Redis/Enums/Proxy.cs
index ed87ba495..369820ad6 100644
--- a/src/StackExchange.Redis/Enums/Proxy.cs
+++ b/src/StackExchange.Redis/Enums/Proxy.cs
@@ -12,6 +12,10 @@ public enum Proxy
///
/// Communication via twemproxy
///
- Twemproxy
+ Twemproxy,
+ ///
+ /// Communication via redis-cluster-proxy
+ ///
+ RedisClusterProxy,
}
}
diff --git a/src/StackExchange.Redis/Enums/ServerType.cs b/src/StackExchange.Redis/Enums/ServerType.cs
index 80072f34a..3d15dbdb9 100644
--- a/src/StackExchange.Redis/Enums/ServerType.cs
+++ b/src/StackExchange.Redis/Enums/ServerType.cs
@@ -20,6 +20,10 @@ public enum ServerType
///
/// Distributed redis installation via twemproxy
///
- Twemproxy
+ Twemproxy,
+ ///
+ /// Distributed redis installation via redis-cluster-proxy
+ ///
+ RedisClusterProxy,
}
}
diff --git a/src/StackExchange.Redis/ExtensionMethods.cs b/src/StackExchange.Redis/ExtensionMethods.cs
index a9b7fd14b..11d2dc907 100644
--- a/src/StackExchange.Redis/ExtensionMethods.cs
+++ b/src/StackExchange.Redis/ExtensionMethods.cs
@@ -78,6 +78,38 @@ public static Dictionary ToDictionary(this SortedSetEntry[]
return result;
}
+ ///
+ /// Create a dictionary from an array of key/value pairs
+ ///
+ /// The pairs to convert to a dictionary.
+ public static Dictionary ToStringDictionary(this NameValueEntry[] pairs)
+ {
+ if (pairs == null) return null;
+
+ var result = new Dictionary(pairs.Length, StringComparer.Ordinal);
+ for (int i = 0; i < pairs.Length; i++)
+ {
+ result.Add(pairs[i].name, pairs[i].value);
+ }
+ return result;
+ }
+
+ ///
+ /// Create a dictionary from an array of NameValueEntry values
+ ///
+ /// The entries to convert to a dictionary.
+ public static Dictionary ToDictionary(this NameValueEntry[] values)
+ {
+ if (values == null) return null;
+
+ var result = new Dictionary(values.Length);
+ for (int i = 0; i < values.Length; i++)
+ {
+ result.Add(values[i].name, values[i].value);
+ }
+ return result;
+ }
+
///
/// Create a dictionary from an array of key/value pairs
///
diff --git a/src/StackExchange.Redis/ServerEndPoint.cs b/src/StackExchange.Redis/ServerEndPoint.cs
index aff87393e..de416b2e1 100644
--- a/src/StackExchange.Redis/ServerEndPoint.cs
+++ b/src/StackExchange.Redis/ServerEndPoint.cs
@@ -57,11 +57,19 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint)
writeEverySeconds = config.KeepAlive > 0 ? config.KeepAlive : 60;
serverType = ServerType.Standalone;
- // overrides for twemproxy
- if (multiplexer.RawConfig.Proxy == Proxy.Twemproxy)
+ // overrides for twemproxy etc
+ switch (multiplexer.RawConfig.Proxy)
{
- databases = 1;
- serverType = ServerType.Twemproxy;
+ case Proxy.Twemproxy:
+ databases = 1;
+ serverType = ServerType.Twemproxy;
+ break;
+ case Proxy.RedisClusterProxy:
+ databases = 1;
+ serverType = ServerType.RedisClusterProxy;
+ if (version < RedisFeatures.v3_0_0) // cluster is at least 3.0
+ version = RedisFeatures.v3_0_0;
+ break;
}
}
@@ -254,11 +262,13 @@ internal void AddScript(string script, byte[] hash)
internal void AutoConfigure(PhysicalConnection connection)
{
- if (serverType == ServerType.Twemproxy)
+ switch (serverType)
{
- // don't try to detect configuration; all the config commands are disabled, and
- // the fallback master/slave detection won't help
- return;
+ case ServerType.Twemproxy:
+ case ServerType.RedisClusterProxy:
+ // don't try to detect configuration; all the config commands are disabled, and
+ // the fallback master/slave detection won't help
+ return;
}
var commandMap = Multiplexer.CommandMap;
diff --git a/tests/StackExchange.Redis.Tests/RedisClusterProxyTests.cs b/tests/StackExchange.Redis.Tests/RedisClusterProxyTests.cs
new file mode 100644
index 000000000..cdb09aec7
--- /dev/null
+++ b/tests/StackExchange.Redis.Tests/RedisClusterProxyTests.cs
@@ -0,0 +1,42 @@
+using System;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace StackExchange.Redis.Tests
+{
+ public class RedisClusterProxyTests : TestBase
+ {
+ public RedisClusterProxyTests(ITestOutputHelper output) : base(output) { }
+
+ protected override string GetConfiguration() => "127.0.0.1,proxy=RedisClusterProxy,version=5.0";
+
+ [Fact(Skip = "No CI build for this yet")]
+ public void CanConnectToClusterProxy()
+ {
+ using (var conn = Create())
+ {
+ var expected = Guid.NewGuid().ToString();
+ var db = conn.GetDatabase();
+ RedisKey key = Me();
+ db.StringSet(key, expected);
+ var actual = (string)db.StringGet(key);
+ Assert.Equal(expected, actual);
+
+ // check it knows that we're dealing with a cluster
+ var server = conn.GetServer(conn.GetEndPoints()[0]);
+ Assert.Equal(ServerType.RedisClusterProxy, server.ServerType);
+ _ = server.Echo("abc");
+
+ var ex = Assert.Throws(() => conn.GetSubscriber("abc"));
+ Assert.Equal("The pub/sub API is not available via RedisClusterProxy", ex.Message);
+
+ // test a script
+ const string LUA_SCRIPT = "return redis.call('info')";
+ var name = (string)db.ScriptEvaluate(LUA_SCRIPT);
+ Log($"client: {name}");
+ // run it twice to check we didn't rely on script hashing (SCRIPT is disabled)
+ _ = db.ScriptEvaluate(LUA_SCRIPT);
+ }
+ }
+ }
+}