diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs index 12959c51e..1616a5d14 100644 --- a/src/StackExchange.Redis/ConnectionMultiplexer.cs +++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs @@ -384,7 +384,7 @@ public void ExportConfiguration(Stream destination, ExportOptions options = Expo } } - internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options, LogProxy log) + internal async Task MakePrimaryAsync(ServerEndPoint server, ReplicationChangeOptions options, LogProxy log) { var cmd = server.GetFeatures().ReplicaCommands ? RedisCommand.REPLICAOF : RedisCommand.SLAVEOF; CommandMap.AssertAvailable(cmd); @@ -395,15 +395,13 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options var srv = new RedisServer(this, server, null); if (!srv.IsConnected) throw ExceptionFactory.NoConnectionAvailable(this, null, server, GetServerSnapshot(), command: cmd); -#pragma warning disable CS0618 - const CommandFlags flags = CommandFlags.NoRedirect | CommandFlags.HighPriority; -#pragma warning restore CS0618 + const CommandFlags flags = CommandFlags.NoRedirect; Message msg; log?.WriteLine($"Checking {Format.ToString(srv.EndPoint)} is available..."); try { - srv.Ping(flags); // if it isn't happy, we're not happy + await srv.PingAsync(flags); // if it isn't happy, we're not happy } catch (Exception ex) { @@ -411,7 +409,7 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options throw; } - var nodes = GetServerSnapshot(); + var nodes = GetServerSnapshot().ToArray(); // Have to array because async/await RedisValue newMaster = Format.ToString(server.EndPoint); RedisKey tieBreakerKey = default(RedisKey); @@ -423,12 +421,14 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options foreach (var node in nodes) { - if (!node.IsConnected) continue; + if (!node.IsConnected || node.IsReplica) continue; log?.WriteLine($"Attempting to set tie-breaker on {Format.ToString(node.EndPoint)}..."); - msg = Message.Create(0, flags, RedisCommand.SET, tieBreakerKey, newMaster); -#pragma warning disable CS0618 - node.WriteDirectFireAndForgetSync(msg, ResultProcessor.DemandOK); -#pragma warning restore CS0618 + msg = Message.Create(0, flags | CommandFlags.FireAndForget, RedisCommand.SET, tieBreakerKey, newMaster); + try + { + await node.WriteDirectAsync(msg, ResultProcessor.DemandOK); + } + catch { } } } @@ -436,7 +436,7 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options log?.WriteLine($"Making {Format.ToString(srv.EndPoint)} a master..."); try { - srv.ReplicaOf(null, flags); + await srv.ReplicaOfAsync(null, flags); } catch (Exception ex) { @@ -445,13 +445,15 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options } // also, in case it was a replica a moment ago, and hasn't got the tie-breaker yet, we re-send the tie-breaker to this one - if (!tieBreakerKey.IsNull) + if (!tieBreakerKey.IsNull && !server.IsReplica) { log?.WriteLine($"Resending tie-breaker to {Format.ToString(server.EndPoint)}..."); - msg = Message.Create(0, flags, RedisCommand.SET, tieBreakerKey, newMaster); -#pragma warning disable CS0618 - server.WriteDirectFireAndForgetSync(msg, ResultProcessor.DemandOK); -#pragma warning restore CS0618 + msg = Message.Create(0, flags | CommandFlags.FireAndForget, RedisCommand.SET, tieBreakerKey, newMaster); + try + { + await server.WriteDirectAsync(msg, ResultProcessor.DemandOK); + } + catch { } } // There's an inherent race here in zero-latency environments (e.g. when Redis is on localhost) when a broadcast is specified @@ -465,7 +467,7 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options // We want everyone possible to pick it up. // We broadcast before *and after* the change to remote members, so that they don't go without detecting a change happened. // This eliminates the race of pub/sub *then* re-slaving happening, since a method both precedes and follows. - void Broadcast(ReadOnlySpan serverNodes) + async Task BroadcastAsync(ServerEndPoint[] serverNodes) { if ((options & ReplicationChangeOptions.Broadcast) != 0 && ConfigurationChangedChannel != null && CommandMap.IsAvailable(RedisCommand.PUBLISH)) @@ -475,16 +477,14 @@ void Broadcast(ReadOnlySpan serverNodes) { if (!node.IsConnected) continue; log?.WriteLine($"Broadcasting via {Format.ToString(node.EndPoint)}..."); - msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, newMaster); -#pragma warning disable CS0618 - node.WriteDirectFireAndForgetSync(msg, ResultProcessor.Int64); -#pragma warning restore CS0618 + msg = Message.Create(-1, flags | CommandFlags.FireAndForget, RedisCommand.PUBLISH, channel, newMaster); + await node.WriteDirectAsync(msg, ResultProcessor.Int64); } } } // Send a message before it happens - because afterwards a new replica may be unresponsive - Broadcast(nodes); + await BroadcastAsync(nodes); if ((options & ReplicationChangeOptions.ReplicateToOtherEndpoints) != 0) { @@ -494,16 +494,14 @@ void Broadcast(ReadOnlySpan serverNodes) log?.WriteLine($"Replicating to {Format.ToString(node.EndPoint)}..."); msg = RedisServer.CreateReplicaOfMessage(node, server.EndPoint, flags); -#pragma warning disable CS0618 - node.WriteDirectFireAndForgetSync(msg, ResultProcessor.DemandOK); -#pragma warning restore CS0618 + await node.WriteDirectAsync(msg, ResultProcessor.DemandOK); } } // ...and send one after it happens - because the first broadcast may have landed on a secondary client // and it can reconfigure before any topology change actually happened. This is most likely to happen // in low-latency environments. - Broadcast(nodes); + await BroadcastAsync(nodes); // and reconfigure the muxer log?.WriteLine("Reconfiguring all endpoints..."); @@ -513,7 +511,7 @@ void Broadcast(ReadOnlySpan serverNodes) { Interlocked.Exchange(ref activeConfigCause, null); } - if (!ReconfigureAsync(first: false, reconfigureAll: true, log, srv.EndPoint, "make master").ObserveErrors().Wait(5000)) + if (!await ReconfigureAsync(first: false, reconfigureAll: true, log, srv.EndPoint, "make master")) { log?.WriteLine("Verifying the configuration was incomplete; please verify"); } diff --git a/src/StackExchange.Redis/Enums/CommandFlags.cs b/src/StackExchange.Redis/Enums/CommandFlags.cs index 119fe22bb..074779fb4 100644 --- a/src/StackExchange.Redis/Enums/CommandFlags.cs +++ b/src/StackExchange.Redis/Enums/CommandFlags.cs @@ -18,7 +18,7 @@ public enum CommandFlags /// /// From 2.0, this flag is not used /// - [Obsolete("From 2.0, this flag is not used", false)] + [Obsolete("From 2.0, this flag is not used, this will be removed in 3.0.", false)] HighPriority = 1, /// /// The caller is not interested in the result; the caller will immediately receive a default-value diff --git a/src/StackExchange.Redis/Interfaces/IServer.cs b/src/StackExchange.Redis/Interfaces/IServer.cs index 792807268..724a13523 100644 --- a/src/StackExchange.Redis/Interfaces/IServer.cs +++ b/src/StackExchange.Redis/Interfaces/IServer.cs @@ -418,12 +418,20 @@ public partial interface IServer : IRedis Task LastSaveAsync(CommandFlags flags = CommandFlags.None); /// - /// Promote the selected node to be master + /// Promote the selected node to be primary. /// /// The options to use for this topology change. /// The log to write output to. + [Obsolete("Please use " + nameof(MakePrimaryAsync) + ", this will be removed in 3.0.")] void MakeMaster(ReplicationChangeOptions options, TextWriter log = null); + /// + /// Promote the selected node to be primary. + /// + /// The options to use for this topology change. + /// The log to write output to. + Task MakePrimaryAsync(ReplicationChangeOptions options, TextWriter log = null); + /// /// Returns the role info for the current server. /// @@ -540,7 +548,7 @@ public partial interface IServer : IRedis /// Endpoint of the new master to replicate from. /// The command flags to use. /// https://redis.io/commands/replicaof - [Obsolete("Starting with Redis version 5, Redis has moved to 'replica' terminology. Please use " + nameof(ReplicaOf) + " instead.")] + [Obsolete("Starting with Redis version 5, Redis has moved to 'replica' terminology. Please use " + nameof(ReplicaOfAsync) + " instead, this will be removed in 3.0.")] [Browsable(false), EditorBrowsable(EditorBrowsableState.Never)] void SlaveOf(EndPoint master, CommandFlags flags = CommandFlags.None); @@ -550,6 +558,7 @@ public partial interface IServer : IRedis /// Endpoint of the new master to replicate from. /// The command flags to use. /// https://redis.io/commands/replicaof + [Obsolete("Please use " + nameof(ReplicaOfAsync) + ", this will be removed in 3.0.")] void ReplicaOf(EndPoint master, CommandFlags flags = CommandFlags.None); /// @@ -558,7 +567,7 @@ public partial interface IServer : IRedis /// Endpoint of the new master to replicate from. /// The command flags to use. /// https://redis.io/commands/replicaof - [Obsolete("Starting with Redis version 5, Redis has moved to 'replica' terminology. Please use " + nameof(ReplicaOfAsync) + " instead.")] + [Obsolete("Starting with Redis version 5, Redis has moved to 'replica' terminology. Please use " + nameof(ReplicaOfAsync) + " instead, this will be removed in 3.0.")] [Browsable(false), EditorBrowsable(EditorBrowsableState.Never)] Task SlaveOfAsync(EndPoint master, CommandFlags flags = CommandFlags.None); diff --git a/src/StackExchange.Redis/RedisServer.cs b/src/StackExchange.Redis/RedisServer.cs index fc60c48f6..27a7a62c5 100644 --- a/src/StackExchange.Redis/RedisServer.cs +++ b/src/StackExchange.Redis/RedisServer.cs @@ -336,7 +336,16 @@ public void MakeMaster(ReplicationChangeOptions options, TextWriter log = null) { using (var proxy = LogProxy.TryCreate(log)) { - multiplexer.MakeMaster(server, options, proxy); + // Do you believe in magic? + multiplexer.MakePrimaryAsync(server, options, proxy).Wait(60000); + } + } + + public async Task MakePrimaryAsync(ReplicationChangeOptions options, TextWriter log = null) + { + using (var proxy = LogProxy.TryCreate(log)) + { + await multiplexer.MakePrimaryAsync(server, options, proxy); } } @@ -571,6 +580,32 @@ internal static Message CreateReplicaOfMessage(ServerEndPoint sendMessageTo, End return Message.Create(-1, flags, sendMessageTo.GetFeatures().ReplicaCommands ? RedisCommand.REPLICAOF : RedisCommand.SLAVEOF, host, port); } + private Message GetTiebreakerRemovalMessage() + { + var configuration = multiplexer.RawConfig; + + if (!string.IsNullOrWhiteSpace(configuration.TieBreaker) && multiplexer.CommandMap.IsAvailable(RedisCommand.DEL)) + { + var msg = Message.Create(0, CommandFlags.FireAndForget | CommandFlags.NoRedirect, RedisCommand.DEL, (RedisKey)configuration.TieBreaker); + msg.SetInternalCall(); + return msg; + } + return null; + } + + private Message GetConfigChangeMessage() + { + // attempt to broadcast a reconfigure message to anybody listening to this server + var channel = multiplexer.ConfigurationChangedChannel; + if (channel != null && multiplexer.CommandMap.IsAvailable(RedisCommand.PUBLISH)) + { + var msg = Message.Create(-1, CommandFlags.FireAndForget | CommandFlags.NoRedirect, RedisCommand.PUBLISH, (RedisValue)channel, RedisLiterals.Wildcard); + msg.SetInternalCall(); + return msg; + } + return null; + } + internal override Task ExecuteAsync(Message message, ResultProcessor processor, ServerEndPoint server = null) { // inject our expected server automatically @@ -625,46 +660,56 @@ public void ReplicaOf(EndPoint master, CommandFlags flags = CommandFlags.None) { throw new ArgumentException("Cannot replicate to self"); } - // prepare the actual replicaof message (not sent yet) - var replicaOfMsg = CreateReplicaOfMessage(server, master, flags); - - var configuration = multiplexer.RawConfig; +#pragma warning disable CS0618 // attempt to cease having an opinion on the master; will resume that when replication completes // (note that this may fail; we aren't depending on it) - if (!string.IsNullOrWhiteSpace(configuration.TieBreaker) - && multiplexer.CommandMap.IsAvailable(RedisCommand.DEL)) + if (GetTiebreakerRemovalMessage() is Message tieBreakerRemoval) { - var del = Message.Create(0, CommandFlags.FireAndForget | CommandFlags.NoRedirect, RedisCommand.DEL, (RedisKey)configuration.TieBreaker); - del.SetInternalCall(); -#pragma warning disable CS0618 - server.WriteDirectFireAndForgetSync(del, ResultProcessor.Boolean); -#pragma warning restore CS0618 + tieBreakerRemoval.SetSource(ResultProcessor.Boolean, null); + server.GetBridge(tieBreakerRemoval).TryWriteSync(tieBreakerRemoval, server.IsReplica); } + + var replicaOfMsg = CreateReplicaOfMessage(server, master, flags); ExecuteSync(replicaOfMsg, ResultProcessor.DemandOK); // attempt to broadcast a reconfigure message to anybody listening to this server - var channel = multiplexer.ConfigurationChangedChannel; - if (channel != null && multiplexer.CommandMap.IsAvailable(RedisCommand.PUBLISH)) + if (GetConfigChangeMessage() is Message configChangeMessage) { - var pub = Message.Create(-1, CommandFlags.FireAndForget | CommandFlags.NoRedirect, RedisCommand.PUBLISH, (RedisValue)channel, RedisLiterals.Wildcard); - pub.SetInternalCall(); -#pragma warning disable CS0618 - server.WriteDirectFireAndForgetSync(pub, ResultProcessor.Int64); -#pragma warning restore CS0618 + configChangeMessage.SetSource(ResultProcessor.Int64, null); + server.GetBridge(configChangeMessage).TryWriteSync(configChangeMessage, server.IsReplica); } +#pragma warning restore CS0618 } Task IServer.SlaveOfAsync(EndPoint master, CommandFlags flags) => ReplicaOfAsync(master, flags); - public Task ReplicaOfAsync(EndPoint master, CommandFlags flags = CommandFlags.None) + public async Task ReplicaOfAsync(EndPoint master, CommandFlags flags = CommandFlags.None) { - var msg = CreateReplicaOfMessage(server, master, flags); if (master == server.EndPoint) { throw new ArgumentException("Cannot replicate to self"); } - return ExecuteAsync(msg, ResultProcessor.DemandOK); + + // attempt to cease having an opinion on the master; will resume that when replication completes + // (note that this may fail; we aren't depending on it) + if (GetTiebreakerRemovalMessage() is Message tieBreakerRemoval && !server.IsReplica) + { + try + { + await server.WriteDirectAsync(tieBreakerRemoval, ResultProcessor.Boolean); + } + catch { } + } + + var msg = CreateReplicaOfMessage(server, master, flags); + await ExecuteAsync(msg, ResultProcessor.DemandOK); + + // attempt to broadcast a reconfigure message to anybody listening to this server + if (GetConfigChangeMessage() is Message configChangeMessage) + { + await server.WriteDirectAsync(configChangeMessage, ResultProcessor.Int64); + } } private static void FixFlags(Message message, ServerEndPoint server) diff --git a/src/StackExchange.Redis/ServerEndPoint.cs b/src/StackExchange.Redis/ServerEndPoint.cs index a93fb438f..560bba8c6 100755 --- a/src/StackExchange.Redis/ServerEndPoint.cs +++ b/src/StackExchange.Redis/ServerEndPoint.cs @@ -333,9 +333,7 @@ internal async Task AutoConfigureAsync(PhysicalConnection connection, LogProxy l log?.WriteLine($"{Format.ToString(this)}: Auto-configuring..."); var commandMap = Multiplexer.CommandMap; -#pragma warning disable CS0618 - const CommandFlags flags = CommandFlags.FireAndForget | CommandFlags.HighPriority | CommandFlags.NoRedirect; -#pragma warning restore CS0618 + const CommandFlags flags = CommandFlags.FireAndForget | CommandFlags.NoRedirect; var features = GetFeatures(); Message msg; @@ -670,10 +668,11 @@ internal bool CheckInfoReplication() if (version >= RedisFeatures.v2_8_0 && Multiplexer.CommandMap.IsAvailable(RedisCommand.INFO) && (bridge = GetBridge(ConnectionType.Interactive, false)) != null) { -#pragma warning disable CS0618 - var msg = Message.Create(-1, CommandFlags.FireAndForget | CommandFlags.HighPriority | CommandFlags.NoRedirect, RedisCommand.INFO, RedisLiterals.replication); + var msg = Message.Create(-1, CommandFlags.FireAndForget | CommandFlags.NoRedirect, RedisCommand.INFO, RedisLiterals.replication); msg.SetInternalCall(); - WriteDirectFireAndForgetSync(msg, ResultProcessor.AutoConfigure, bridge); + msg.SetSource(ResultProcessor.AutoConfigure, null); +#pragma warning disable CS0618 + bridge.TryWriteSync(msg, isReplica); #pragma warning restore CS0618 return true; } @@ -764,17 +763,6 @@ static async Task Awaited(ServerEndPoint @this, Message message, ValueTask(Message message, ResultProcessor processor, PhysicalBridge bridge = null) - { - if (message != null) - { - message.SetSource(processor, null); - Multiplexer.Trace("Enqueue: " + message); - (bridge ?? GetBridge(message)).TryWriteSync(message, isReplica); - } - } - internal void ReportNextFailure() { interactive?.ReportNextFailure(); diff --git a/tests/StackExchange.Redis.Tests/Failover.cs b/tests/StackExchange.Redis.Tests/Failover.cs index 1aa26e237..50565580f 100644 --- a/tests/StackExchange.Redis.Tests/Failover.cs +++ b/tests/StackExchange.Redis.Tests/Failover.cs @@ -9,7 +9,7 @@ namespace StackExchange.Redis.Tests { public class Failover : TestBase, IAsyncLifetime { - protected override string GetConfiguration() => GetMasterReplicaConfig().ToString(); + protected override string GetConfiguration() => GetPrimaryReplicaConfig().ToString(); public Failover(ITestOutputHelper output) : base(output) { @@ -21,24 +21,24 @@ public async Task InitializeAsync() { using (var mutex = Create()) { - var shouldBeMaster = mutex.GetServer(TestConfig.Current.FailoverMasterServerAndPort); - if (shouldBeMaster.IsReplica) + var shouldBePrimary = mutex.GetServer(TestConfig.Current.FailoverMasterServerAndPort); + if (shouldBePrimary.IsReplica) { - Log(shouldBeMaster.EndPoint + " should be master, fixing..."); - shouldBeMaster.MakeMaster(ReplicationChangeOptions.SetTiebreaker); + Log(shouldBePrimary.EndPoint + " should be primary, fixing..."); + await shouldBePrimary.MakePrimaryAsync(ReplicationChangeOptions.SetTiebreaker); } var shouldBeReplica = mutex.GetServer(TestConfig.Current.FailoverReplicaServerAndPort); if (!shouldBeReplica.IsReplica) { Log(shouldBeReplica.EndPoint + " should be a replica, fixing..."); - shouldBeReplica.ReplicaOf(shouldBeMaster.EndPoint); + await shouldBeReplica.ReplicaOfAsync(shouldBePrimary.EndPoint); await Task.Delay(2000).ForAwait(); } } } - private static ConfigurationOptions GetMasterReplicaConfig() + private static ConfigurationOptions GetPrimaryReplicaConfig() { return new ConfigurationOptions { @@ -99,10 +99,10 @@ public async Task ConfigVerifyReceiveConfigChangeBroadcast() Interlocked.Exchange(ref total, 0); - // and send a second time via a re-master operation + // and send a second time via a re-primary operation var server = GetServer(sender); if (server.IsReplica) Skip.Inconclusive("didn't expect a replica"); - server.MakeMaster(ReplicationChangeOptions.Broadcast); + await server.MakePrimaryAsync(ReplicationChangeOptions.Broadcast); await Task.Delay(1000).ConfigureAwait(false); GetServer(receiver).Ping(); GetServer(receiver).Ping(); @@ -113,7 +113,7 @@ public async Task ConfigVerifyReceiveConfigChangeBroadcast() [Fact] public async Task DereplicateGoesToPrimary() { - ConfigurationOptions config = GetMasterReplicaConfig(); + ConfigurationOptions config = GetPrimaryReplicaConfig(); config.ConfigCheckSeconds = 5; using (var conn = ConnectionMultiplexer.Connect(config)) { @@ -123,8 +123,8 @@ public async Task DereplicateGoesToPrimary() primary.Ping(); secondary.Ping(); - primary.MakeMaster(ReplicationChangeOptions.SetTiebreaker); - secondary.MakeMaster(ReplicationChangeOptions.None); + await primary.MakePrimaryAsync(ReplicationChangeOptions.SetTiebreaker); + await secondary.MakePrimaryAsync(ReplicationChangeOptions.None); await Task.Delay(100).ConfigureAwait(false); @@ -150,22 +150,22 @@ public async Task DereplicateGoesToPrimary() var ex = Assert.Throws(() => db.IdentifyEndpoint(key, CommandFlags.DemandReplica)); Assert.StartsWith("No connection is active/available to service this operation: EXISTS " + Me(), ex.Message); - Writer.WriteLine("Invoking MakeMaster()..."); - primary.MakeMaster(ReplicationChangeOptions.Broadcast | ReplicationChangeOptions.ReplicateToOtherEndpoints | ReplicationChangeOptions.SetTiebreaker, Writer); - Writer.WriteLine("Finished MakeMaster() call."); + Writer.WriteLine("Invoking MakePrimaryAsync()..."); + await primary.MakePrimaryAsync(ReplicationChangeOptions.Broadcast | ReplicationChangeOptions.ReplicateToOtherEndpoints | ReplicationChangeOptions.SetTiebreaker, Writer); + Writer.WriteLine("Finished MakePrimaryAsync() call."); await Task.Delay(100).ConfigureAwait(false); - Writer.WriteLine("Invoking Ping() (post-master)"); + Writer.WriteLine("Invoking Ping() (post-primary)"); primary.Ping(); secondary.Ping(); - Writer.WriteLine("Finished Ping() (post-master)"); + Writer.WriteLine("Finished Ping() (post-primary)"); Assert.True(primary.IsConnected, $"{primary.EndPoint} is not connected."); Assert.True(secondary.IsConnected, $"{secondary.EndPoint} is not connected."); - Writer.WriteLine($"{primary.EndPoint}: {primary.ServerType}, Mode: {(primary.IsReplica ? "Replica" : "Master")}"); - Writer.WriteLine($"{secondary.EndPoint}: {secondary.ServerType}, Mode: {(secondary.IsReplica ? "Replica" : "Master")}"); + Writer.WriteLine($"{primary.EndPoint}: {primary.ServerType}, Mode: {(primary.IsReplica ? "Replica" : "Primary")}"); + Writer.WriteLine($"{secondary.EndPoint}: {secondary.ServerType}, Mode: {(secondary.IsReplica ? "Replica" : "Primary")}"); // Create a separate multiplexer with a valid view of the world to distinguish between failures of // server topology changes from failures to recognize those changes @@ -175,10 +175,10 @@ public async Task DereplicateGoesToPrimary() var primary2 = conn2.GetServer(TestConfig.Current.FailoverMasterServerAndPort); var secondary2 = conn2.GetServer(TestConfig.Current.FailoverReplicaServerAndPort); - Writer.WriteLine($"Check: {primary2.EndPoint}: {primary2.ServerType}, Mode: {(primary2.IsReplica ? "Replica" : "Master")}"); - Writer.WriteLine($"Check: {secondary2.EndPoint}: {secondary2.ServerType}, Mode: {(secondary2.IsReplica ? "Replica" : "Master")}"); + Writer.WriteLine($"Check: {primary2.EndPoint}: {primary2.ServerType}, Mode: {(primary2.IsReplica ? "Replica" : "Primary")}"); + Writer.WriteLine($"Check: {secondary2.EndPoint}: {secondary2.ServerType}, Mode: {(secondary2.IsReplica ? "Replica" : "Primary")}"); - Assert.False(primary2.IsReplica, $"{primary2.EndPoint} should be a master (verification connection)."); + Assert.False(primary2.IsReplica, $"{primary2.EndPoint} should be a primary (verification connection)."); Assert.True(secondary2.IsReplica, $"{secondary2.EndPoint} should be a replica (verification connection)."); var db2 = conn2.GetDatabase(); @@ -191,7 +191,7 @@ public async Task DereplicateGoesToPrimary() await UntilCondition(TimeSpan.FromSeconds(20), () => !primary.IsReplica && secondary.IsReplica); - Assert.False(primary.IsReplica, $"{primary.EndPoint} should be a master."); + Assert.False(primary.IsReplica, $"{primary.EndPoint} should be a primary."); Assert.True(secondary.IsReplica, $"{secondary.EndPoint} should be a replica."); Assert.Equal(primary.EndPoint, db.IdentifyEndpoint(key, CommandFlags.PreferMaster)); @@ -203,9 +203,9 @@ public async Task DereplicateGoesToPrimary() #if DEBUG [Fact] - public async Task SubscriptionsSurviveMasterSwitchAsync() + public async Task SubscriptionsSurvivePrimarySwitchAsync() { - static void TopologyFail() => Skip.Inconclusive("Replication tolopogy change failed...and that's both inconsistent and not what we're testing."); + static void TopologyFail() => Skip.Inconclusive("Replication topology change failed...and that's both inconsistent and not what we're testing."); if (RunningInCI) { @@ -220,14 +220,14 @@ public async Task SubscriptionsSurviveMasterSwitchAsync() var subA = a.GetSubscriber(); var subB = b.GetSubscriber(); - long masterChanged = 0, aCount = 0, bCount = 0; + long primaryChanged = 0, aCount = 0, bCount = 0; a.ConfigurationChangedBroadcast += delegate { - Log("A noticed config broadcast: " + Interlocked.Increment(ref masterChanged)); + Log("A noticed config broadcast: " + Interlocked.Increment(ref primaryChanged)); }; b.ConfigurationChangedBroadcast += delegate { - Log("B noticed config broadcast: " + Interlocked.Increment(ref masterChanged)); + Log("B noticed config broadcast: " + Interlocked.Increment(ref primaryChanged)); }; subA.Subscribe(channel, (_, message) => { @@ -263,34 +263,34 @@ public async Task SubscriptionsSurviveMasterSwitchAsync() Assert.Equal(2, Interlocked.Read(ref aCount)); Assert.Equal(2, Interlocked.Read(ref bCount)); - Assert.Equal(0, Interlocked.Read(ref masterChanged)); + Assert.Equal(0, Interlocked.Read(ref primaryChanged)); try { - Interlocked.Exchange(ref masterChanged, 0); + Interlocked.Exchange(ref primaryChanged, 0); Interlocked.Exchange(ref aCount, 0); Interlocked.Exchange(ref bCount, 0); - Log("Changing master..."); + Log("Changing primary..."); using (var sw = new StringWriter()) { - a.GetServer(TestConfig.Current.FailoverReplicaServerAndPort).MakeMaster(ReplicationChangeOptions.All, sw); + await a.GetServer(TestConfig.Current.FailoverReplicaServerAndPort).MakePrimaryAsync(ReplicationChangeOptions.All, sw); Log(sw.ToString()); } Log("Waiting for connection B to detect..."); await UntilCondition(TimeSpan.FromSeconds(10), () => b.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsReplica).ForAwait(); subA.Ping(); subB.Ping(); - Log("Falover 2 Attempted. Pausing..."); - Log(" A " + TestConfig.Current.FailoverMasterServerAndPort + " status: " + (a.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsReplica ? "Replica" : "Master")); - Log(" A " + TestConfig.Current.FailoverReplicaServerAndPort + " status: " + (a.GetServer(TestConfig.Current.FailoverReplicaServerAndPort).IsReplica ? "Replica" : "Master")); - Log(" B " + TestConfig.Current.FailoverMasterServerAndPort + " status: " + (b.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsReplica ? "Replica" : "Master")); - Log(" B " + TestConfig.Current.FailoverReplicaServerAndPort + " status: " + (b.GetServer(TestConfig.Current.FailoverReplicaServerAndPort).IsReplica ? "Replica" : "Master")); + Log("Failover 2 Attempted. Pausing..."); + Log(" A " + TestConfig.Current.FailoverMasterServerAndPort + " status: " + (a.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsReplica ? "Replica" : "Primary")); + Log(" A " + TestConfig.Current.FailoverReplicaServerAndPort + " status: " + (a.GetServer(TestConfig.Current.FailoverReplicaServerAndPort).IsReplica ? "Replica" : "Primary")); + Log(" B " + TestConfig.Current.FailoverMasterServerAndPort + " status: " + (b.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsReplica ? "Replica" : "Primary")); + Log(" B " + TestConfig.Current.FailoverReplicaServerAndPort + " status: " + (b.GetServer(TestConfig.Current.FailoverReplicaServerAndPort).IsReplica ? "Replica" : "Primary")); if (!a.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsReplica) { TopologyFail(); } - Log("Falover 2 Complete."); + Log("Failover 2 Complete."); Assert.True(a.GetServer(TestConfig.Current.FailoverMasterServerAndPort).IsReplica, $"A Connection: {TestConfig.Current.FailoverMasterServerAndPort} should be a replica"); Assert.False(a.GetServer(TestConfig.Current.FailoverReplicaServerAndPort).IsReplica, $"A Connection: {TestConfig.Current.FailoverReplicaServerAndPort} should be a master"); @@ -334,12 +334,12 @@ public async Task SubscriptionsSurviveMasterSwitchAsync() Log("Counts so far:"); Log(" aCount: " + Interlocked.Read(ref aCount)); Log(" bCount: " + Interlocked.Read(ref bCount)); - Log(" masterChanged: " + Interlocked.Read(ref masterChanged)); + Log(" primaryChanged: " + Interlocked.Read(ref primaryChanged)); Assert.Equal(2, Interlocked.Read(ref aCount)); Assert.Equal(2, Interlocked.Read(ref bCount)); - // Expect 10, because a sees a, but b sees a and b due to replication - Assert.Equal(10, Interlocked.CompareExchange(ref masterChanged, 0, 0)); + // Expect 12, because a sees a, but b sees a and b due to replication + Assert.Equal(12, Interlocked.CompareExchange(ref primaryChanged, 0, 0)); } catch { @@ -353,7 +353,7 @@ public async Task SubscriptionsSurviveMasterSwitchAsync() Log("Restoring configuration..."); try { - a.GetServer(TestConfig.Current.FailoverMasterServerAndPort).MakeMaster(ReplicationChangeOptions.All); + await a.GetServer(TestConfig.Current.FailoverMasterServerAndPort).MakePrimaryAsync(ReplicationChangeOptions.All); await Task.Delay(1000).ForAwait(); } catch { /* Don't bomb here */ }