From 8c62c8f778ed79e5f29ceb32377209149a9820dc Mon Sep 17 00:00:00 2001 From: mgravell Date: Wed, 8 Apr 2020 15:30:59 +0100 Subject: [PATCH 1/3] add a new cluster kind for redis-cluster-proxy; in most ways it is similar to twemproxy --- src/StackExchange.Redis/CommandMap.cs | 45 +++++++++++++++- .../ConfigurationOptions.cs | 11 +++- .../ConnectionMultiplexer.cs | 53 ++++++++++++++----- src/StackExchange.Redis/Enums/Proxy.cs | 6 ++- src/StackExchange.Redis/Enums/ServerType.cs | 6 ++- src/StackExchange.Redis/ServerEndPoint.cs | 26 ++++++--- .../RedisClusterProxyTests.cs | 41 ++++++++++++++ 7 files changed, 163 insertions(+), 25 deletions(-) create mode 100644 tests/StackExchange.Redis.Tests/RedisClusterProxyTests.cs diff --git a/src/StackExchange.Redis/CommandMap.cs b/src/StackExchange.Redis/CommandMap.cs index 9e0c53ec9..6a9500067 100644 --- a/src/StackExchange.Redis/CommandMap.cs +++ b/src/StackExchange.Redis/CommandMap.cs @@ -21,7 +21,7 @@ internal CommandMap(CommandBytes[] map) public static CommandMap Default { get; } = CreateImpl(null, null); /// - /// The commands available to https://github.com/twitter/twemproxy + /// The commands available to twemproxy /// /// https://github.com/twitter/twemproxy/blob/master/notes/redis.md public static CommandMap Twemproxy { get; } = CreateImpl(null, exclusions: new HashSet @@ -53,6 +53,49 @@ internal CommandMap(CommandBytes[] map) RedisCommand.SHUTDOWN, RedisCommand.SLAVEOF, RedisCommand.SLOWLOG, RedisCommand.SYNC, RedisCommand.TIME }); + /// + /// The commands available to redis-cluster-proxy + /// + /// https://github.com/RedisLabs/redis-cluster-proxy/ + public static CommandMap RedisClusterProxy { get; } = CreateImpl(null, exclusions: new HashSet + { + // via "proxy command unsupported" + // RedisCommand.ACL, + RedisCommand.ASKING, + RedisCommand.CLIENT, + RedisCommand.CLUSTER, + RedisCommand.CONFIG, + RedisCommand.DEBUG, + // RedisCommand.HELLO, + RedisCommand.INFO, + RedisCommand.LATENCY, + RedisCommand.MEMORY, + RedisCommand.MIGRATE, + // RedisCommand.MODULE, + RedisCommand.MONITOR, + // RedisCommand.PFDEBUG, + // RedisCommand.PFSELFTEST, + RedisCommand.PSUBSCRIBE, + // RedisCommand.PSYNC, + RedisCommand.PUBLISH, + RedisCommand.PUBSUB, + RedisCommand.PUNSUBSCRIBE, + RedisCommand.READONLY, + RedisCommand.READWRITE, + // RedisCommand.REPLCONF, + // RedisCommand.REPLICAOF, + // RedisCommand.ROLE, + RedisCommand.SCRIPT, + RedisCommand.SHUTDOWN, + RedisCommand.SLAVEOF, + RedisCommand.SLOWLOG, + RedisCommand.SUBSCRIBE, + RedisCommand.SYNC, + RedisCommand.TIME, + RedisCommand.UNSUBSCRIBE, + // RedisCommand.WAIT, + }); + /// /// The commands available to http://www.ideawu.com/ssdb/ /// diff --git a/src/StackExchange.Redis/ConfigurationOptions.cs b/src/StackExchange.Redis/ConfigurationOptions.cs index c1c5dc499..5edfed5da 100644 --- a/src/StackExchange.Redis/ConfigurationOptions.cs +++ b/src/StackExchange.Redis/ConfigurationOptions.cs @@ -251,6 +251,8 @@ public CommandMap CommandMap { case Proxy.Twemproxy: return CommandMap.Twemproxy; + case Proxy.RedisClusterProxy: + return CommandMap.RedisClusterProxy; default: return CommandMap.Default; } @@ -482,7 +484,14 @@ public ConfigurationOptions Clone() /// public void SetDefaultPorts() { - EndPoints.SetDefaultPorts(Ssl ? 6380 : 6379); + if (Proxy == Proxy.RedisClusterProxy && !Ssl) + { + EndPoints.SetDefaultPorts(7777); // default port for redis-cluster-proxy + } + else + { + EndPoints.SetDefaultPorts(Ssl ? 6380 : 6379); + } } /// diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs index f8e4dd7d4..232994a7c 100644 --- a/src/StackExchange.Redis/ConnectionMultiplexer.cs +++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs @@ -1248,7 +1248,12 @@ internal long LastHeartbeatSecondsAgo /// The async state object to pass to the created . public ISubscriber GetSubscriber(object asyncState = null) { - if (RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("The pub/sub API is not available via twemproxy"); + switch (RawConfig.Proxy) + { + case Proxy.Twemproxy: + case Proxy.RedisClusterProxy: + throw new NotSupportedException($"The pub/sub API is not available via {RawConfig.Proxy}"); + } return new RedisSubscriber(this, asyncState); } @@ -1263,7 +1268,15 @@ public IDatabase GetDatabase(int db = -1, object asyncState = null) db = RawConfig.DefaultDatabase ?? 0; if (db < 0) throw new ArgumentOutOfRangeException(nameof(db)); - if (db != 0 && RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("Twemproxy only supports database 0"); + if (db != 0) + { + switch (RawConfig.Proxy) + { + case Proxy.Twemproxy: + case Proxy.RedisClusterProxy: + throw new NotSupportedException($"{RawConfig.Proxy} only supports database 0"); + } + } // if there's no async-state, and the DB is suitable, we can hand out a re-used instance return (asyncState == null && db <= MaxCachedDatabaseInstance) @@ -1318,7 +1331,12 @@ public IDatabase GetDatabase(int db = -1, object asyncState = null) public IServer GetServer(EndPoint endpoint, object asyncState = null) { if (endpoint == null) throw new ArgumentNullException(nameof(endpoint)); - if (RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("The server API is not available via twemproxy"); + switch (RawConfig.Proxy) + { + case Proxy.RedisClusterProxy: + case Proxy.Twemproxy: + throw new NotSupportedException($"The server API is not available via {RawConfig.Proxy}"); + } var server = (ServerEndPoint)servers[endpoint]; if (server == null) throw new ArgumentException("The specified endpoint is not defined", nameof(endpoint)); return new RedisServer(this, server, asyncState); @@ -1634,6 +1652,7 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP { case ServerType.Twemproxy: case ServerType.Standalone: + case ServerType.RedisClusterProxy: standaloneCount++; break; case ServerType.Sentinel: @@ -1657,6 +1676,7 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP switch (server.ServerType) { case ServerType.Twemproxy: + case ServerType.RedisClusterProxy: case ServerType.Sentinel: case ServerType.Standalone: case ServerType.Cluster: @@ -1701,17 +1721,24 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP if (clusterCount == 0) { // set the serverSelectionStrategy - if (RawConfig.Proxy == Proxy.Twemproxy) - { - ServerSelectionStrategy.ServerType = ServerType.Twemproxy; - } - else if (standaloneCount == 0 && sentinelCount > 0) - { - ServerSelectionStrategy.ServerType = ServerType.Sentinel; - } - else + switch (RawConfig.Proxy) { - ServerSelectionStrategy.ServerType = ServerType.Standalone; + case Proxy.Twemproxy: + ServerSelectionStrategy.ServerType = ServerType.Twemproxy; + break; + case Proxy.RedisClusterProxy: + ServerSelectionStrategy.ServerType = ServerType.RedisClusterProxy; + break; + default: + if (standaloneCount == 0 && sentinelCount > 0) + { + ServerSelectionStrategy.ServerType = ServerType.Sentinel; + } + else + { + ServerSelectionStrategy.ServerType = ServerType.Standalone; + } + break; } var preferred = await NominatePreferredMaster(log, servers, useTieBreakers, tieBreakers, masters).ObserveErrors().ForAwait(); foreach (var master in masters) diff --git a/src/StackExchange.Redis/Enums/Proxy.cs b/src/StackExchange.Redis/Enums/Proxy.cs index ed87ba495..369820ad6 100644 --- a/src/StackExchange.Redis/Enums/Proxy.cs +++ b/src/StackExchange.Redis/Enums/Proxy.cs @@ -12,6 +12,10 @@ public enum Proxy /// /// Communication via twemproxy /// - Twemproxy + Twemproxy, + /// + /// Communication via redis-cluster-proxy + /// + RedisClusterProxy, } } diff --git a/src/StackExchange.Redis/Enums/ServerType.cs b/src/StackExchange.Redis/Enums/ServerType.cs index 80072f34a..3d15dbdb9 100644 --- a/src/StackExchange.Redis/Enums/ServerType.cs +++ b/src/StackExchange.Redis/Enums/ServerType.cs @@ -20,6 +20,10 @@ public enum ServerType /// /// Distributed redis installation via twemproxy /// - Twemproxy + Twemproxy, + /// + /// Distributed redis installation via redis-cluster-proxy + /// + RedisClusterProxy, } } diff --git a/src/StackExchange.Redis/ServerEndPoint.cs b/src/StackExchange.Redis/ServerEndPoint.cs index aff87393e..de416b2e1 100644 --- a/src/StackExchange.Redis/ServerEndPoint.cs +++ b/src/StackExchange.Redis/ServerEndPoint.cs @@ -57,11 +57,19 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint) writeEverySeconds = config.KeepAlive > 0 ? config.KeepAlive : 60; serverType = ServerType.Standalone; - // overrides for twemproxy - if (multiplexer.RawConfig.Proxy == Proxy.Twemproxy) + // overrides for twemproxy etc + switch (multiplexer.RawConfig.Proxy) { - databases = 1; - serverType = ServerType.Twemproxy; + case Proxy.Twemproxy: + databases = 1; + serverType = ServerType.Twemproxy; + break; + case Proxy.RedisClusterProxy: + databases = 1; + serverType = ServerType.RedisClusterProxy; + if (version < RedisFeatures.v3_0_0) // cluster is at least 3.0 + version = RedisFeatures.v3_0_0; + break; } } @@ -254,11 +262,13 @@ internal void AddScript(string script, byte[] hash) internal void AutoConfigure(PhysicalConnection connection) { - if (serverType == ServerType.Twemproxy) + switch (serverType) { - // don't try to detect configuration; all the config commands are disabled, and - // the fallback master/slave detection won't help - return; + case ServerType.Twemproxy: + case ServerType.RedisClusterProxy: + // don't try to detect configuration; all the config commands are disabled, and + // the fallback master/slave detection won't help + return; } var commandMap = Multiplexer.CommandMap; diff --git a/tests/StackExchange.Redis.Tests/RedisClusterProxyTests.cs b/tests/StackExchange.Redis.Tests/RedisClusterProxyTests.cs new file mode 100644 index 000000000..bc6e75838 --- /dev/null +++ b/tests/StackExchange.Redis.Tests/RedisClusterProxyTests.cs @@ -0,0 +1,41 @@ +using System; +using Xunit; +using Xunit.Abstractions; + +namespace StackExchange.Redis.Tests +{ + public class RedisClusterProxyTests : TestBase + { + public RedisClusterProxyTests(ITestOutputHelper output) : base(output) { } + + protected override string GetConfiguration() => "127.0.0.1,proxy=RedisClusterProxy,version=5.0"; + + [Fact(Skip = "No CI build for this yet")] + public void CanConnectToClusterProxy() + { + using (var conn = Create()) + { + var expected = Guid.NewGuid().ToString(); + var db = conn.GetDatabase(); + RedisKey key = Me(); + db.StringSet(key, expected); + var actual = (string)db.StringGet(key); + Assert.Equal(expected, actual); + + // check it knows that we're dealing with a cluster + var ex = Assert.Throws(() => conn.GetServer("abc")); + Assert.Equal("The server API is not available via RedisClusterProxy", ex.Message); + + ex = Assert.Throws(() => conn.GetSubscriber("abc")); + Assert.Equal("The pub/sub API is not available via RedisClusterProxy", ex.Message); + + // test a script + const string LUA_SCRIPT = "return redis.call('info')"; + var name = (string)db.ScriptEvaluate(LUA_SCRIPT); + Log($"client: {name}"); + // run it twice to check we didn't rely on script hashing (SCRIPT is disabled) + _ = db.ScriptEvaluate(LUA_SCRIPT); + } + } + } +} From 9cfa0435c25e42dbdbb1ae82472a2fc86d255563 Mon Sep 17 00:00:00 2001 From: mgravell Date: Wed, 8 Apr 2020 18:33:12 +0100 Subject: [PATCH 2/3] need to allow the GetServer API to expose KEYS/SCAN, which is supported --- src/StackExchange.Redis/ConnectionMultiplexer.cs | 1 - tests/StackExchange.Redis.Tests/RedisClusterProxyTests.cs | 7 ++++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs index 232994a7c..06f94a2c5 100644 --- a/src/StackExchange.Redis/ConnectionMultiplexer.cs +++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs @@ -1333,7 +1333,6 @@ public IServer GetServer(EndPoint endpoint, object asyncState = null) if (endpoint == null) throw new ArgumentNullException(nameof(endpoint)); switch (RawConfig.Proxy) { - case Proxy.RedisClusterProxy: case Proxy.Twemproxy: throw new NotSupportedException($"The server API is not available via {RawConfig.Proxy}"); } diff --git a/tests/StackExchange.Redis.Tests/RedisClusterProxyTests.cs b/tests/StackExchange.Redis.Tests/RedisClusterProxyTests.cs index bc6e75838..cdb09aec7 100644 --- a/tests/StackExchange.Redis.Tests/RedisClusterProxyTests.cs +++ b/tests/StackExchange.Redis.Tests/RedisClusterProxyTests.cs @@ -23,10 +23,11 @@ public void CanConnectToClusterProxy() Assert.Equal(expected, actual); // check it knows that we're dealing with a cluster - var ex = Assert.Throws(() => conn.GetServer("abc")); - Assert.Equal("The server API is not available via RedisClusterProxy", ex.Message); + var server = conn.GetServer(conn.GetEndPoints()[0]); + Assert.Equal(ServerType.RedisClusterProxy, server.ServerType); + _ = server.Echo("abc"); - ex = Assert.Throws(() => conn.GetSubscriber("abc")); + var ex = Assert.Throws(() => conn.GetSubscriber("abc")); Assert.Equal("The pub/sub API is not available via RedisClusterProxy", ex.Message); // test a script From 1b8d8628a3634d741f88b7910b293f940500f723 Mon Sep 17 00:00:00 2001 From: mgravell Date: Thu, 9 Apr 2020 16:55:33 +0100 Subject: [PATCH 3/3] utility API for treating NameValueEntry[] (from streams) as maps --- src/StackExchange.Redis/ExtensionMethods.cs | 32 +++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/StackExchange.Redis/ExtensionMethods.cs b/src/StackExchange.Redis/ExtensionMethods.cs index a9b7fd14b..11d2dc907 100644 --- a/src/StackExchange.Redis/ExtensionMethods.cs +++ b/src/StackExchange.Redis/ExtensionMethods.cs @@ -78,6 +78,38 @@ public static Dictionary ToDictionary(this SortedSetEntry[] return result; } + /// + /// Create a dictionary from an array of key/value pairs + /// + /// The pairs to convert to a dictionary. + public static Dictionary ToStringDictionary(this NameValueEntry[] pairs) + { + if (pairs == null) return null; + + var result = new Dictionary(pairs.Length, StringComparer.Ordinal); + for (int i = 0; i < pairs.Length; i++) + { + result.Add(pairs[i].name, pairs[i].value); + } + return result; + } + + /// + /// Create a dictionary from an array of NameValueEntry values + /// + /// The entries to convert to a dictionary. + public static Dictionary ToDictionary(this NameValueEntry[] values) + { + if (values == null) return null; + + var result = new Dictionary(values.Length); + for (int i = 0; i < values.Length; i++) + { + result.Add(values[i].name, values[i].value); + } + return result; + } + /// /// Create a dictionary from an array of key/value pairs ///