Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion src/StackExchange.Redis/CommandMap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal CommandMap(CommandBytes[] map)
public static CommandMap Default { get; } = CreateImpl(null, null);

/// <summary>
/// The commands available to <a href="twemproxy">https://github.com/twitter/twemproxy</a>
/// The commands available to <a href="https://github.com/twitter/twemproxy/">twemproxy</a>
/// </summary>
/// <remarks>https://github.com/twitter/twemproxy/blob/master/notes/redis.md</remarks>
public static CommandMap Twemproxy { get; } = CreateImpl(null, exclusions: new HashSet<RedisCommand>
Expand Down Expand Up @@ -53,6 +53,49 @@ internal CommandMap(CommandBytes[] map)
RedisCommand.SHUTDOWN, RedisCommand.SLAVEOF, RedisCommand.SLOWLOG, RedisCommand.SYNC, RedisCommand.TIME
});

/// <summary>
/// The commands available to <a href="https://github.com/RedisLabs/redis-cluster-proxy/">redis-cluster-proxy</a>
/// </summary>
/// <remarks>https://github.com/RedisLabs/redis-cluster-proxy/</remarks>
public static CommandMap RedisClusterProxy { get; } = CreateImpl(null, exclusions: new HashSet<RedisCommand>
{
// via "proxy command unsupported"
// RedisCommand.ACL,
RedisCommand.ASKING,
RedisCommand.CLIENT,
RedisCommand.CLUSTER,
RedisCommand.CONFIG,
RedisCommand.DEBUG,
// RedisCommand.HELLO,
RedisCommand.INFO,
RedisCommand.LATENCY,
RedisCommand.MEMORY,
RedisCommand.MIGRATE,
// RedisCommand.MODULE,
RedisCommand.MONITOR,
// RedisCommand.PFDEBUG,
// RedisCommand.PFSELFTEST,
RedisCommand.PSUBSCRIBE,
// RedisCommand.PSYNC,
RedisCommand.PUBLISH,
RedisCommand.PUBSUB,
RedisCommand.PUNSUBSCRIBE,
RedisCommand.READONLY,
RedisCommand.READWRITE,
// RedisCommand.REPLCONF,
// RedisCommand.REPLICAOF,
// RedisCommand.ROLE,
RedisCommand.SCRIPT,
RedisCommand.SHUTDOWN,
RedisCommand.SLAVEOF,
RedisCommand.SLOWLOG,
RedisCommand.SUBSCRIBE,
RedisCommand.SYNC,
RedisCommand.TIME,
RedisCommand.UNSUBSCRIBE,
// RedisCommand.WAIT,
});

/// <summary>
/// The commands available to <a href="ssdb">http://www.ideawu.com/ssdb/</a>
/// </summary>
Expand Down
11 changes: 10 additions & 1 deletion src/StackExchange.Redis/ConfigurationOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ public CommandMap CommandMap
{
case Proxy.Twemproxy:
return CommandMap.Twemproxy;
case Proxy.RedisClusterProxy:
return CommandMap.RedisClusterProxy;
default:
return CommandMap.Default;
}
Expand Down Expand Up @@ -482,7 +484,14 @@ public ConfigurationOptions Clone()
/// </summary>
public void SetDefaultPorts()
{
EndPoints.SetDefaultPorts(Ssl ? 6380 : 6379);
if (Proxy == Proxy.RedisClusterProxy && !Ssl)
{
EndPoints.SetDefaultPorts(7777); // default port for redis-cluster-proxy
}
else
{
EndPoints.SetDefaultPorts(Ssl ? 6380 : 6379);
}
}

