From 1b30e0a89180e02a16d5061f25e034136be308a8 Mon Sep 17 00:00:00 2001 From: Karthick Ramachandran Date: Sun, 6 Feb 2022 13:41:37 -0800 Subject: [PATCH 1/3] support for envoy proxy --- docs/Configuration.md | 11 ++- src/StackExchange.Redis/CommandMap.cs | 34 ++++++++- .../ConfigurationOptions.cs | 1 + .../ConnectionMultiplexer.cs | 73 +++++++++++++------ src/StackExchange.Redis/Enums/Proxy.cs | 6 +- src/StackExchange.Redis/Enums/ServerType.cs | 6 +- src/StackExchange.Redis/PhysicalConnection.cs | 2 +- src/StackExchange.Redis/RedisBase.cs | 2 +- src/StackExchange.Redis/ServerEndPoint.cs | 18 +++-- .../ServerSelectionStrategy.cs | 7 +- 10 files changed, 122 insertions(+), 38 deletions(-) diff --git a/docs/Configuration.md b/docs/Configuration.md index 7e35a96d0..8e8a7a0c3 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -86,7 +86,7 @@ The `ConfigurationOptions` object has a wide range of properties, all of which a | name={string} | `ClientName` | `null` | Identification for the connection within redis | | password={string} | `Password` | `null` | Password for the redis server | | user={string} | `User` | `null` | User for the redis server (for use with ACLs on redis 6 and above) | -| proxy={proxy type} | `Proxy` | `Proxy.None` | Type of proxy in use (if any); for example "twemproxy" | +| proxy={proxy type} | `Proxy` | `Proxy.None` | Type of proxy in use (if any); for example "twemproxy/envoyproxy" | | resolveDns={bool} | `ResolveDns` | `false` | Specifies that DNS resolution should be explicit and eager, rather than implicit | | serviceName={string} | `ServiceName` | `null` | Used for connecting to a sentinel master service | | ssl={bool} | `Ssl` | `false` | Specifies that SSL encryption should be used | @@ -178,6 +178,15 @@ var options = new ConfigurationOptions }; ``` +[EnvoyProxy](https://github.com/envoyproxy/envoy) is a tool that allows to front a redis cluster with a set of proxies, with inbuilt discovery and fault tolerance. 
The feature-set available to Envoyproxy is reduced. To avoid having to configure this manually, the `Proxy` option can be used: + +```csharp +var options = new ConfigurationOptions +{ + EndPoints = { "my-proxy1", "my-proxy2", "my-proxy3" }, + Proxy = Proxy.Envoyproxy +}; +``` + Tiebreakers and Configuration Change Announcements --- diff --git a/src/StackExchange.Redis/CommandMap.cs b/src/StackExchange.Redis/CommandMap.cs index ae0ea8eaf..0a7959bb1 100644 --- a/src/StackExchange.Redis/CommandMap.cs +++ b/src/StackExchange.Redis/CommandMap.cs @@ -53,6 +53,38 @@ internal CommandMap(CommandBytes[] map) RedisCommand.SAVE, RedisCommand.SHUTDOWN, RedisCommand.SLAVEOF, RedisCommand.SLOWLOG, RedisCommand.SYNC, RedisCommand.TIME }); + /// + /// The commands available to envoyproxy via + /// + public static CommandMap Envoyproxy { get; } = CreateImpl(null, exclusions: new HashSet + { + // not in supported_commands.h + RedisCommand.KEYS, RedisCommand.MIGRATE, RedisCommand.MOVE, RedisCommand.OBJECT, RedisCommand.RANDOMKEY, + RedisCommand.RENAME, RedisCommand.RENAMENX, RedisCommand.SORT, RedisCommand.SCAN, + + RedisCommand.BITOP, RedisCommand.MSET, RedisCommand.MSETNX, + + RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither!
+ + RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, + + RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH, + + RedisCommand.SCRIPT, + + RedisCommand.ECHO, RedisCommand.PING, RedisCommand.QUIT, RedisCommand.SELECT, + + RedisCommand.BGREWRITEAOF, RedisCommand.BGSAVE, RedisCommand.CLIENT, RedisCommand.CLUSTER, RedisCommand.CONFIG, RedisCommand.DBSIZE, + RedisCommand.DEBUG, RedisCommand.FLUSHALL, RedisCommand.FLUSHDB, RedisCommand.INFO, RedisCommand.LASTSAVE, RedisCommand.MONITOR, RedisCommand.REPLICAOF, + RedisCommand.SAVE, RedisCommand.SHUTDOWN, RedisCommand.SLAVEOF, RedisCommand.SLOWLOG, RedisCommand.SYNC, RedisCommand.TIME, + + // supported by envoy but not enabled by stack exchange + // RedisCommand.BITFIELD, + // + // RedisCommand.GEORADIUS_RO, + // RedisCommand.GEORADIUSBYMEMBER_RO, + }); + /// /// The commands available to https://ssdb.io/ /// @@ -173,7 +205,7 @@ internal void AssertAvailable(RedisCommand command) internal CommandBytes GetBytes(string command) { if (command == null) return default; - if(Enum.TryParse(command, true, out RedisCommand cmd)) + if (Enum.TryParse(command, true, out RedisCommand cmd)) { // we know that one! return map[(int)cmd]; } diff --git a/src/StackExchange.Redis/ConfigurationOptions.cs b/src/StackExchange.Redis/ConfigurationOptions.cs index 138096120..2ee26ac89 100644 --- a/src/StackExchange.Redis/ConfigurationOptions.cs +++ b/src/StackExchange.Redis/ConfigurationOptions.cs @@ -273,6 +273,7 @@ public CommandMap CommandMap get => commandMap ?? Proxy switch { Proxy.Twemproxy => CommandMap.Twemproxy, + Proxy.Envoyproxy => CommandMap.Envoyproxy, _ => CommandMap.Default, }; set => commandMap = value ?? 
throw new ArgumentNullException(nameof(value)); diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs index 13eebbee3..8392fdb20 100644 --- a/src/StackExchange.Redis/ConnectionMultiplexer.cs +++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs @@ -1416,7 +1416,14 @@ internal long LastHeartbeatSecondsAgo /// The async state object to pass to the created . public ISubscriber GetSubscriber(object asyncState = null) { - if (RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("The pub/sub API is not available via twemproxy"); + switch (RawConfig.Proxy) + { + case Proxy.Twemproxy: + case Proxy.Envoyproxy: + throw new NotSupportedException($"The pub/sub API is not available via {RawConfig.Proxy}"); + case Proxy.None: + break; + } return new RedisSubscriber(this, asyncState); } @@ -1432,9 +1439,10 @@ internal int ApplyDefaultDatabase(int db) throw new ArgumentOutOfRangeException(nameof(db)); } - if (db != 0 && RawConfig.Proxy == Proxy.Twemproxy) + if (db != 0 && + (RawConfig.Proxy == Proxy.Twemproxy || RawConfig.Proxy == Proxy.Envoyproxy)) { - throw new NotSupportedException("Twemproxy only supports database 0"); + throw new NotSupportedException($"{RawConfig.Proxy} only supports database 0"); } return db; @@ -1502,7 +1510,10 @@ public IDatabase GetDatabase(int db = -1, object asyncState = null) public IServer GetServer(EndPoint endpoint, object asyncState = null) { if (endpoint == null) throw new ArgumentNullException(nameof(endpoint)); - if (RawConfig.Proxy == Proxy.Twemproxy) throw new NotSupportedException("The server API is not available via twemproxy"); + if (RawConfig.Proxy == Proxy.Twemproxy || RawConfig.Proxy == Proxy.Envoyproxy) + { + throw new NotSupportedException($"The server API is not available via {RawConfig.Proxy}"); + } var server = (ServerEndPoint)servers[endpoint]; if (server == null) throw new ArgumentException("The specified endpoint is not defined", nameof(endpoint)); return 
new RedisServer(this, server, asyncState); @@ -1818,6 +1829,7 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP switch (server.ServerType) { case ServerType.Twemproxy: + case ServerType.Envoyproxy: case ServerType.Sentinel: case ServerType.Standalone: case ServerType.Cluster: @@ -1862,31 +1874,44 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP if (clusterCount == 0) { // set the serverSelectionStrategy - if (RawConfig.Proxy == Proxy.Twemproxy) - { - ServerSelectionStrategy.ServerType = ServerType.Twemproxy; - } - else if (standaloneCount == 0 && sentinelCount > 0) + switch (RawConfig.Proxy) { - ServerSelectionStrategy.ServerType = ServerType.Sentinel; - } - else if (standaloneCount > 0) - { - ServerSelectionStrategy.ServerType = ServerType.Standalone; + case Proxy.Twemproxy: + ServerSelectionStrategy.ServerType = ServerType.Twemproxy; + break; + case Proxy.Envoyproxy: + ServerSelectionStrategy.ServerType = ServerType.Envoyproxy; + break; + default: + if (standaloneCount == 0 && sentinelCount > 0) + { + ServerSelectionStrategy.ServerType = ServerType.Sentinel; + } + else if (standaloneCount > 0) + { + ServerSelectionStrategy.ServerType = ServerType.Standalone; + } + break; } - var preferred = NominatePreferredMaster(log, servers, useTieBreakers, masters); - foreach (var master in masters) + // if multiple masters are detected, nominate a preferred master; + // this does not apply to envoyproxy: it will have multiple "masters" and + // we want to keep the default round robin behavior + if (ServerSelectionStrategy.ServerType != ServerType.Envoyproxy) { - if (master == preferred || master.IsReplica) + var preferred = NominatePreferredMaster(log, servers, useTieBreakers, masters); + foreach (var master in masters) { - log?.WriteLine($"{Format.ToString(master)}: Clearing as RedundantMaster"); - master.ClearUnselectable(UnselectableFlags.RedundantMaster); - } - else - { - log?.WriteLine($"{Format.ToString(master)}: Setting as
RedundantMaster"); - master.SetUnselectable(UnselectableFlags.RedundantMaster); + if (master == preferred || master.IsReplica) + { + log?.WriteLine($"{Format.ToString(master)}: Clearing as RedundantMaster"); + master.ClearUnselectable(UnselectableFlags.RedundantMaster); + } + else + { + log?.WriteLine($"{Format.ToString(master)}: Setting as RedundantMaster"); + master.SetUnselectable(UnselectableFlags.RedundantMaster); + } } } } diff --git a/src/StackExchange.Redis/Enums/Proxy.cs b/src/StackExchange.Redis/Enums/Proxy.cs index ed87ba495..9cc086dcf 100644 --- a/src/StackExchange.Redis/Enums/Proxy.cs +++ b/src/StackExchange.Redis/Enums/Proxy.cs @@ -12,6 +12,10 @@ public enum Proxy /// /// Communication via twemproxy /// - Twemproxy + Twemproxy, + /// + /// Communication via envoyproxy + /// + Envoyproxy } } diff --git a/src/StackExchange.Redis/Enums/ServerType.cs b/src/StackExchange.Redis/Enums/ServerType.cs index 80072f34a..4f344807f 100644 --- a/src/StackExchange.Redis/Enums/ServerType.cs +++ b/src/StackExchange.Redis/Enums/ServerType.cs @@ -20,6 +20,10 @@ public enum ServerType /// /// Distributed redis installation via twemproxy /// - Twemproxy + Twemproxy, + /// + /// Redis cluster via envoyproxy + /// + Envoyproxy } } diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs index 2924368a4..085b19987 100644 --- a/src/StackExchange.Redis/PhysicalConnection.cs +++ b/src/StackExchange.Redis/PhysicalConnection.cs @@ -573,7 +573,7 @@ internal Message GetSelectDatabaseCommand(int targetDatabase, Message message) } int available = serverEndpoint.Databases; - if (!serverEndpoint.HasDatabases) // only db0 is available on cluster/twemproxy + if (!serverEndpoint.HasDatabases) // only db0 is available on cluster/twemproxy/envoyproxy { if (targetDatabase != 0) { // should never see this, since the API doesn't allow it; thus not too worried about ExceptionFactory diff --git a/src/StackExchange.Redis/RedisBase.cs 
b/src/StackExchange.Redis/RedisBase.cs index 9d42d05a3..cff4d7f9d 100644 --- a/src/StackExchange.Redis/RedisBase.cs +++ b/src/StackExchange.Redis/RedisBase.cs @@ -109,7 +109,7 @@ private ResultProcessor.TimingProcessor.TimerMessage GetTimerMessage(CommandFlag if (map.IsAvailable(RedisCommand.ECHO)) return ResultProcessor.TimingProcessor.CreateMessage(-1, flags, RedisCommand.ECHO, RedisLiterals.PING); // as our fallback, we'll do something odd... we'll treat a key like a value, out of sheer desperation - // note: this usually means: twemproxy - in which case we're fine anyway, since the proxy does the routing + // note: this usually means: twemproxy/envoyproxy - in which case we're fine anyway, since the proxy does the routing return ResultProcessor.TimingProcessor.CreateMessage(0, flags, RedisCommand.EXISTS, (RedisValue)multiplexer.UniqueId); } diff --git a/src/StackExchange.Redis/ServerEndPoint.cs b/src/StackExchange.Redis/ServerEndPoint.cs index 18f68f36b..c3e7bb714 100755 --- a/src/StackExchange.Redis/ServerEndPoint.cs +++ b/src/StackExchange.Redis/ServerEndPoint.cs @@ -56,11 +56,19 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint) writeEverySeconds = config.KeepAlive > 0 ? 
config.KeepAlive : 60; serverType = ServerType.Standalone; ConfigCheckSeconds = Multiplexer.RawConfig.ConfigCheckSeconds; - // overrides for twemproxy - if (multiplexer.RawConfig.Proxy == Proxy.Twemproxy) + // overrides for twemproxy etc + switch (multiplexer.RawConfig.Proxy) { - databases = 1; - serverType = ServerType.Twemproxy; + case Proxy.Twemproxy: + databases = 1; + serverType = ServerType.Twemproxy; + break; + case Proxy.Envoyproxy: + databases = 1; + serverType = ServerType.Envoyproxy; + break; + case Proxy.None: + break; } } @@ -324,7 +332,7 @@ internal void AddScript(string script, byte[] hash) internal async Task AutoConfigureAsync(PhysicalConnection connection, LogProxy log = null) { - if (serverType == ServerType.Twemproxy) + if (serverType == ServerType.Twemproxy || serverType == ServerType.Envoyproxy) { // don't try to detect configuration; all the config commands are disabled, and // the fallback master/replica detection won't help diff --git a/src/StackExchange.Redis/ServerSelectionStrategy.cs b/src/StackExchange.Redis/ServerSelectionStrategy.cs index 7578936f2..0d5277e03 100644 --- a/src/StackExchange.Redis/ServerSelectionStrategy.cs +++ b/src/StackExchange.Redis/ServerSelectionStrategy.cs @@ -107,9 +107,10 @@ public ServerEndPoint Select(Message message) switch (ServerType) { case ServerType.Cluster: - case ServerType.Twemproxy: // strictly speaking twemproxy uses a different hashing algo, but the hash-tag behavior is - // the same, so this does a pretty good job of spotting illegal commands before sending them - + // strictly speaking envoyproxy/twemproxy uses a different hashing algo, but the hash-tag behavior is + // the same, so this does a pretty good job of spotting illegal commands before sending them + case ServerType.Envoyproxy: + case ServerType.Twemproxy: slot = message.GetHashSlot(this); if (slot == MultipleSlots) throw ExceptionFactory.MultiSlot(multiplexer.IncludeDetailInExceptions, message); break; From
03121601bca7e3871459dc14e171ec25afc772c2 Mon Sep 17 00:00:00 2001 From: Karthick Ramachandran Date: Sun, 6 Feb 2022 13:46:04 -0800 Subject: [PATCH 2/3] added test for envoy proxy --- .../EnvoyproxyTests.cs | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 tests/StackExchange.Redis.Tests/EnvoyproxyTests.cs diff --git a/tests/StackExchange.Redis.Tests/EnvoyproxyTests.cs b/tests/StackExchange.Redis.Tests/EnvoyproxyTests.cs new file mode 100644 index 000000000..56d36b681 --- /dev/null +++ b/tests/StackExchange.Redis.Tests/EnvoyproxyTests.cs @@ -0,0 +1,36 @@ +using System; +using Xunit; +using Xunit.Abstractions; + +namespace StackExchange.Redis.Tests +{ + public class EnvoyproxyTests : TestBase + { + public EnvoyproxyTests(ITestOutputHelper output) : base(output) { } + + protected override string GetConfiguration() => "127.0.0.1,proxy=EnvoyProxy"; + + [Fact(Skip = "No CI build for this yet")] + public void CanConnectToEnvoyProxy() + { + using (var conn = Create()) + { + var expected = Guid.NewGuid().ToString(); + var db = conn.GetDatabase(); + RedisKey key = Me(); + db.StringSet(key, expected); + var actual = (string)db.StringGet(key); + Assert.Equal(expected, actual); + + // the server API is not available behind envoy proxy (same restriction as twemproxy) + var serverEx = Assert.Throws<NotSupportedException>( + () => conn.GetServer(conn.GetEndPoints()[0])); + Assert.Equal("The server API is not available via Envoyproxy", serverEx.Message); + + var ex = Assert.Throws<NotSupportedException>(() => conn.GetSubscriber("abc")); + Assert.Equal("The pub/sub API is not available via Envoyproxy", ex.Message); + } + } + + } +} From 9e783f815c59c2d46b287093e013b848cfdb83de Mon Sep 17 00:00:00 2001 From: Karthick Ramachandran Date: Mon, 7 Feb 2022 20:06:41 -0800 Subject: [PATCH 3/3] Revert "Merge branch 'StackExchange:main' into envoy-support" This reverts commit 44d795e15ccb36353e3b97cff79c97607f8ce88a, reversing changes made to 03121601bca7e3871459dc14e171ec25afc772c2.
--- .github/workflows/CI.yml | 28 +- appveyor.yml | 2 + src/StackExchange.Redis/BacklogPolicy.cs | 43 -- .../ConfigurationOptions.cs | 12 - .../ConnectionMultiplexer.cs | 14 +- src/StackExchange.Redis/Enums/CommandFlags.cs | 2 +- src/StackExchange.Redis/Message.cs | 37 +- src/StackExchange.Redis/PhysicalBridge.cs | 360 ++++++---------- src/StackExchange.Redis/PhysicalConnection.cs | 8 +- src/StackExchange.Redis/RedisServer.cs | 25 +- src/StackExchange.Redis/ServerEndPoint.cs | 14 +- .../ServerSelectionStrategy.cs | 30 +- tests/StackExchange.Redis.Tests/AsyncTests.cs | 2 +- .../StackExchange.Redis.Tests/BacklogTests.cs | 402 ------------------ .../ConnectFailTimeout.cs | 2 +- .../ConnectingFailDetection.cs | 3 +- .../ConnectionFailedErrors.cs | 1 - .../ExceptionFactoryTests.cs | 3 +- tests/StackExchange.Redis.Tests/PubSub.cs | 7 +- .../PubSubMultiserver.cs | 2 +- tests/StackExchange.Redis.Tests/Secure.cs | 1 - .../SharedConnectionFixture.cs | 16 +- .../StackExchange.Redis.Tests.csproj | 2 +- tests/StackExchange.Redis.Tests/TestBase.cs | 7 +- 24 files changed, 211 insertions(+), 812 deletions(-) delete mode 100644 src/StackExchange.Redis/BacklogPolicy.cs delete mode 100644 tests/StackExchange.Redis.Tests/BacklogTests.cs diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 44bd09eb6..6d2c0e8d6 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -1,4 +1,4 @@ -name: CI +name: CI Builds on: pull_request: @@ -16,10 +16,11 @@ jobs: steps: - name: Checkout code uses: actions/checkout@v1 - - name: Install .NET SDK + - name: Setup .NET Core uses: actions/setup-dotnet@v1 with: dotnet-version: | + 3.1.x 6.0.x - name: .NET Build run: dotnet build Build.csproj -c Release /p:CI=true @@ -32,25 +33,24 @@ jobs: continue-on-error: true if: success() || failure() with: - name: Test Results - Ubuntu + name: StackExchange.Redis.Tests (Ubuntu) - Results path: 'test-results/*.trx' reporter: dotnet-trx - name: .NET Lib Pack run: dotnet pack 
src/StackExchange.Redis/StackExchange.Redis.csproj --no-build -c Release /p:Packing=true /p:PackageOutputPath=%CD%\.nupkgs /p:CI=true windows: - name: StackExchange.Redis (Windows Server 2022) - runs-on: windows-2022 - env: - NUGET_CERT_REVOCATION_MODE: offline # Disabling signing because of massive perf hit, see https://github.com/NuGet/Home/issues/11548 + name: StackExchange.Redis (Windows Server 2019) + runs-on: windows-2019 steps: - name: Checkout code uses: actions/checkout@v1 - # - name: Install .NET SDK - # uses: actions/setup-dotnet@v1 - # with: - # dotnet-version: | - # 6.0.x + - name: Setup .NET Core 3.x + uses: actions/setup-dotnet@v1 + with: + dotnet-version: | + 3.1.x + 6.0.x - name: .NET Build run: dotnet build Build.csproj -c Release /p:CI=true - name: Start Redis Services (v3.0.503) @@ -79,6 +79,6 @@ jobs: continue-on-error: true if: success() || failure() with: - name: Tests Results - Windows Server 2022 + name: StackExchange.Redis.Tests (Windows Server 2019) - Results path: 'test-results/*.trx' - reporter: dotnet-trx + reporter: dotnet-trx diff --git a/appveyor.yml b/appveyor.yml index a2107f48c..7387352eb 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -6,6 +6,8 @@ init: install: - cmd: >- + choco install dotnet-sdk --version 5.0.404 + choco install dotnet-sdk --version 6.0.101 cd tests\RedisConfigs\3.0.503 diff --git a/src/StackExchange.Redis/BacklogPolicy.cs b/src/StackExchange.Redis/BacklogPolicy.cs deleted file mode 100644 index 4fb9e67c7..000000000 --- a/src/StackExchange.Redis/BacklogPolicy.cs +++ /dev/null @@ -1,43 +0,0 @@ -namespace StackExchange.Redis -{ - /// - /// The backlog policy to use for commands. This policy comes into effect when a connection is unhealthy or unavailable. - /// The policy can choose to backlog commands and wait to try them (within their timeout) against a connection when it comes up, - /// or it could choose to fail fast and throw ASAP. 
Different apps desire different behaviors with backpressure and how to handle - /// large amounts of load, so this is configurable to optimize the happy path but avoid spiral-of-death queue scenarios for others. - /// - public sealed class BacklogPolicy - { - /// - /// Backlog behavior matching StackExchange.Redis's 2.x line, failing fast and not attempting to queue - /// and retry when a connection is available again. - /// - public static BacklogPolicy FailFast { get; } = new() - { - QueueWhileDisconnected = false, - AbortPendingOnConnectionFailure = true, - }; - - /// - /// Default backlog policy which will allow commands to be issues against an endpoint and queue up. - /// Commands are still subject to their async timeout (which serves as a queue size check). - /// - public static BacklogPolicy Default { get; } = new() - { - QueueWhileDisconnected = true, - AbortPendingOnConnectionFailure = false, - }; - - /// - /// Whether to queue commands while disconnected. - /// True means queue for attempts up until their timeout. - /// False means to fail ASAP and queue nothing. - /// - public bool QueueWhileDisconnected { get; init; } - - /// - /// Whether to immediately abandon (with an exception) all pending commands when a connection goes unhealthy. - /// - public bool AbortPendingOnConnectionFailure { get; init; } - } -} diff --git a/src/StackExchange.Redis/ConfigurationOptions.cs b/src/StackExchange.Redis/ConfigurationOptions.cs index 1a60e5cd4..2ee26ac89 100644 --- a/src/StackExchange.Redis/ConfigurationOptions.cs +++ b/src/StackExchange.Redis/ConfigurationOptions.cs @@ -146,8 +146,6 @@ public static string TryNormalize(string value) private IReconnectRetryPolicy reconnectRetryPolicy; - private BacklogPolicy backlogPolicy; - /// /// A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication; note /// that this cannot be specified in the configuration-string. 
@@ -375,15 +373,6 @@ public IReconnectRetryPolicy ReconnectRetryPolicy set => reconnectRetryPolicy = value; } - /// - /// The backlog policy to be used for commands when a connection is unhealthy. - /// - public BacklogPolicy BacklogPolicy - { - get => backlogPolicy ?? BacklogPolicy.Default; - set => backlogPolicy = value; - } - /// /// Indicates whether endpoints should be resolved via DNS before connecting. /// If enabled the ConnectionMultiplexer will not re-resolve DNS @@ -553,7 +542,6 @@ public ConfigurationOptions Clone() responseTimeout = responseTimeout, DefaultDatabase = DefaultDatabase, ReconnectRetryPolicy = reconnectRetryPolicy, - BacklogPolicy = backlogPolicy, SslProtocols = SslProtocols, checkCertificateRevocation = checkCertificateRevocation, }; diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs index a3bd27a7a..8392fdb20 100644 --- a/src/StackExchange.Redis/ConnectionMultiplexer.cs +++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs @@ -804,7 +804,7 @@ internal void OnHashSlotMoved(int hashSlot, EndPoint old, EndPoint @new) /// The key to get a hash slot ID for. 
public int HashSlot(RedisKey key) => ServerSelectionStrategy.HashSlot(key); - internal ServerEndPoint AnyServer(ServerType serverType, uint startOffset, RedisCommand command, CommandFlags flags, bool allowDisconnected) + internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, RedisCommand command, CommandFlags flags) { var tmp = GetServerSnapshot(); int len = tmp.Length; @@ -812,7 +812,7 @@ internal ServerEndPoint AnyServer(ServerType serverType, uint startOffset, Redis for (int i = 0; i < len; i++) { var server = tmp[(int)(((uint)i + startOffset) % len)]; - if (server != null && server.ServerType == serverType && server.IsSelectable(command, allowDisconnected)) + if (server != null && server.ServerType == serverType && server.IsSelectable(command)) { if (server.IsReplica) { @@ -2194,12 +2194,6 @@ private bool PrepareToPushMessageToBridge(Message message, ResultProcessor { // Infer a server automatically server = SelectServer(message); - - // If we didn't find one successfully, and we're allowed, queue for any viable server - if (server == null && message != null && RawConfig.BacklogPolicy.QueueWhileDisconnected) - { - server = ServerSelectionStrategy.Select(message, allowDisconnected: true); - } } else // a server was specified; do we trust their choice, though? { @@ -2217,9 +2211,7 @@ private bool PrepareToPushMessageToBridge(Message message, ResultProcessor } break; } - - // If we're not allowed to queue while disconnected, we'll bomb out below. - if (!server.IsConnected && !RawConfig.BacklogPolicy.QueueWhileDisconnected) + if (!server.IsConnected) { // well, that's no use! 
server = null; diff --git a/src/StackExchange.Redis/Enums/CommandFlags.cs b/src/StackExchange.Redis/Enums/CommandFlags.cs index 119fe22bb..c1efc65c1 100644 --- a/src/StackExchange.Redis/Enums/CommandFlags.cs +++ b/src/StackExchange.Redis/Enums/CommandFlags.cs @@ -81,7 +81,7 @@ public enum CommandFlags /// NoScriptCache = 512, - // 1024: Removed - was used for async timeout checks; never user-specified, so not visible on the public API + // 1024: used for timed-out; never user-specified, so not visible on the public API // 2048: Use subscription connection type; never user-specified, so not visible on the public API } diff --git a/src/StackExchange.Redis/Message.cs b/src/StackExchange.Redis/Message.cs index 84bbcd6d0..708e4600f 100644 --- a/src/StackExchange.Redis/Message.cs +++ b/src/StackExchange.Redis/Message.cs @@ -1,10 +1,12 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.IO; using System.Linq; using System.Runtime.CompilerServices; using System.Text; using System.Threading; +using System.Threading.Tasks; using StackExchange.Redis.Profiling; using static StackExchange.Redis.ConnectionMultiplexer; @@ -58,6 +60,7 @@ internal abstract class Message : ICompletable private const CommandFlags AskingFlag = (CommandFlags)32, ScriptUnavailableFlag = (CommandFlags)256, + NeedsAsyncTimeoutCheckFlag = (CommandFlags)1024, DemandSubscriptionConnection = (CommandFlags)2048; private const CommandFlags MaskMasterServerPreference = CommandFlags.DemandMaster @@ -642,7 +645,10 @@ internal void SetRequestSent() [MethodImpl(MethodImplOptions.AggressiveInlining)] internal void SetWriteTime() { - _writeTickCount = Environment.TickCount; // note this might be reset if we resend a message, cluster-moved etc; I'm OK with that + if ((Flags & NeedsAsyncTimeoutCheckFlag) != 0) + { + _writeTickCount = Environment.TickCount; // note this might be reset if we resend a message, cluster-moved etc; I'm OK with that + } } private int _writeTickCount; 
public int GetWriteTime() => Volatile.Read(ref _writeTickCount); @@ -656,17 +662,21 @@ internal void SetWriteTime() /// internal void SetForSubscriptionBridge() => Flags |= DemandSubscriptionConnection; - /// - /// Checks if this message has violated the provided timeout. - /// Whether it's a sync operation in a .Wait() or in the backlog queue or written/pending asynchronously, we need to timeout everything. - /// ...or we get indefinite Task hangs for completions. - /// - internal bool HasTimedOut(int now, int timeoutMilliseconds, out int millisecondsTaken) + private void SetNeedsTimeoutCheck() => Flags |= NeedsAsyncTimeoutCheckFlag; + internal bool HasAsyncTimedOut(int now, int timeoutMilliseconds, out int millisecondsTaken) { - millisecondsTaken = unchecked(now - _writeTickCount); // note: we can't just check "if sent < cutoff" because of wrap-around - if (millisecondsTaken >= timeoutMilliseconds) + if ((Flags & NeedsAsyncTimeoutCheckFlag) != 0) { - return true; + millisecondsTaken = unchecked(now - _writeTickCount); // note: we can't just check "if sent < cutoff" because of wrap-aro + if (millisecondsTaken >= timeoutMilliseconds) + { + Flags &= ~NeedsAsyncTimeoutCheckFlag; // note: we don't remove it from the queue - still might need to marry it up; but: it is toast + return true; + } + } + else + { + millisecondsTaken = default; } return false; } @@ -685,17 +695,16 @@ internal void SetPreferMaster() => internal void SetPreferReplica() => Flags = (Flags & ~MaskMasterServerPreference) | CommandFlags.PreferReplica; - /// - /// Note order here reversed to prevent overload resolution errors - /// internal void SetSource(ResultProcessor resultProcessor, IResultBox resultBox) - { + { // note order here reversed to prevent overload resolution errors + if (resultBox != null && resultBox.IsAsync) SetNeedsTimeoutCheck(); this.resultBox = resultBox; this.resultProcessor = resultProcessor; } internal void SetSource(IResultBox resultBox, ResultProcessor resultProcessor) { + 
if (resultBox != null && resultBox.IsAsync) SetNeedsTimeoutCheck(); this.resultBox = resultBox; this.resultProcessor = resultProcessor; } diff --git a/src/StackExchange.Redis/PhysicalBridge.cs b/src/StackExchange.Redis/PhysicalBridge.cs index 025702fa7..1a88cc8ea 100644 --- a/src/StackExchange.Redis/PhysicalBridge.cs +++ b/src/StackExchange.Redis/PhysicalBridge.cs @@ -13,6 +13,7 @@ using static Pipelines.Sockets.Unofficial.Threading.MutexSlim; #endif + namespace StackExchange.Redis { internal sealed class PhysicalBridge : IDisposable @@ -27,20 +28,8 @@ internal sealed class PhysicalBridge : IDisposable private readonly long[] profileLog = new long[ProfileLogSamples]; - /// - /// We have 1 queue in play on this bridge. - /// We're bypassing the queue for handshake events that go straight to the socket. - /// Everything else that's not an internal call goes into the queue if there is a queue. - /// - /// In a later release we want to remove per-server events from this queue completely and shunt queued messages - /// to another capable primary connection if one is available to process them faster (order is already hosed). - /// For now, simplicity in: queue it all, replay or timeout it all. 
- /// - private readonly ConcurrentQueue _backlog = new(); - private bool BacklogHasItems => !_backlog.IsEmpty; + private readonly ConcurrentQueue _backlog = new ConcurrentQueue(); private int _backlogProcessorIsRunning = 0; - private int _backlogCurrentEnqueued = 0; - private long _backlogTotalEnqueued = 0; private int activeWriters = 0; private int beating; @@ -143,25 +132,22 @@ public void ReportNextFailure() private WriteResult QueueOrFailMessage(Message message) { - // If it's an internal call that's not a QUIT - // or we're allowed to queue in general, then queue - if (message.IsInternalCall || Multiplexer.RawConfig.BacklogPolicy.QueueWhileDisconnected) + if (message.IsInternalCall && message.Command != RedisCommand.QUIT) { - // Let's just never ever queue a QUIT message - if (message.Command != RedisCommand.QUIT) - { - message.SetEnqueued(null); - BacklogEnqueue(message, null); - // Note: we don't start a worker on each message here - return WriteResult.Success; // Successfully queued, so indicate success - } + // you can go in the queue, but we won't be starting + // a worker, because the handshake has not completed + message.SetEnqueued(null); + _backlog.Enqueue(message); + return WriteResult.Success; // we'll take it... 
+ } + else + { + // sorry, we're just not ready for you yet; + message.Cancel(); + Multiplexer?.OnMessageFaulted(message, null); + message.Complete(); + return WriteResult.NoConnectionAvailable; } - - // Anything else goes in the bin - we're just not ready for you yet - message.Cancel(); - Multiplexer?.OnMessageFaulted(message, null); - message.Complete(); - return WriteResult.NoConnectionAvailable; } private WriteResult FailDueToNoConnection(Message message) @@ -179,45 +165,21 @@ public WriteResult TryWriteSync(Message message, bool isReplica) if (!IsConnected) return QueueOrFailMessage(message); var physical = this.physical; - if (physical == null) - { - // If we're not connected yet and supposed to, queue it up - if (Multiplexer.RawConfig.BacklogPolicy.QueueWhileDisconnected) - { - if (TryPushToBacklog(message, onlyIfExists: false)) - { - message.SetEnqueued(null); - return WriteResult.Success; - } - } - return FailDueToNoConnection(message); - } + if (physical == null) return FailDueToNoConnection(message); var result = WriteMessageTakingWriteLockSync(physical, message); LogNonPreferred(message.Flags, isReplica); return result; } - public ValueTask TryWriteAsync(Message message, bool isReplica, bool bypassBacklog = false) + public ValueTask TryWriteAsync(Message message, bool isReplica) { if (isDisposed) throw new ObjectDisposedException(Name); - if (!IsConnected && !bypassBacklog) return new ValueTask(QueueOrFailMessage(message)); + if (!IsConnected) return new ValueTask(QueueOrFailMessage(message)); var physical = this.physical; - if (physical == null) - { - // If we're not connected yet and supposed to, queue it up - if (!bypassBacklog && Multiplexer.RawConfig.BacklogPolicy.QueueWhileDisconnected) - { - if (TryPushToBacklog(message, onlyIfExists: false)) - { - message.SetEnqueued(null); - return new ValueTask(WriteResult.Success); - } - } - return new ValueTask(FailDueToNoConnection(message)); - } + if (physical == null) return new 
ValueTask(FailDueToNoConnection(message)); - var result = WriteMessageTakingWriteLockAsync(physical, message, bypassBacklog: bypassBacklog); + var result = WriteMessageTakingWriteLockAsync(physical, message); LogNonPreferred(message.Flags, isReplica); return result; } @@ -268,22 +230,13 @@ internal readonly struct BridgeStatus public bool IsWriterActive { get; init; } /// - /// Status of the currently processing backlog, if any. - /// - public BacklogStatus BacklogStatus { get; init; } - - /// - /// The number of messages that are in the backlog queue (waiting to be sent when the connection is healthy again). + /// Total number of backlog messages that are in the retry backlog. /// public int BacklogMessagesPending { get; init; } /// - /// The number of messages that are in the backlog queue (waiting to be sent when the connection is healthy again). - /// - public int BacklogMessagesPendingCounter { get; init; } - /// - /// The number of messages ever added to the backlog queue in the life of this connection. + /// Status of the currently processing backlog, if any. /// - public long TotalBacklogMessagesQueued { get; init; } + public BacklogStatus BacklogStatus { get; init; } /// /// Status for the underlying . @@ -294,9 +247,6 @@ internal readonly struct BridgeStatus /// The default bridge stats, notable *not* the same as default since initializers don't run. /// public static BridgeStatus Zero { get; } = new() { Connection = PhysicalConnection.ConnectionStatus.Zero }; - - public override string ToString() => - $"MessagesSinceLastHeartbeat: {MessagesSinceLastHeartbeat}, Writer: {(IsWriterActive ? 
"Active" : "Inactive")}, BacklogStatus: {BacklogStatus}, BacklogMessagesPending: (Queue: {BacklogMessagesPending}, Counter: {BacklogMessagesPendingCounter}), TotalBacklogMessagesQueued: {TotalBacklogMessagesQueued}, Connection: ({Connection})"; } internal BridgeStatus GetStatus() => new() @@ -308,9 +258,7 @@ public override string ToString() => IsWriterActive = !_singleWriterMutex.IsAvailable, #endif BacklogMessagesPending = _backlog.Count, - BacklogMessagesPendingCounter = Volatile.Read(ref _backlogCurrentEnqueued), BacklogStatus = _backlogStatus, - TotalBacklogMessagesQueued = _backlogTotalEnqueued, Connection = physical?.GetStatus() ?? PhysicalConnection.ConnectionStatus.Default, }; @@ -409,12 +357,7 @@ internal void ResetNonConnected() internal void OnConnectionFailed(PhysicalConnection connection, ConnectionFailureType failureType, Exception innerException) { Trace($"OnConnectionFailed: {connection}"); - // If we're configured to, fail all pending backlogged messages - if (Multiplexer.RawConfig.BacklogPolicy?.AbortPendingOnConnectionFailure == true) - { - AbandonPendingBacklog(innerException); - } - + AbandonPendingBacklog(innerException); if (reportNextFailure) { LastException = innerException; @@ -462,7 +405,7 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti private void AbandonPendingBacklog(Exception ex) { - while (BacklogTryDequeue(out Message next)) + while (_backlog.TryDequeue(out Message next)) { Multiplexer?.OnMessageFaulted(next, ex); next.SetExceptionAndComplete(ex, this); @@ -481,10 +424,8 @@ internal void OnFullyEstablished(PhysicalConnection connection, string source) ServerEndPoint.OnFullyEstablished(connection, source); // do we have pending system things to do? 
- if (BacklogHasItems) - { - StartBacklogProcessor(); - } + bool createWorker = !_backlog.IsEmpty; + if (createWorker) StartBacklogProcessor(); if (ConnectionType == ConnectionType.Interactive) ServerEndPoint.CheckInfoReplication(); } @@ -502,14 +443,7 @@ internal void OnHeartbeat(bool ifConnectedOnly) bool runThisTime = false; try { - if (BacklogHasItems) - { - // If we have a backlog, kickoff the processing - // This will first timeout any messages that have sat too long and either: - // A: Abort if we're still not connected yet (we should be in this path) - // or B: Process the backlog and send those messages through the pipe - StartBacklogProcessor(); - } + CheckBacklogForTimeouts(); runThisTime = !isDisposed && Interlocked.CompareExchange(ref beating, 1, 0) == 0; if (!runThisTime) return; @@ -604,11 +538,16 @@ internal void OnHeartbeat(bool ifConnectedOnly) } } - internal void RemovePhysical(PhysicalConnection connection) => + internal void RemovePhysical(PhysicalConnection connection) + { Interlocked.CompareExchange(ref physical, null, connection); + } [Conditional("VERBOSE")] - internal void Trace(string message) => Multiplexer.Trace(message, ToString()); + internal void Trace(string message) + { + Multiplexer.Trace(message, ToString()); + } [Conditional("VERBOSE")] internal void Trace(bool condition, string message) @@ -696,8 +635,8 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical // AVOID REORDERING MESSAGES // Prefer to add it to the backlog if this thread can see that there might already be a message backlog. 
- // We do this before attempting to take the write lock, because we won't actually write, we'll just let the backlog get processed in due course - if (TryPushToBacklog(message, onlyIfExists: true)) + // We do this before attempting to take the writelock, because we won't actually write, we'll just let the backlog get processed in due course + if (PushToBacklog(message, onlyIfExists: true)) { return WriteResult.Success; // queued counts as success } @@ -717,11 +656,9 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical if (!token.Success) #endif { - // If we can't get it *instantaneously*, pass it to the backlog for throughput - if (TryPushToBacklog(message, onlyIfExists: false)) - { - return WriteResult.Success; // queued counts as success - } + // we can't get it *instantaneously*; is there + // perhaps a backlog and active backlog processor? + if (PushToBacklog(message, onlyIfExists: true)) return WriteResult.Success; // queued counts as success // no backlog... try to wait with the timeout; // if we *still* can't get it: that counts as @@ -761,29 +698,17 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool TryPushToBacklog(Message message, bool onlyIfExists, bool bypassBacklog = false) + private bool PushToBacklog(Message message, bool onlyIfExists) { - // In the handshake case: send the command directly through. - // If we're disconnected *in the middle of a handshake*, we've bombed a brand new socket and failing, - // backing off, and retrying next heartbeat is best anyway. - // - // Internal calls also shouldn't queue - try immediately. If these aren't errors (most aren't), we - // won't alert the user. 
- if (bypassBacklog || message.IsInternalCall) - { - return false; - } - - // Note, for deciding emptiness for whether to push onlyIfExists, and start worker, + // Note, for deciding emptyness for whether to push onlyIfExists, and start worker, // we only need care if WE are able to // see the queue when its empty. Not whether anyone else sees it as empty. // So strong synchronization is not required. - if (onlyIfExists && Volatile.Read(ref _backlogCurrentEnqueued) == 0) - { - return false; - } + if (_backlog.IsEmpty & onlyIfExists) return false; - BacklogEnqueue(message, physical); + + int count = _backlog.Count; + _backlog.Enqueue(message); // The correct way to decide to start backlog process is not based on previously empty // but based on a) not empty now (we enqueued!) and b) no backlog processor already running. @@ -792,24 +717,6 @@ private bool TryPushToBacklog(Message message, bool onlyIfExists, bool bypassBac return true; } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private void BacklogEnqueue(Message message, PhysicalConnection physical) - { - _backlog.Enqueue(message); - Interlocked.Increment(ref _backlogTotalEnqueued); - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool BacklogTryDequeue(out Message message) - { - if (_backlog.TryDequeue(out message)) - { - Interlocked.Decrement(ref _backlogCurrentEnqueued); - return true; - } - return false; - } - [MethodImpl(MethodImplOptions.AggressiveInlining)] private void StartBacklogProcessor() { @@ -851,17 +758,17 @@ private void CheckBacklogForTimeouts() // But we reduce contention by only locking if we see something that looks timed out. while (_backlog.TryPeek(out Message message)) { - // See if the message has pass our async timeout threshold - // or has otherwise been completed (e.g. 
a sync wait timed out) which would have cleared the ResultBox - if (!message.HasTimedOut(now, timeout, out var _) || message.ResultBox == null) break; // not a timeout - we can stop looking + if (message.IsInternalCall) break; // don't stomp these (not that they should have the async timeout flag, but...) + if (!message.HasAsyncTimedOut(now, timeout, out var _)) break; // not a timeout - we can stop looking lock (_backlog) { - // Peek again since we didn't have lock before... + // peek again since we didn't have lock before... // and rerun the exact same checks as above, note that it may be a different message now if (!_backlog.TryPeek(out message)) break; - if (!message.HasTimedOut(now, timeout, out var _) && message.ResultBox != null) break; + if (message.IsInternalCall) break; + if (!message.HasAsyncTimedOut(now, timeout, out var _)) break; - if (!BacklogTryDequeue(out var message2) || (message != message2)) // consume it for real + if (!_backlog.TryDequeue(out var message2) || (message != message2)) // consume it for real { throw new RedisException("Thread safety bug detected! A queue message disappeared while we had the backlog lock"); } @@ -882,7 +789,6 @@ internal enum BacklogStatus : byte Started, CheckingForWork, CheckingForTimeout, - CheckingForTimeoutComplete, RecordingTimeout, WritingMessage, Flushing, @@ -894,53 +800,8 @@ internal enum BacklogStatus : byte } private volatile BacklogStatus _backlogStatus; - /// - /// Process the backlog(s) in play if any. - /// This means flushing commands to an available/active connection (if any) or spinning until timeout if not. - /// private async Task ProcessBacklogAsync() { - _backlogStatus = BacklogStatus.Starting; - try - { - if (!_backlog.IsEmpty) - { - // TODO: vNext handoff this backlog to another primary ("can handle everything") connection - // and remove any per-server commands. 
This means we need to track a bit of whether something - // was server-endpoint-specific in PrepareToPushMessageToBridge (was the server ref null or not) - await ProcessBridgeBacklogAsync(); // Needs handoff - } - } - catch - { - _backlogStatus = BacklogStatus.Faulted; - } - finally - { - // Do this in finally block, so that thread aborts can't convince us the backlog processor is running forever - if (Interlocked.CompareExchange(ref _backlogProcessorIsRunning, 0, 1) != 1) - { - throw new RedisException("Bug detection, couldn't indicate shutdown of backlog processor"); - } - - // Now that nobody is processing the backlog, we should consider starting a new backlog processor - // in case a new message came in after we ended this loop. - if (BacklogHasItems) - { - // Check for faults mainly to prevent unlimited tasks spawning in a fault scenario - // This won't cause a StackOverflowException due to the Task.Run() handoff - if (_backlogStatus != BacklogStatus.Faulted) - { - StartBacklogProcessor(); - } - } - } - } - - private async Task ProcessBridgeBacklogAsync() - { - // Importantly: don't assume we have a physical connection here - // We are very likely to hit a state where it's not re-established or even referenced here #if NETCOREAPP bool gotLock = false; #else @@ -949,14 +810,7 @@ private async Task ProcessBridgeBacklogAsync() try { _backlogStatus = BacklogStatus.Starting; - - // First eliminate any messages that have timed out already. 
- _backlogStatus = BacklogStatus.CheckingForTimeout; - CheckBacklogForTimeouts(); - _backlogStatus = BacklogStatus.CheckingForTimeoutComplete; - - // For the rest of the backlog, if we're not connected there's no point - abort out - while (IsConnected) + while (true) { // check whether the backlog is empty *before* even trying to get the lock if (_backlog.IsEmpty) return; // nothing to do @@ -972,38 +826,46 @@ private async Task ProcessBridgeBacklogAsync() } _backlogStatus = BacklogStatus.Started; - // Only execute if we're connected. - // Timeouts are handled above, so we're exclusively into backlog items eligible to write at this point. - // If we can't write them, abort and wait for the next heartbeat or activation to try this again. - while (IsConnected && physical?.HasOutputPipe == true) + // so now we are the writer; write some things! + Message message; + var timeout = TimeoutMilliseconds; + while(true) { - Message message; _backlogStatus = BacklogStatus.CheckingForWork; - + // We need to lock _backlog when dequeueing because of + // races with timeout processing logic lock (_backlog) { - // Note that we're actively taking it off the queue here, not peeking - // If there's nothing left in queue, we're done. 
- if (!BacklogTryDequeue(out message)) break; + if (!_backlog.TryDequeue(out message)) break; // all done } try { - _backlogStatus = BacklogStatus.WritingMessage; - var result = WriteMessageInsideLock(physical, message); - - if (result == WriteResult.Success) + _backlogStatus = BacklogStatus.CheckingForTimeout; + if (message.HasAsyncTimedOut(Environment.TickCount, timeout, out var _)) { - _backlogStatus = BacklogStatus.Flushing; - result = await physical.FlushAsync(false).ConfigureAwait(false); + _backlogStatus = BacklogStatus.RecordingTimeout; + var ex = Multiplexer.GetException(WriteResult.TimeoutBeforeWrite, message, ServerEndPoint); + message.SetExceptionAndComplete(ex, this); } - - _backlogStatus = BacklogStatus.MarkingInactive; - if (result != WriteResult.Success) + else { - _backlogStatus = BacklogStatus.RecordingWriteFailure; - var ex = Multiplexer.GetException(result, message, ServerEndPoint); - HandleWriteException(message, ex); + _backlogStatus = BacklogStatus.WritingMessage; + var result = WriteMessageInsideLock(physical, message); + + if (result == WriteResult.Success) + { + _backlogStatus = BacklogStatus.Flushing; + result = await physical.FlushAsync(false).ConfigureAwait(false); + } + + _backlogStatus = BacklogStatus.MarkingInactive; + if (result != WriteResult.Success) + { + _backlogStatus = BacklogStatus.RecordingWriteFailure; + var ex = Multiplexer.GetException(result, message, ServerEndPoint); + HandleWriteException(message, ex); + } } } catch (Exception ex) @@ -1017,9 +879,13 @@ private async Task ProcessBridgeBacklogAsync() } } _backlogStatus = BacklogStatus.SettingIdle; - physical?.SetIdle(); + physical.SetIdle(); _backlogStatus = BacklogStatus.Inactive; } + catch + { + _backlogStatus = BacklogStatus.Faulted; + } finally { #if NETCOREAPP @@ -1030,6 +896,24 @@ private async Task ProcessBridgeBacklogAsync() #else token.Dispose(); #endif + + // Do this in finally block, so that thread aborts can't convince us the backlog processor is running 
forever + if (Interlocked.CompareExchange(ref _backlogProcessorIsRunning, 0, 1) != 1) + { + throw new RedisException("Bug detection, couldn't indicate shutdown of backlog processor"); + } + + // Now that nobody is processing the backlog, we should consider starting a new backlog processor + // in case a new message came in after we ended this loop. + if (!_backlog.IsEmpty) + { + // Check for faults mainly to prevent unlimited tasks spawning in a fault scenario + // - it isn't StackOverflowException due to the Task.Run() + if (_backlogStatus != BacklogStatus.Faulted) + { + StartBacklogProcessor(); + } + } } } @@ -1046,16 +930,27 @@ private WriteResult TimedOutBeforeWrite(Message message) /// /// The physical connection to write to. /// The message to be written. - /// Whether this message should bypass the backlog, going straight to the pipe or failing. - internal ValueTask WriteMessageTakingWriteLockAsync(PhysicalConnection physical, Message message, bool bypassBacklog = false) + internal ValueTask WriteMessageTakingWriteLockAsync(PhysicalConnection physical, Message message) { + /* design decision/choice; the code works fine either way, but if this is + * set to *true*, then when we can't take the writer-lock *right away*, + * we push the message to the backlog (starting a worker if needed) + * + * otherwise, we go for a TryWaitAsync and rely on the await machinery + * + * "true" seems to give faster times *when under heavy contention*, based on profiling + * but it involves the backlog concept; "false" works well under low contention, and + * makes more use of async + */ + const bool ALWAYS_USE_BACKLOG_IF_CANNOT_GET_SYNC_LOCK = true; + Trace("Writing: " + message); message.SetEnqueued(physical); // this also records the read/write stats at this point // AVOID REORDERING MESSAGES // Prefer to add it to the backlog if this thread can see that there might already be a message backlog. 
// We do this before attempting to take the writelock, because we won't actually write, we'll just let the backlog get processed in due course - if (TryPushToBacklog(message, onlyIfExists: true, bypassBacklog: bypassBacklog)) + if (PushToBacklog(message, onlyIfExists: true)) { return new ValueTask(WriteResult.Success); // queued counts as success } @@ -1070,20 +965,19 @@ internal ValueTask WriteMessageTakingWriteLockAsync(PhysicalConnect try { // try to acquire it synchronously + // note: timeout is specified in mutex-constructor #if NETCOREAPP gotLock = _singleWriterMutex.Wait(0); if (!gotLock) #else - // note: timeout is specified in mutex-constructor token = _singleWriterMutex.TryWait(options: WaitOptions.NoDelay); if (!token.Success) #endif { - // If we can't get it *instantaneously*, pass it to the backlog for throughput - if (TryPushToBacklog(message, onlyIfExists: false, bypassBacklog: bypassBacklog)) - { + // we can't get it *instantaneously*; is there + // perhaps a backlog and active backlog processor? + if (PushToBacklog(message, onlyIfExists: !ALWAYS_USE_BACKLOG_IF_CANNOT_GET_SYNC_LOCK)) return new ValueTask(WriteResult.Success); // queued counts as success - } // no backlog... 
try to wait with the timeout; // if we *still* can't get it: that counts as @@ -1102,10 +996,10 @@ internal ValueTask WriteMessageTakingWriteLockAsync(PhysicalConnect if (!token.Success) return new ValueTask(TimedOutBeforeWrite(message)); #endif } -#if DEBUG lockTaken = Environment.TickCount; -#endif + var result = WriteMessageInsideLock(physical, message); + if (result == WriteResult.Success) { var flush = physical.FlushAsync(false); @@ -1119,7 +1013,7 @@ internal ValueTask WriteMessageTakingWriteLockAsync(PhysicalConnect #endif } - result = flush.Result; // .Result: we know it was completed, so this is fine + result = flush.Result; // we know it was completed, this is fine } physical.SetIdle(); diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs index 782b03e56..085b19987 100644 --- a/src/StackExchange.Redis/PhysicalConnection.cs +++ b/src/StackExchange.Redis/PhysicalConnection.cs @@ -10,6 +10,7 @@ using System.Net.Security; using System.Net.Sockets; using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; using System.Security.Authentication; using System.Security.Cryptography.X509Certificates; using System.Text; @@ -67,7 +68,6 @@ internal void GetBytes(out long sent, out long received) } private IDuplexPipe _ioPipe; - internal bool HasOutputPipe => _ioPipe?.Output != null; private Socket _socket; private Socket VolatileSocket => Volatile.Read(ref _socket); @@ -649,9 +649,7 @@ internal void OnBridgeHeartbeat() var timeout = bridge.Multiplexer.AsyncTimeoutMilliseconds; foreach (var msg in _writtenAwaitingResponse) { - // We only handle async timeouts here, synchronous timeouts are handled upstream. - // Those sync timeouts happen in ConnectionMultiplexer.ExecuteSyncImpl() via Monitor.Wait. 
- if (msg.ResultBoxIsAsync && msg.HasTimedOut(now, timeout, out var elapsed)) + if (msg.HasAsyncTimedOut(now, timeout, out var elapsed)) { bool haveDeltas = msg.TryGetPhysicalState(out _, out _, out long sentDelta, out var receivedDelta) && sentDelta >= 0 && receivedDelta >= 0; var timeoutEx = ExceptionFactory.Timeout(bridge.Multiplexer, haveDeltas @@ -1559,7 +1557,7 @@ private void OnDebugAbort() var bridge = BridgeCouldBeNull; if (bridge == null || !bridge.Multiplexer.AllowConnect) { - throw new RedisConnectionException(ConnectionFailureType.InternalFailure, "Aborting (AllowConnect: False)"); + throw new RedisConnectionException(ConnectionFailureType.InternalFailure, "debugging"); } } diff --git a/src/StackExchange.Redis/RedisServer.cs b/src/StackExchange.Redis/RedisServer.cs index fc60c48f6..05e5b7af6 100644 --- a/src/StackExchange.Redis/RedisServer.cs +++ b/src/StackExchange.Redis/RedisServer.cs @@ -572,8 +572,7 @@ internal static Message CreateReplicaOfMessage(ServerEndPoint sendMessageTo, End } internal override Task ExecuteAsync(Message message, ResultProcessor processor, ServerEndPoint server = null) - { - // inject our expected server automatically + { // inject our expected server automatically if (server == null) server = this.server; FixFlags(message, server); if (!server.IsConnected) @@ -581,32 +580,22 @@ internal override Task ExecuteAsync(Message message, ResultProcessor pr if (message == null) return CompletedTask.Default(asyncState); if (message.IsFireAndForget) return CompletedTask.Default(null); // F+F explicitly does not get async-state - // After the "don't care" cases above, if we can't queue then it's time to error - otherwise call through to queueing. 
- if (!multiplexer.RawConfig.BacklogPolicy.QueueWhileDisconnected) - { - // no need to deny exec-sync here; will be complete before they see if - var tcs = TaskSource.Create(asyncState); - ConnectionMultiplexer.ThrowFailed(tcs, ExceptionFactory.NoConnectionAvailable(multiplexer, message, server)); - return tcs.Task; - } + // no need to deny exec-sync here; will be complete before they see if + var tcs = TaskSource.Create(asyncState); + ConnectionMultiplexer.ThrowFailed(tcs, ExceptionFactory.NoConnectionAvailable(multiplexer, message, server)); + return tcs.Task; } return base.ExecuteAsync(message, processor, server); } internal override T ExecuteSync(Message message, ResultProcessor processor, ServerEndPoint server = null) - { - // inject our expected server automatically + { // inject our expected server automatically if (server == null) server = this.server; FixFlags(message, server); if (!server.IsConnected) { if (message == null || message.IsFireAndForget) return default(T); - - // After the "don't care" cases above, if we can't queue then it's time to error - otherwise call through to queueing. 
- if (!multiplexer.RawConfig.BacklogPolicy.QueueWhileDisconnected) - { - throw ExceptionFactory.NoConnectionAvailable(multiplexer, message, server); - } + throw ExceptionFactory.NoConnectionAvailable(multiplexer, message, server); } return base.ExecuteSync(message, processor, server); } diff --git a/src/StackExchange.Redis/ServerEndPoint.cs b/src/StackExchange.Redis/ServerEndPoint.cs index 46d08419c..c3e7bb714 100755 --- a/src/StackExchange.Redis/ServerEndPoint.cs +++ b/src/StackExchange.Redis/ServerEndPoint.cs @@ -81,6 +81,7 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint) public bool HasDatabases => serverType == ServerType.Standalone; public bool IsConnected => interactive?.IsConnected == true; + public bool IsSubscriberConnected => subscription?.IsConnected == true; public bool IsConnecting => interactive?.IsConnecting == true; @@ -149,7 +150,7 @@ internal PhysicalBridge.State ConnectionState get { var tmp = interactive; - return tmp?.ConnectionState ?? State.Disconnected; + return tmp.ConnectionState; } } @@ -560,11 +561,7 @@ internal Message GetTracerMessage(bool assertIdentity) internal bool IsSelectable(RedisCommand command, bool allowDisconnected = false) { - // Until we've connected at least once, we're going too have a DidNotRespond unselectable reason present - var bridge = unselectableReasons == 0 || (allowDisconnected && unselectableReasons == UnselectableFlags.DidNotRespond) - ? GetBridge(command, false) - : null; - + var bridge = unselectableReasons == 0 ? 
GetBridge(command, false) : null; return bridge != null && (allowDisconnected || bridge.IsConnected); } @@ -628,9 +625,6 @@ internal void OnFullyEstablished(PhysicalConnection connection, string source) var bridge = connection?.BridgeCouldBeNull; if (bridge != null) { - // Clear the unselectable flag ASAP since we are open for business - ClearUnselectable(UnselectableFlags.DidNotRespond); - if (bridge == subscription) { // Note: this MUST be fire and forget, because we might be in the middle of a Sync processing @@ -855,7 +849,7 @@ internal ValueTask WriteDirectOrQueueFireAndForgetAsync(PhysicalConnection co } else { - result = bridge.WriteMessageTakingWriteLockAsync(connection, message, bypassBacklog: true); + result = bridge.WriteMessageTakingWriteLockAsync(connection, message); } } diff --git a/src/StackExchange.Redis/ServerSelectionStrategy.cs b/src/StackExchange.Redis/ServerSelectionStrategy.cs index 8d06b69bb..0d5277e03 100644 --- a/src/StackExchange.Redis/ServerSelectionStrategy.cs +++ b/src/StackExchange.Redis/ServerSelectionStrategy.cs @@ -100,7 +100,7 @@ private static unsafe int GetClusterSlot(byte[] blob) } } - public ServerEndPoint Select(Message message, bool allowDisconnected = false) + public ServerEndPoint Select(Message message) { if (message == null) throw new ArgumentNullException(nameof(message)); int slot = NoSlot; @@ -115,19 +115,19 @@ public ServerEndPoint Select(Message message, bool allowDisconnected = false) if (slot == MultipleSlots) throw ExceptionFactory.MultiSlot(multiplexer.IncludeDetailInExceptions, message); break; } - return Select(slot, message.Command, message.Flags, allowDisconnected); + return Select(slot, message.Command, message.Flags); } - public ServerEndPoint Select(RedisCommand command, in RedisKey key, CommandFlags flags, bool allowDisconnected = false) + public ServerEndPoint Select(RedisCommand command, in RedisKey key, CommandFlags flags) { int slot = ServerType == ServerType.Cluster ? 
HashSlot(key) : NoSlot; - return Select(slot, command, flags, allowDisconnected); + return Select(slot, command, flags); } - public ServerEndPoint Select(RedisCommand command, in RedisChannel channel, CommandFlags flags, bool allowDisconnected = false) + public ServerEndPoint Select(RedisCommand command, in RedisChannel channel, CommandFlags flags) { int slot = ServerType == ServerType.Cluster ? HashSlot(channel) : NoSlot; - return Select(slot, command, flags, allowDisconnected); + return Select(slot, command, flags); } public bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isMoved) @@ -241,8 +241,10 @@ private static unsafe int IndexOf(byte* ptr, byte value, int start, int end) return -1; } - private ServerEndPoint Any(RedisCommand command, CommandFlags flags, bool allowDisconnected) => - multiplexer.AnyServer(ServerType, (uint)Interlocked.Increment(ref anyStartOffset), command, flags, allowDisconnected); + private ServerEndPoint Any(RedisCommand command, CommandFlags flags) + { + return multiplexer.AnyConnected(ServerType, (uint)Interlocked.Increment(ref anyStartOffset), command, flags); + } private static ServerEndPoint FindMaster(ServerEndPoint endpoint, RedisCommand command) { @@ -285,12 +287,12 @@ private ServerEndPoint[] MapForMutation() return arr; } - private ServerEndPoint Select(int slot, RedisCommand command, CommandFlags flags, bool allowDisconnected) + private ServerEndPoint Select(int slot, RedisCommand command, CommandFlags flags) { flags = Message.GetMasterReplicaFlags(flags); // only interested in master/replica preferences ServerEndPoint[] arr; - if (slot == NoSlot || (arr = map) == null) return Any(command, flags, allowDisconnected); + if (slot == NoSlot || (arr = map) == null) return Any(command, flags); ServerEndPoint endpoint = arr[slot], testing; // but: ^^^ is the MASTER slots; if we want a replica, we need to do some thinking @@ -300,21 +302,21 @@ private ServerEndPoint Select(int slot, RedisCommand command, 
CommandFlags flags switch (flags) { case CommandFlags.DemandReplica: - return FindReplica(endpoint, command) ?? Any(command, flags, allowDisconnected); + return FindReplica(endpoint, command) ?? Any(command, flags); case CommandFlags.PreferReplica: testing = FindReplica(endpoint, command); if (testing != null) return testing; break; case CommandFlags.DemandMaster: - return FindMaster(endpoint, command) ?? Any(command, flags, allowDisconnected); + return FindMaster(endpoint, command) ?? Any(command, flags); case CommandFlags.PreferMaster: testing = FindMaster(endpoint, command); if (testing != null) return testing; break; } - if (endpoint.IsSelectable(command, allowDisconnected)) return endpoint; + if (endpoint.IsSelectable(command)) return endpoint; } - return Any(command, flags, allowDisconnected); + return Any(command, flags); } } } diff --git a/tests/StackExchange.Redis.Tests/AsyncTests.cs b/tests/StackExchange.Redis.Tests/AsyncTests.cs index 1ea26e76e..4dd36670b 100644 --- a/tests/StackExchange.Redis.Tests/AsyncTests.cs +++ b/tests/StackExchange.Redis.Tests/AsyncTests.cs @@ -19,7 +19,7 @@ public void AsyncTasksReportFailureIfServerUnavailable() { SetExpectedAmbientFailureCount(-1); // this will get messy - using (var conn = Create(allowAdmin: true, shared: false, backlogPolicy: BacklogPolicy.FailFast)) + using (var conn = Create(allowAdmin: true, shared: false)) { var server = conn.GetServer(TestConfig.Current.MasterServer, TestConfig.Current.MasterPort); diff --git a/tests/StackExchange.Redis.Tests/BacklogTests.cs b/tests/StackExchange.Redis.Tests/BacklogTests.cs deleted file mode 100644 index 990d15d61..000000000 --- a/tests/StackExchange.Redis.Tests/BacklogTests.cs +++ /dev/null @@ -1,402 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Xunit; -using Xunit.Abstractions; - -namespace StackExchange.Redis.Tests -{ - public class BacklogTests : TestBase - { - public BacklogTests(ITestOutputHelper output) : base (output) { } - 
- protected override string GetConfiguration() => TestConfig.Current.MasterServerAndPort + "," + TestConfig.Current.ReplicaServerAndPort; - - [Fact] - public async Task FailFast() - { - void PrintSnapshot(ConnectionMultiplexer muxer) - { - Writer.WriteLine("Snapshot summary:"); - foreach (var server in muxer.GetServerSnapshot()) - { - Writer.WriteLine($" {server.EndPoint}: "); - Writer.WriteLine($" Type: {server.ServerType}"); - Writer.WriteLine($" IsConnected: {server.IsConnected}"); - Writer.WriteLine($" IsConnecting: {server.IsConnecting}"); - Writer.WriteLine($" IsSelectable(allowDisconnected: true): {server.IsSelectable(RedisCommand.PING, true)}"); - Writer.WriteLine($" IsSelectable(allowDisconnected: false): {server.IsSelectable(RedisCommand.PING, false)}"); - Writer.WriteLine($" UnselectableFlags: {server.GetUnselectableFlags()}"); - var bridge = server.GetBridge(RedisCommand.PING, create: false); - Writer.WriteLine($" GetBridge: {bridge}"); - Writer.WriteLine($" IsConnected: {bridge.IsConnected}"); - Writer.WriteLine($" ConnectionState: {bridge.ConnectionState}"); - } - } - - try - { - // Ensuring the FailFast policy errors immediate with no connection available exceptions - var options = new ConfigurationOptions() - { - BacklogPolicy = BacklogPolicy.FailFast, - AbortOnConnectFail = false, - ConnectTimeout = 1000, - ConnectRetry = 2, - SyncTimeout = 10000, - KeepAlive = 10000, - AsyncTimeout = 5000, - AllowAdmin = true, - }; - options.EndPoints.Add(TestConfig.Current.MasterServerAndPort); - - using var muxer = await ConnectionMultiplexer.ConnectAsync(options, Writer); - - var db = muxer.GetDatabase(); - Writer.WriteLine("Test: Initial (connected) ping"); - await db.PingAsync(); - - var server = muxer.GetServerSnapshot()[0]; - var stats = server.GetBridgeStatus(ConnectionType.Interactive); - Assert.Equal(0, stats.BacklogMessagesPending); // Everything's normal - - // Fail the connection - Writer.WriteLine("Test: Simulating failure"); - muxer.AllowConnect = 
false; - server.SimulateConnectionFailure(SimulatedFailureType.All); - Assert.False(muxer.IsConnected); - - // Queue up some commands - Writer.WriteLine("Test: Disconnected pings"); - await Assert.ThrowsAsync(() => db.PingAsync()); - - var disconnectedStats = server.GetBridgeStatus(ConnectionType.Interactive); - Assert.False(muxer.IsConnected); - Assert.Equal(0, disconnectedStats.BacklogMessagesPending); - - Writer.WriteLine("Test: Allowing reconnect"); - muxer.AllowConnect = true; - Writer.WriteLine("Test: Awaiting reconnect"); - await UntilCondition(TimeSpan.FromSeconds(3), () => muxer.IsConnected).ForAwait(); - - Writer.WriteLine("Test: Reconnecting"); - Assert.True(muxer.IsConnected); - Assert.True(server.IsConnected); - var reconnectedStats = server.GetBridgeStatus(ConnectionType.Interactive); - Assert.Equal(0, reconnectedStats.BacklogMessagesPending); - - _ = db.PingAsync(); - _ = db.PingAsync(); - var lastPing = db.PingAsync(); - - // For debug, print out the snapshot and server states - PrintSnapshot(muxer); - - Assert.NotNull(muxer.SelectServer(Message.Create(-1, CommandFlags.None, RedisCommand.PING))); - - // We should see none queued - Assert.Equal(0, stats.BacklogMessagesPending); - await lastPing; - } - finally - { - ClearAmbientFailures(); - } - } - - [Fact] - public async Task QueuesAndFlushesAfterReconnectingAsync() - { - try - { - var options = new ConfigurationOptions() - { - BacklogPolicy = BacklogPolicy.Default, - AbortOnConnectFail = false, - ConnectTimeout = 1000, - ConnectRetry = 2, - SyncTimeout = 10000, - KeepAlive = 10000, - AsyncTimeout = 5000, - AllowAdmin = true, - SocketManager = SocketManager.ThreadPool, - }; - options.EndPoints.Add(TestConfig.Current.MasterServerAndPort); - - using var muxer = await ConnectionMultiplexer.ConnectAsync(options, Writer); - muxer.ErrorMessage += (s, e) => Log($"Error Message {e.EndPoint}: {e.Message}"); - muxer.InternalError += (s, e) => Log($"Internal Error {e.EndPoint}: {e.Exception.Message}"); - 
muxer.ConnectionFailed += (s, a) => Log("Disconnected: " + EndPointCollection.ToString(a.EndPoint)); - muxer.ConnectionRestored += (s, a) => Log("Reconnected: " + EndPointCollection.ToString(a.EndPoint)); - - var db = muxer.GetDatabase(); - Writer.WriteLine("Test: Initial (connected) ping"); - await db.PingAsync(); - - var server = muxer.GetServerSnapshot()[0]; - var stats = server.GetBridgeStatus(ConnectionType.Interactive); - Assert.Equal(0, stats.BacklogMessagesPending); // Everything's normal - - // Fail the connection - Writer.WriteLine("Test: Simulating failure"); - muxer.AllowConnect = false; - server.SimulateConnectionFailure(SimulatedFailureType.All); - Assert.False(muxer.IsConnected); - - // Queue up some commands - Writer.WriteLine("Test: Disconnected pings"); - var ignoredA = db.PingAsync(); - var ignoredB = db.PingAsync(); - var lastPing = db.PingAsync(); - - // TODO: Add specific server call - - var disconnectedStats = server.GetBridgeStatus(ConnectionType.Interactive); - Assert.False(muxer.IsConnected); - Assert.True(disconnectedStats.BacklogMessagesPending >= 3, $"Expected {nameof(disconnectedStats.BacklogMessagesPending)} > 3, got {disconnectedStats.BacklogMessagesPending}"); - - Writer.WriteLine("Test: Allowing reconnect"); - muxer.AllowConnect = true; - Writer.WriteLine("Test: Awaiting reconnect"); - await UntilCondition(TimeSpan.FromSeconds(3), () => muxer.IsConnected).ForAwait(); - - Writer.WriteLine("Test: Checking reconnected 1"); - Assert.True(muxer.IsConnected); - - Writer.WriteLine("Test: ignoredA Status: " + ignoredA.Status); - Writer.WriteLine("Test: ignoredB Status: " + ignoredB.Status); - Writer.WriteLine("Test: lastPing Status: " + lastPing.Status); - var afterConnectedStats = server.GetBridgeStatus(ConnectionType.Interactive); - Writer.WriteLine($"Test: BacklogStatus: {afterConnectedStats.BacklogStatus}, BacklogMessagesPending: {afterConnectedStats.BacklogMessagesPending}, IsWriterActive: {afterConnectedStats.IsWriterActive}, 
MessagesSinceLastHeartbeat: {afterConnectedStats.MessagesSinceLastHeartbeat}, TotalBacklogMessagesQueued: {afterConnectedStats.TotalBacklogMessagesQueued}"); - - Writer.WriteLine("Test: Awaiting lastPing 1"); - await lastPing; - - Writer.WriteLine("Test: Checking reconnected 2"); - Assert.True(muxer.IsConnected); - var reconnectedStats = server.GetBridgeStatus(ConnectionType.Interactive); - Assert.Equal(0, reconnectedStats.BacklogMessagesPending); - - Writer.WriteLine("Test: Pinging again..."); - _ = db.PingAsync(); - _ = db.PingAsync(); - Writer.WriteLine("Test: Last Ping issued"); - lastPing = db.PingAsync(); - - // We should see none queued - Writer.WriteLine("Test: BacklogMessagesPending check"); - Assert.Equal(0, stats.BacklogMessagesPending); - Writer.WriteLine("Test: Awaiting lastPing 2"); - await lastPing; - Writer.WriteLine("Test: Done"); - } - finally - { - ClearAmbientFailures(); - } - } - - [Fact] - public async Task QueuesAndFlushesAfterReconnecting() - { - try - { - var options = new ConfigurationOptions() - { - BacklogPolicy = BacklogPolicy.Default, - AbortOnConnectFail = false, - ConnectTimeout = 1000, - ConnectRetry = 2, - SyncTimeout = 10000, - KeepAlive = 10000, - AsyncTimeout = 5000, - AllowAdmin = true, - SocketManager = SocketManager.ThreadPool, - }; - options.EndPoints.Add(TestConfig.Current.MasterServerAndPort); - - using var muxer = await ConnectionMultiplexer.ConnectAsync(options, Writer); - muxer.ErrorMessage += (s, e) => Log($"Error Message {e.EndPoint}: {e.Message}"); - muxer.InternalError += (s, e) => Log($"Internal Error {e.EndPoint}: {e.Exception.Message}"); - muxer.ConnectionFailed += (s, a) => Log("Disconnected: " + EndPointCollection.ToString(a.EndPoint)); - muxer.ConnectionRestored += (s, a) => Log("Reconnected: " + EndPointCollection.ToString(a.EndPoint)); - - var db = muxer.GetDatabase(); - Writer.WriteLine("Test: Initial (connected) ping"); - await db.PingAsync(); - - var server = muxer.GetServerSnapshot()[0]; - var stats = 
server.GetBridgeStatus(ConnectionType.Interactive); - Assert.Equal(0, stats.BacklogMessagesPending); // Everything's normal - - // Fail the connection - Writer.WriteLine("Test: Simulating failure"); - muxer.AllowConnect = false; - server.SimulateConnectionFailure(SimulatedFailureType.All); - Assert.False(muxer.IsConnected); - - // Queue up some commands - Writer.WriteLine("Test: Disconnected pings"); - - Task[] pings = new Task[3]; - pings[0] = RunBlockingSynchronousWithExtraThreadAsync(() => disconnectedPings(1)); - pings[1] = RunBlockingSynchronousWithExtraThreadAsync(() => disconnectedPings(2)); - pings[2] = RunBlockingSynchronousWithExtraThreadAsync(() => disconnectedPings(3)); - void disconnectedPings(int id) - { - // No need to delay, we're going to try a disconnected connection immediately so it'll fail... - Log($"Pinging (disconnected - {id})"); - var result = db.Ping(); - Log($"Pinging (disconnected - {id}) - result: " + result); - } - Writer.WriteLine("Test: Disconnected pings issued"); - - Assert.False(muxer.IsConnected); - // Give the tasks time to queue - await UntilCondition(TimeSpan.FromSeconds(5), () => server.GetBridgeStatus(ConnectionType.Interactive).BacklogMessagesPending >= 3); - - var disconnectedStats = server.GetBridgeStatus(ConnectionType.Interactive); - Log($"Test Stats: (BacklogMessagesPending: {disconnectedStats.BacklogMessagesPending}, TotalBacklogMessagesQueued: {disconnectedStats.TotalBacklogMessagesQueued})"); - Assert.True(disconnectedStats.BacklogMessagesPending >= 3, $"Expected {nameof(disconnectedStats.BacklogMessagesPending)} > 3, got {disconnectedStats.BacklogMessagesPending}"); - - Writer.WriteLine("Test: Allowing reconnect"); - muxer.AllowConnect = true; - Writer.WriteLine("Test: Awaiting reconnect"); - await UntilCondition(TimeSpan.FromSeconds(3), () => muxer.IsConnected).ForAwait(); - - Writer.WriteLine("Test: Checking reconnected 1"); - Assert.True(muxer.IsConnected); - - var afterConnectedStats = 
server.GetBridgeStatus(ConnectionType.Interactive); - Writer.WriteLine($"Test: BacklogStatus: {afterConnectedStats.BacklogStatus}, BacklogMessagesPending: {afterConnectedStats.BacklogMessagesPending}, IsWriterActive: {afterConnectedStats.IsWriterActive}, MessagesSinceLastHeartbeat: {afterConnectedStats.MessagesSinceLastHeartbeat}, TotalBacklogMessagesQueued: {afterConnectedStats.TotalBacklogMessagesQueued}"); - - Writer.WriteLine("Test: Awaiting 3 pings"); - await Task.WhenAll(pings); - - Writer.WriteLine("Test: Checking reconnected 2"); - Assert.True(muxer.IsConnected); - var reconnectedStats = server.GetBridgeStatus(ConnectionType.Interactive); - Assert.Equal(0, reconnectedStats.BacklogMessagesPending); - - Writer.WriteLine("Test: Pinging again..."); - pings[0] = RunBlockingSynchronousWithExtraThreadAsync(() => disconnectedPings(4)); - pings[1] = RunBlockingSynchronousWithExtraThreadAsync(() => disconnectedPings(5)); - pings[2] = RunBlockingSynchronousWithExtraThreadAsync(() => disconnectedPings(6)); - Writer.WriteLine("Test: Last Ping queued"); - - // We should see none queued - Writer.WriteLine("Test: BacklogMessagesPending check"); - Assert.Equal(0, stats.BacklogMessagesPending); - Writer.WriteLine("Test: Awaiting 3 more pings"); - await Task.WhenAll(pings); - Writer.WriteLine("Test: Done"); - } - finally - { - ClearAmbientFailures(); - } - } - - [Fact] - public async Task QueuesAndFlushesAfterReconnectingClusterAsync() - { - try - { - var options = ConfigurationOptions.Parse(TestConfig.Current.ClusterServersAndPorts); - options.BacklogPolicy = BacklogPolicy.Default; - options.AbortOnConnectFail = false; - options.ConnectTimeout = 1000; - options.ConnectRetry = 2; - options.SyncTimeout = 10000; - options.KeepAlive = 10000; - options.AsyncTimeout = 5000; - options.AllowAdmin = true; - options.SocketManager = SocketManager.ThreadPool; - - using var muxer = await ConnectionMultiplexer.ConnectAsync(options, Writer); - muxer.ErrorMessage += (s, e) => Log($"Error 
Message {e.EndPoint}: {e.Message}"); - muxer.InternalError += (s, e) => Log($"Internal Error {e.EndPoint}: {e.Exception.Message}"); - muxer.ConnectionFailed += (s, a) => Log("Disconnected: " + EndPointCollection.ToString(a.EndPoint)); - muxer.ConnectionRestored += (s, a) => Log("Reconnected: " + EndPointCollection.ToString(a.EndPoint)); - - var db = muxer.GetDatabase(); - Writer.WriteLine("Test: Initial (connected) ping"); - await db.PingAsync(); - - RedisKey meKey = Me(); - var getMsg = Message.Create(0, CommandFlags.None, RedisCommand.GET, meKey); - - var server = muxer.SelectServer(getMsg); // Get the server specifically for this message's hash slot - var stats = server.GetBridgeStatus(ConnectionType.Interactive); - Assert.Equal(0, stats.BacklogMessagesPending); // Everything's normal - - static Task PingAsync(ServerEndPoint server, CommandFlags flags = CommandFlags.None) - { - var message = ResultProcessor.TimingProcessor.CreateMessage(-1, flags, RedisCommand.PING); - - server.Multiplexer.CheckMessage(message); - return server.Multiplexer.ExecuteAsyncImpl(message, ResultProcessor.ResponseTimer, null, server); - } - - // Fail the connection - Writer.WriteLine("Test: Simulating failure"); - muxer.AllowConnect = false; - server.SimulateConnectionFailure(SimulatedFailureType.All); - Assert.False(server.IsConnected); // Server isn't connected - Assert.True(muxer.IsConnected); // ...but the multiplexer is - - // Queue up some commands - Writer.WriteLine("Test: Disconnected pings"); - var ignoredA = PingAsync(server); - var ignoredB = PingAsync(server); - var lastPing = PingAsync(server); - - var disconnectedStats = server.GetBridgeStatus(ConnectionType.Interactive); - Assert.False(server.IsConnected); - Assert.True(muxer.IsConnected); - Assert.True(disconnectedStats.BacklogMessagesPending >= 3, $"Expected {nameof(disconnectedStats.BacklogMessagesPending)} > 3, got {disconnectedStats.BacklogMessagesPending}"); - - Writer.WriteLine("Test: Allowing reconnect"); - 
muxer.AllowConnect = true; - Writer.WriteLine("Test: Awaiting reconnect"); - await UntilCondition(TimeSpan.FromSeconds(3), () => server.IsConnected).ForAwait(); - - Writer.WriteLine("Test: Checking reconnected 1"); - Assert.True(server.IsConnected); - Assert.True(muxer.IsConnected); - - Writer.WriteLine("Test: ignoredA Status: " + ignoredA.Status); - Writer.WriteLine("Test: ignoredB Status: " + ignoredB.Status); - Writer.WriteLine("Test: lastPing Status: " + lastPing.Status); - var afterConnectedStats = server.GetBridgeStatus(ConnectionType.Interactive); - Writer.WriteLine($"Test: BacklogStatus: {afterConnectedStats.BacklogStatus}, BacklogMessagesPending: {afterConnectedStats.BacklogMessagesPending}, IsWriterActive: {afterConnectedStats.IsWriterActive}, MessagesSinceLastHeartbeat: {afterConnectedStats.MessagesSinceLastHeartbeat}, TotalBacklogMessagesQueued: {afterConnectedStats.TotalBacklogMessagesQueued}"); - - Writer.WriteLine("Test: Awaiting lastPing 1"); - await lastPing; - - Writer.WriteLine("Test: Checking reconnected 2"); - Assert.True(server.IsConnected); - Assert.True(muxer.IsConnected); - var reconnectedStats = server.GetBridgeStatus(ConnectionType.Interactive); - Assert.Equal(0, reconnectedStats.BacklogMessagesPending); - - Writer.WriteLine("Test: Pinging again..."); - _ = PingAsync(server); - _ = PingAsync(server); - Writer.WriteLine("Test: Last Ping issued"); - lastPing = PingAsync(server); ; - - // We should see none queued - Writer.WriteLine("Test: BacklogMessagesPending check"); - Assert.Equal(0, stats.BacklogMessagesPending); - Writer.WriteLine("Test: Awaiting lastPing 2"); - await lastPing; - Writer.WriteLine("Test: Done"); - } - finally - { - ClearAmbientFailures(); - } - } - } -} diff --git a/tests/StackExchange.Redis.Tests/ConnectFailTimeout.cs b/tests/StackExchange.Redis.Tests/ConnectFailTimeout.cs index 6a9d2a399..73af84fa4 100644 --- a/tests/StackExchange.Redis.Tests/ConnectFailTimeout.cs +++ 
b/tests/StackExchange.Redis.Tests/ConnectFailTimeout.cs @@ -13,7 +13,7 @@ public ConnectFailTimeout(ITestOutputHelper output) : base (output) { } public async Task NoticesConnectFail() { SetExpectedAmbientFailureCount(-1); - using (var conn = Create(allowAdmin: true, shared: false, backlogPolicy: BacklogPolicy.FailFast)) + using (var conn = Create(allowAdmin: true, shared: false)) { var server = conn.GetServer(conn.GetEndPoints()[0]); diff --git a/tests/StackExchange.Redis.Tests/ConnectingFailDetection.cs b/tests/StackExchange.Redis.Tests/ConnectingFailDetection.cs index 606745a08..fb0b84d21 100644 --- a/tests/StackExchange.Redis.Tests/ConnectingFailDetection.cs +++ b/tests/StackExchange.Redis.Tests/ConnectingFailDetection.cs @@ -97,12 +97,11 @@ public async Task Issue922_ReconnectRaised() { var config = ConfigurationOptions.Parse(TestConfig.Current.MasterServerAndPort); config.AbortOnConnectFail = true; - config.KeepAlive = 1; + config.KeepAlive = 10; config.SyncTimeout = 1000; config.AsyncTimeout = 1000; config.ReconnectRetryPolicy = new ExponentialRetry(5000); config.AllowAdmin = true; - config.BacklogPolicy = BacklogPolicy.FailFast; int failCount = 0, restoreCount = 0; diff --git a/tests/StackExchange.Redis.Tests/ConnectionFailedErrors.cs b/tests/StackExchange.Redis.Tests/ConnectionFailedErrors.cs index 94623e4fd..cd3521788 100644 --- a/tests/StackExchange.Redis.Tests/ConnectionFailedErrors.cs +++ b/tests/StackExchange.Redis.Tests/ConnectionFailedErrors.cs @@ -105,7 +105,6 @@ void innerScenario() options.Password = ""; options.AbortOnConnectFail = false; options.ConnectTimeout = 1000; - options.BacklogPolicy = BacklogPolicy.FailFast; var outer = Assert.Throws(() => { using (var muxer = ConnectionMultiplexer.Connect(options)) diff --git a/tests/StackExchange.Redis.Tests/ExceptionFactoryTests.cs b/tests/StackExchange.Redis.Tests/ExceptionFactoryTests.cs index 31585159f..437dd44e0 100644 --- a/tests/StackExchange.Redis.Tests/ExceptionFactoryTests.cs +++ 
b/tests/StackExchange.Redis.Tests/ExceptionFactoryTests.cs @@ -63,7 +63,7 @@ public void ServerTakesPrecendenceOverSnapshot() { try { - using (var muxer = Create(keepAlive: 1, connectTimeout: 10000, allowAdmin: true, shared: false, backlogPolicy: BacklogPolicy.FailFast)) + using (var muxer = Create(keepAlive: 1, connectTimeout: 10000, allowAdmin: true, shared: false)) { muxer.GetDatabase(); muxer.AllowConnect = false; @@ -156,7 +156,6 @@ public void NoConnectionException(bool abortOnConnect, int connCount, int comple var options = new ConfigurationOptions() { AbortOnConnectFail = abortOnConnect, - BacklogPolicy = BacklogPolicy.FailFast, ConnectTimeout = 1000, SyncTimeout = 500, KeepAlive = 5000 diff --git a/tests/StackExchange.Redis.Tests/PubSub.cs b/tests/StackExchange.Redis.Tests/PubSub.cs index 2dce49c5b..2c4d6ef80 100644 --- a/tests/StackExchange.Redis.Tests/PubSub.cs +++ b/tests/StackExchange.Redis.Tests/PubSub.cs @@ -539,9 +539,6 @@ public async Task PubSubGetAllCorrectOrder_OnMessage_Async() }); await sub.PingAsync().ForAwait(); - // Give a delay between subscriptions and when we try to publish to be safe - await Task.Delay(1000).ForAwait(); - lock (syncLock) { for (int i = 0; i < count; i++) @@ -804,8 +801,8 @@ await sub.SubscribeAsync(channel, delegate Log("Failing connection"); // Fail all connections server.SimulateConnectionFailure(SimulatedFailureType.All); - // Trigger failure (RedisTimeoutException because of backlog behavior) - Assert.Throws(() => sub.Ping()); + // Trigger failure + Assert.Throws(() => sub.Ping()); Assert.False(sub.IsConnected(channel)); // Now reconnect... 
diff --git a/tests/StackExchange.Redis.Tests/PubSubMultiserver.cs b/tests/StackExchange.Redis.Tests/PubSubMultiserver.cs index e3f9590b3..1a667970e 100644 --- a/tests/StackExchange.Redis.Tests/PubSubMultiserver.cs +++ b/tests/StackExchange.Redis.Tests/PubSubMultiserver.cs @@ -103,7 +103,7 @@ public async Task PrimaryReplicaSubscriptionFailover(CommandFlags flags, bool ex Log("Connecting..."); using var muxer = Create(configuration: config, shared: false, allowAdmin: true) as ConnectionMultiplexer; var sub = muxer.GetSubscriber(); - var channel = (RedisChannel)(Me() + flags.ToString()); // Individual channel per case to not overlap publishers + var channel = (RedisChannel)Me(); var count = 0; Log("Subscribing..."); diff --git a/tests/StackExchange.Redis.Tests/Secure.cs b/tests/StackExchange.Redis.Tests/Secure.cs index 79cba81b8..2e7d70929 100644 --- a/tests/StackExchange.Redis.Tests/Secure.cs +++ b/tests/StackExchange.Redis.Tests/Secure.cs @@ -65,7 +65,6 @@ public async Task ConnectWithWrongPassword(string password) var config = ConfigurationOptions.Parse(GetConfiguration()); config.Password = password; config.ConnectRetry = 0; // we don't want to retry on closed sockets in this case. 
- config.BacklogPolicy = BacklogPolicy.FailFast; var ex = await Assert.ThrowsAsync(async () => { diff --git a/tests/StackExchange.Redis.Tests/SharedConnectionFixture.cs b/tests/StackExchange.Redis.Tests/SharedConnectionFixture.cs index a91047be1..bf22489dd 100644 --- a/tests/StackExchange.Redis.Tests/SharedConnectionFixture.cs +++ b/tests/StackExchange.Redis.Tests/SharedConnectionFixture.cs @@ -326,20 +326,8 @@ public void Teardown(TextWriter output) } //Assert.True(false, $"There were {privateFailCount} private ambient exceptions."); } - - if (_actualConnection != null) - { - TestBase.Log(output, "Connection Counts: " + _actualConnection.GetCounters().ToString()); - foreach (var ep in _actualConnection.GetServerSnapshot()) - { - var interactive = ep.GetBridge(ConnectionType.Interactive); - TestBase.Log(output, $" {Format.ToString(interactive)}: " + interactive.GetStatus()); - - var subscription = ep.GetBridge(ConnectionType.Subscription); - TestBase.Log(output, $" {Format.ToString(subscription)}: " + subscription.GetStatus()); - } - - } + var pool = SocketManager.Shared?.SchedulerPool; + TestBase.Log(output, $"Service Counts: (Scheduler) By Queue: {pool?.TotalServicedByQueue.ToString()}, By Pool: {pool?.TotalServicedByPool.ToString()}, Workers: {pool?.WorkerCount.ToString()}, Available: {pool?.AvailableCount.ToString()}"); } } diff --git a/tests/StackExchange.Redis.Tests/StackExchange.Redis.Tests.csproj b/tests/StackExchange.Redis.Tests/StackExchange.Redis.Tests.csproj index 454e97982..5cf895e50 100644 --- a/tests/StackExchange.Redis.Tests/StackExchange.Redis.Tests.csproj +++ b/tests/StackExchange.Redis.Tests/StackExchange.Redis.Tests.csproj @@ -1,6 +1,6 @@  - net472;net6.0 + net472;netcoreapp3.1;net6.0 StackExchange.Redis.Tests true true diff --git a/tests/StackExchange.Redis.Tests/TestBase.cs b/tests/StackExchange.Redis.Tests/TestBase.cs index 60636db54..dd04fa9da 100644 --- a/tests/StackExchange.Redis.Tests/TestBase.cs +++ 
b/tests/StackExchange.Redis.Tests/TestBase.cs @@ -256,7 +256,6 @@ internal virtual IInternalConnectionMultiplexer Create( bool logTransactionData = true, bool shared = true, int? defaultDatabase = null, - BacklogPolicy backlogPolicy = null, [CallerMemberName] string caller = null) { if (Output == null) @@ -277,8 +276,7 @@ internal virtual IInternalConnectionMultiplexer Create( && tieBreaker == null && defaultDatabase == null && (allowAdmin == null || allowAdmin == true) - && expectedFailCount == 0 - && backlogPolicy == null) + && expectedFailCount == 0) { configuration = GetConfiguration(); if (configuration == _fixture.Configuration) @@ -296,7 +294,6 @@ internal virtual IInternalConnectionMultiplexer Create( channelPrefix, proxy, configuration ?? GetConfiguration(), logTransactionData, defaultDatabase, - backlogPolicy, caller); muxer.InternalError += OnInternalError; muxer.ConnectionFailed += OnConnectionFailed; @@ -327,7 +324,6 @@ public static ConnectionMultiplexer CreateDefault( string configuration = null, bool logTransactionData = true, int? defaultDatabase = null, - BacklogPolicy backlogPolicy = null, [CallerMemberName] string caller = null) { StringWriter localLog = null; @@ -363,7 +359,6 @@ public static ConnectionMultiplexer CreateDefault( if (connectTimeout != null) config.ConnectTimeout = connectTimeout.Value; if (proxy != null) config.Proxy = proxy.Value; if (defaultDatabase != null) config.DefaultDatabase = defaultDatabase.Value; - if (backlogPolicy != null) config.BacklogPolicy = backlogPolicy; var watch = Stopwatch.StartNew(); var task = ConnectionMultiplexer.ConnectAsync(config, log); if (!task.Wait(config.ConnectTimeout >= (int.MaxValue / 2) ? int.MaxValue : config.ConnectTimeout * 2))