/// <summary>
Expand Down
52 changes: 39 additions & 13 deletions src/StackExchange.Redis/ConnectionMultiplexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1248,7 +1248,12 @@ internal long LastHeartbeatSecondsAgo
/// <param name="asyncState">The async state object to pass to the created <see cref="RedisSubscriber"/>.</param>
public ISubscriber GetSubscriber(object asyncState = null)
{
if (RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("The pub/sub API is not available via twemproxy");
switch (RawConfig.Proxy)
{
case Proxy.Twemproxy:
case Proxy.RedisClusterProxy:
throw new NotSupportedException($"The pub/sub API is not available via {RawConfig.Proxy}");
}
return new RedisSubscriber(this, asyncState);
}

Expand All @@ -1263,7 +1268,15 @@ public IDatabase GetDatabase(int db = -1, object asyncState = null)
db = RawConfig.DefaultDatabase ?? 0;

if (db < 0) throw new ArgumentOutOfRangeException(nameof(db));
if (db != 0 && RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("Twemproxy only supports database 0");
if (db != 0)
{
switch (RawConfig.Proxy)
{
case Proxy.Twemproxy:
case Proxy.RedisClusterProxy:
throw new NotSupportedException($"{RawConfig.Proxy} only supports database 0");
}
}

// if there's no async-state, and the DB is suitable, we can hand out a re-used instance
return (asyncState == null && db <= MaxCachedDatabaseInstance)
Expand Down Expand Up @@ -1318,7 +1331,11 @@ public IDatabase GetDatabase(int db = -1, object asyncState = null)
public IServer GetServer(EndPoint endpoint, object asyncState = null)
{
if (endpoint == null) throw new ArgumentNullException(nameof(endpoint));
if (RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("The server API is not available via twemproxy");
switch (RawConfig.Proxy)
{
case Proxy.Twemproxy:
throw new NotSupportedException($"The server API is not available via {RawConfig.Proxy}");
}
var server = (ServerEndPoint)servers[endpoint];
if (server == null) throw new ArgumentException("The specified endpoint is not defined", nameof(endpoint));
return new RedisServer(this, server, asyncState);
Expand Down Expand Up @@ -1634,6 +1651,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
{
case ServerType.Twemproxy:
case ServerType.Standalone:
case ServerType.RedisClusterProxy:
standaloneCount++;
break;
case ServerType.Sentinel:
Expand All @@ -1657,6 +1675,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
switch (server.ServerType)
{
case ServerType.Twemproxy:
case ServerType.RedisClusterProxy:
case ServerType.Sentinel:
case ServerType.Standalone:
case ServerType.Cluster:
Expand Down Expand Up @@ -1701,17 +1720,24 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
if (clusterCount == 0)
{
// set the serverSelectionStrategy
if (RawConfig.Proxy == Proxy.Twemproxy)
{
ServerSelectionStrategy.ServerType = ServerType.Twemproxy;
}
else if (standaloneCount == 0 && sentinelCount > 0)
{
ServerSelectionStrategy.ServerType = ServerType.Sentinel;
}
else
switch (RawConfig.Proxy)
{
ServerSelectionStrategy.ServerType = ServerType.Standalone;
case Proxy.Twemproxy:
ServerSelectionStrategy.ServerType = ServerType.Twemproxy;
break;
case Proxy.RedisClusterProxy:
ServerSelectionStrategy.ServerType = ServerType.RedisClusterProxy;
break;
default:
if (standaloneCount == 0 && sentinelCount > 0)
{
ServerSelectionStrategy.ServerType = ServerType.Sentinel;
}
else
{
ServerSelectionStrategy.ServerType = ServerType.Standalone;
}
break;
}
var preferred = await NominatePreferredMaster(log, servers, useTieBreakers, tieBreakers, masters).ObserveErrors().ForAwait();
foreach (var master in masters)
Expand Down
6 changes: 5 additions & 1 deletion src/StackExchange.Redis/Enums/Proxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ public enum Proxy
/// <summary>
/// Communication via <a href="https://github.com/twitter/twemproxy">twemproxy</a>
/// </summary>
Twemproxy
Twemproxy,
/// <summary>
/// Communication via <a href="https://github.com/RedisLabs/redis-cluster-proxy">redis-cluster-proxy</a>
/// </summary>
RedisClusterProxy,
}
}
6 changes: 5 additions & 1 deletion src/StackExchange.Redis/Enums/ServerType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public enum ServerType
/// <summary>
/// Distributed redis installation via <a href="https://github.com/twitter/twemproxy">twemproxy</a>
/// </summary>
Twemproxy
Twemproxy,
/// <summary>
/// Distributed redis installation via <a href="https://github.com/RedisLabs/redis-cluster-proxy">redis-cluster-proxy</a>
/// </summary>
RedisClusterProxy,
}
}
32 changes: 32 additions & 0 deletions src/StackExchange.Redis/ExtensionMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,38 @@ public static Dictionary<RedisValue, double> ToDictionary(this SortedSetEntry[]
return result;
}

/// <summary>
/// Create a dictionary from an array of key/value pairs
/// </summary>
/// <param name="pairs">The pairs to convert to a dictionary.</param>
public static Dictionary<string, string> ToStringDictionary(this NameValueEntry[] pairs)
{
if (pairs == null) return null;

var result = new Dictionary<string, string>(pairs.Length, StringComparer.Ordinal);
for (int i = 0; i < pairs.Length; i++)
{
result.Add(pairs[i].name, pairs[i].value);
}
return result;
}

/// <summary>
/// Create a dictionary from an array of NameValueEntry values
/// </summary>
/// <param name="values">The entries to convert to a dictionary.</param>
public static Dictionary<RedisValue, RedisValue> ToDictionary(this NameValueEntry[] values)
{
if (values == null) return null;

var result = new Dictionary<RedisValue, RedisValue>(values.Length);
for (int i = 0; i < values.Length; i++)
{
result.Add(values[i].name, values[i].value);
}
return result;
}

/// <summary>
/// Create a dictionary from an array of key/value pairs
/// </summary>
Expand Down
26 changes: 18 additions & 8 deletions src/StackExchange.Redis/ServerEndPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,19 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint)
writeEverySeconds = config.KeepAlive > 0 ? config.KeepAlive : 60;
serverType = ServerType.Standalone;

// overrides for twemproxy
if (multiplexer.RawConfig.Proxy == Proxy.Twemproxy)
// overrides for twemproxy etc
switch (multiplexer.RawConfig.Proxy)
{
databases = 1;
serverType = ServerType.Twemproxy;
case Proxy.Twemproxy:
databases = 1;
serverType = ServerType.Twemproxy;
break;
case Proxy.RedisClusterProxy:
databases = 1;
serverType = ServerType.RedisClusterProxy;
if (version < RedisFeatures.v3_0_0) // cluster is at least 3.0
version = RedisFeatures.v3_0_0;
break;
}
}

Expand Down Expand Up @@ -254,11 +262,13 @@ internal void AddScript(string script, byte[] hash)

internal void AutoConfigure(PhysicalConnection connection)
{
if (serverType == ServerType.Twemproxy)
switch (serverType)
{
// don't try to detect configuration; all the config commands are disabled, and
// the fallback master/slave detection won't help
return;
case ServerType.Twemproxy:
case ServerType.RedisClusterProxy:
// don't try to detect configuration; all the config commands are disabled, and
// the fallback master/slave detection won't help
return;
}

var commandMap = Multiplexer.CommandMap;
Expand Down
42 changes: 42 additions & 0 deletions tests/StackExchange.Redis.Tests/RedisClusterProxyTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System;
using Xunit;
using Xunit.Abstractions;

namespace StackExchange.Redis.Tests
{
public class RedisClusterProxyTests : TestBase
{
public RedisClusterProxyTests(ITestOutputHelper output) : base(output) { }

protected override string GetConfiguration() => "127.0.0.1,proxy=RedisClusterProxy,version=5.0";

[Fact(Skip = "No CI build for this yet")]
public void CanConnectToClusterProxy()
{
using (var conn = Create())
{
var expected = Guid.NewGuid().ToString();
var db = conn.GetDatabase();
RedisKey key = Me();
db.StringSet(key, expected);
var actual = (string)db.StringGet(key);
Assert.Equal(expected, actual);

// check it knows that we're dealing with a cluster
var server = conn.GetServer(conn.GetEndPoints()[0]);
Assert.Equal(ServerType.RedisClusterProxy, server.ServerType);
_ = server.Echo("abc");

var ex = Assert.Throws<NotSupportedException>(() => conn.GetSubscriber("abc"));
Assert.Equal("The pub/sub API is not available via RedisClusterProxy", ex.Message);

// test a script
const string LUA_SCRIPT = "return redis.call('info')";
var name = (string)db.ScriptEvaluate(LUA_SCRIPT);
Log($"client: {name}");
// run it twice to check we didn't rely on script hashing (SCRIPT is disabled)
_ = db.ScriptEvaluate(LUA_SCRIPT);
}
}
}
}