diff --git a/src/ServiceControl.Audit/Infrastructure/NServiceBusFactory.cs b/src/ServiceControl.Audit/Infrastructure/NServiceBusFactory.cs index b6c6cab3d3..337d83e6c2 100644 --- a/src/ServiceControl.Audit/Infrastructure/NServiceBusFactory.cs +++ b/src/ServiceControl.Audit/Infrastructure/NServiceBusFactory.cs @@ -7,6 +7,7 @@ namespace ServiceControl.Audit.Infrastructure using Contracts.MessageFailures; using NServiceBus; using NServiceBus.Configuration.AdvancedExtensibility; + using ServiceControl.Infrastructure; using Settings; using Transports; @@ -63,6 +64,8 @@ public static void Configure(Settings.Settings settings, TransportCustomization { configuration.EnableInstallers(); } + + configuration.Recoverability().AddUnrecoverableException(); } static bool IsExternalContract(Type t) diff --git a/src/ServiceControl.Infrastructure/UnrecoverableException.cs b/src/ServiceControl.Infrastructure/UnrecoverableException.cs new file mode 100644 index 0000000000..179ee74c5b --- /dev/null +++ b/src/ServiceControl.Infrastructure/UnrecoverableException.cs @@ -0,0 +1,15 @@ +namespace ServiceControl.Infrastructure +{ + using System; + + public class UnrecoverableException : Exception + { + public UnrecoverableException(string message) : base(message) + { + } + + public UnrecoverableException(string message, Exception innerException) : base(message, innerException) + { + } + } +} diff --git a/src/ServiceControl.MultiInstance.AcceptanceTests/AcceptanceTest.cs b/src/ServiceControl.MultiInstance.AcceptanceTests/AcceptanceTest.cs index 0a3e140f3d..f0e5d62f6d 100644 --- a/src/ServiceControl.MultiInstance.AcceptanceTests/AcceptanceTest.cs +++ b/src/ServiceControl.MultiInstance.AcceptanceTests/AcceptanceTest.cs @@ -68,7 +68,7 @@ public void Setup() DataStoreConfiguration = new DataStoreConfiguration { - DataStoreTypeName = "RavenDB35" + DataStoreTypeName = "RavenDB5" }; serviceControlRunnerBehavior = new ServiceControlComponentBehavior( diff --git a/src/ServiceControl.MultiInstance.AcceptanceTests/DatabaseLease.cs b/src/ServiceControl.MultiInstance.AcceptanceTests/DatabaseLease.cs new file mode 100644 index 0000000000..b9931e209c --- /dev/null +++ b/src/ServiceControl.MultiInstance.AcceptanceTests/DatabaseLease.cs @@ -0,0 +1,28 @@ +using System; +using System.Threading.Tasks; +using Raven.Client.ServerWide.Operations; +using ServiceBus.Management.Infrastructure.Settings; + +public class DatabaseLease : IAsyncDisposable +{ + public string DatabaseName { get; } = Guid.NewGuid().ToString("n"); + + public void CustomizeSettings(Settings settings) + { + settings.PersisterSpecificSettings = new RavenDBPersisterSettings + { + ErrorRetentionPeriod = TimeSpan.FromDays(10), + ConnectionString = SharedDatabaseSetup.SharedInstance.ServerUrl, + DatabaseName = DatabaseName + }; + } + + public async ValueTask DisposeAsync() + { + var documentStore = await SharedDatabaseSetup.SharedInstance.Connect(); + + // Comment this out temporarily to be able to inspect a database after the test has completed + var deleteDatabasesOperation = new DeleteDatabasesOperation(new DeleteDatabasesOperation.Parameters { DatabaseNames = new[] { DatabaseName }, HardDelete = true }); + await documentStore.Maintenance.Server.SendAsync(deleteDatabasesOperation); + } +} \ No newline at end of file diff --git a/src/ServiceControl.MultiInstance.AcceptanceTests/SagaAudit/When_sending_saga_audit_to_main_instance.cs b/src/ServiceControl.MultiInstance.AcceptanceTests/SagaAudit/When_sending_saga_audit_to_main_instance.cs index b82b347f40..e6eab56c05 100644 --- a/src/ServiceControl.MultiInstance.AcceptanceTests/SagaAudit/When_sending_saga_audit_to_main_instance.cs +++ b/src/ServiceControl.MultiInstance.AcceptanceTests/SagaAudit/When_sending_saga_audit_to_main_instance.cs @@ -8,6 +8,7 @@ using NServiceBus; using NServiceBus.AcceptanceTesting; using NUnit.Framework; + using ServiceControl.EventLog; using ServiceControl.SagaAudit; using TestSupport; using TestSupport.EndpointTemplates; @@ -55,10 +56,17 @@ public void ConfigTeardown() public async Task Saga_history_can_be_fetched_from_main_instance() { SagaHistory sagaHistory = null; + EventLogItem eventLog = null; + + CustomServiceControlSettings = settings => + { + settings.DisableHealthChecks = false; + settings.PersisterSpecificSettings.OverrideCustomCheckRepeatTime = TimeSpan.FromSeconds(2); + }; var context = await Define() .WithEndpoint(b => b.When((bus, c) => bus.SendLocal(new MessageInitiatingSaga { Id = "Id" }))) - .Done(async c => + .Do("GetSagaHistory", async c => { if (!c.SagaId.HasValue) { @@ -69,9 +77,17 @@ public async Task Saga_history_can_be_fetched_from_main_instance() sagaHistory = result; return result; }) + .Do("GetEventLog", async c => + { + var result = await this.TryGetMany("/api/eventlogitems/"); + eventLog = result.Items.FirstOrDefault(e => e.Description.Contains("Saga Audit Destination") && e.Description.Contains("endpoints have reported saga audit data to the ServiceControl Primary instance")); + return eventLog != null; + }) + .Done(c => eventLog != null) .Run(); Assert.NotNull(sagaHistory); + Assert.NotNull(eventLog); Assert.AreEqual(context.SagaId, sagaHistory.SagaId); Assert.AreEqual(typeof(SagaEndpoint.MySaga).FullName, sagaHistory.SagaType); @@ -119,9 +135,10 @@ public class MessageInitiatingSaga : ICommand } - public class MyContext : ScenarioContext + public class MyContext : ScenarioContext, ISequenceContext { public Guid? SagaId { get; set; } + public int Step { get; set; } } } } \ No newline at end of file diff --git a/src/ServiceControl.MultiInstance.AcceptanceTests/ServiceControl.MultiInstance.AcceptanceTests.csproj b/src/ServiceControl.MultiInstance.AcceptanceTests/ServiceControl.MultiInstance.AcceptanceTests.csproj index 3fe2bed049..8a16f0f87f 100644 --- a/src/ServiceControl.MultiInstance.AcceptanceTests/ServiceControl.MultiInstance.AcceptanceTests.csproj +++ b/src/ServiceControl.MultiInstance.AcceptanceTests/ServiceControl.MultiInstance.AcceptanceTests.csproj @@ -10,7 +10,7 @@ - + diff --git a/src/ServiceControl.MultiInstance.AcceptanceTests/SharedDatabaseSetup.cs b/src/ServiceControl.MultiInstance.AcceptanceTests/SharedDatabaseSetup.cs new file mode 100644 index 0000000000..a3399d71e8 --- /dev/null +++ b/src/ServiceControl.MultiInstance.AcceptanceTests/SharedDatabaseSetup.cs @@ -0,0 +1,28 @@ +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; +using ServiceControl.Persistence.RavenDb5; + +[SetUpFixture] +public static class SharedDatabaseSetup +{ + public static EmbeddedDatabase SharedInstance { get; private set; } + + // Needs to be in a SetUpFixture otherwise the OneTimeSetUp is invoked for each inherited test fixture + [OneTimeSetUp] + public static async Task SetupSharedEmbeddedServer() + { + using (var cancellation = new CancellationTokenSource(30_000)) + { + SharedInstance = await SharedEmbeddedServer.GetInstance(cancellation.Token); + } + } + + [OneTimeTearDown] + public static void TearDown() => SharedInstance.Dispose(); + + public static DatabaseLease LeaseDatabase() + { + return new DatabaseLease(); + } +} diff --git a/src/ServiceControl.MultiInstance.AcceptanceTests/SharedEmbeddedServer.cs b/src/ServiceControl.MultiInstance.AcceptanceTests/SharedEmbeddedServer.cs new file mode 100644 index 0000000000..463ecf894a --- /dev/null +++ b/src/ServiceControl.MultiInstance.AcceptanceTests/SharedEmbeddedServer.cs @@ -0,0 +1,46 @@ +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Raven.Client.Documents; +using Raven.Client.ServerWide.Operations; +using ServiceControl.Persistence.RavenDb5; +using TestHelper; + +static class SharedEmbeddedServer +{ + public static async Task GetInstance(CancellationToken cancellationToken = default) + { + var basePath = Path.Combine(Path.GetTempPath(), "ServiceControlTests", "Primary.Raven5MultiInstance"); + var dbPath = Path.Combine(basePath, "DB"); + var databasesPath = Path.Combine(dbPath, "Databases"); + + var settings = new RavenDBPersisterSettings + { + DatabasePath = dbPath, + LogPath = Path.Combine(basePath, "Logs"), + LogsMode = "Operations", + ServerUrl = $"http://localhost:{PortUtility.FindAvailablePort(33334)}" + }; + + var instance = EmbeddedDatabase.Start(settings); + + // Make sure that the database is up - this blocks until the cancellation token times out + using (var docStore = await instance.Connect(cancellationToken)) + { + var cleanupDatabases = new DirectoryInfo(databasesPath) + .GetDirectories() + .Select(di => di.Name) + .Where(name => name.Length == 32) + .ToArray(); + + if (cleanupDatabases.Any()) + { + var cleanupOperation = new DeleteDatabasesOperation(new DeleteDatabasesOperation.Parameters { DatabaseNames = cleanupDatabases, HardDelete = true }); + await docStore.Maintenance.Server.SendAsync(cleanupOperation, CancellationToken.None); + } + } + + return instance; + } +} \ No newline at end of file diff --git a/src/ServiceControl.MultiInstance.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs b/src/ServiceControl.MultiInstance.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs index 236df6d894..835cdf0f5a 100644 --- a/src/ServiceControl.MultiInstance.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs +++ b/src/ServiceControl.MultiInstance.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs @@ -23,7 +23,6 @@ namespace ServiceControl.MultiInstance.AcceptanceTests.TestSupport using Particular.ServiceControl; using ServiceBus.Management.Infrastructure.Settings; using ServiceControl.Infrastructure.WebApi; - using TestHelper; class ServiceControlComponentRunner : ComponentRunner, IAcceptanceTestInfrastructureProviderMultiInstance { @@ -46,17 +45,14 @@ public async Task Initialize(RunDescriptor run) { SettingsPerInstance.Clear(); - var startPort = 33334; - - var mainInstancePort = PortUtility.FindAvailablePort(startPort); - var mainInstanceDbPort = PortUtility.FindAvailablePort(mainInstancePort + 1); - var auditInstancePort = PortUtility.FindAvailablePort(mainInstanceDbPort + 1); + var mainInstancePort = portLeases.GetPort(); + var auditInstancePort = portLeases.GetPort(); await InitializeServiceControlAudit(run.ScenarioContext, auditInstancePort); - await InitializeServiceControl(run.ScenarioContext, mainInstancePort, mainInstanceDbPort, auditInstancePort); + await InitializeServiceControl(run.ScenarioContext, mainInstancePort, auditInstancePort); } - async Task InitializeServiceControl(ScenarioContext context, int instancePort, int maintenancePort, int auditInstanceApiPort) + async Task InitializeServiceControl(ScenarioContext context, int instancePort, int auditInstanceApiPort) { var instanceName = Settings.DEFAULT_SERVICE_NAME; typeof(ScenarioContext).GetProperty("CurrentEndpoint", BindingFlags.Static | BindingFlags.NonPublic)?.SetValue(context, instanceName); @@ -66,11 +62,6 @@ async Task InitializeServiceControl(ScenarioContext context, int instancePort, i var settings = new Settings(instanceName, transportToUse.TypeName, dataStoreConfiguration.DataStoreTypeName) { Port = instancePort, - PersisterSpecificSettings = new RavenDBPersisterSettings - { - RunInMemory = true, - DatabaseMaintenancePort = maintenancePort - }, ForwardErrorMessages = false, TransportType = transportToUse.TypeName, TransportConnectionString = transportToUse.ConnectionString, @@ -120,6 +111,8 @@ async Task InitializeServiceControl(ScenarioContext context, int instancePort, i } }; + databaseLease.CustomizeSettings(settings); + customServiceControlSettings(settings); SettingsPerInstance[instanceName] = settings; @@ -250,7 +243,7 @@ async Task InitializeServiceControlAudit(ScenarioContext context, int instancePo var excludedAssemblies = new[] { Path.GetFileName(typeof(Settings).Assembly.CodeBase), // ServiceControl.exe - "ServiceControl.Persistence.RavenDb.dll", + "ServiceControl.Persistence.RavenDb5.dll", typeof(ServiceControlComponentRunner).Assembly.GetName().Name // This project }; @@ -366,6 +359,9 @@ public override async Task Stop() } } + await databaseLease.DisposeAsync(); + portLeases?.Dispose(); + hosts.Clear(); HttpClients.Clear(); portToHandler.Clear(); @@ -390,6 +386,10 @@ HttpClient HttpClientFactory() Action customServiceControlAuditSettings; Action customServiceControlSettings; + static readonly PortPool portPool = new PortPool(33335); + DatabaseLease databaseLease = SharedDatabaseSetup.LeaseDatabase(); + PortLease portLeases = portPool.GetLease(); + class ForwardingHandler : DelegatingHandler { public ForwardingHandler(Dictionary portsToHttpMessageHandlers) diff --git a/src/ServiceControl.Persistence.RavenDb/SagaAudit/SagaAuditDataStore.cs b/src/ServiceControl.Persistence.RavenDb/SagaAudit/SagaAuditDataStore.cs index 69af8b1da6..7ccb2f91e6 100644 --- a/src/ServiceControl.Persistence.RavenDb/SagaAudit/SagaAuditDataStore.cs +++ b/src/ServiceControl.Persistence.RavenDb/SagaAudit/SagaAuditDataStore.cs @@ -13,12 +13,13 @@ public SagaAuditDataStore(IDocumentStore store) this.store = store; } - public async Task StoreSnapshot(SagaSnapshot sagaSnapshot) + public async Task StoreSnapshot(SagaSnapshot sagaSnapshot) { using (var session = store.OpenAsyncSession()) { await session.StoreAsync(sagaSnapshot); await session.SaveChangesAsync(); + return true; } } diff --git a/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs b/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs index 7bbe5b8db2..abf7bffcc9 100644 --- a/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs +++ b/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs @@ -14,6 +14,7 @@ using ServiceControl.Persistence.MessageRedirects; using ServiceControl.Persistence.UnitOfWork; using ServiceControl.Recoverability; + using ServiceControl.SagaAudit; class RavenDbPersistence : IPersistence { @@ -67,8 +68,12 @@ public void Configure(IServiceCollection serviceCollection) serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); - serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); + + // Forward saga audit messages and warn in ServiceControl 5, remove in 6 + serviceCollection.AddSingleton(); + serviceCollection.AddCustomCheck(); + serviceCollection.AddSingleton(); } public void ConfigureLifecycle(IServiceCollection serviceCollection) diff --git a/src/ServiceControl.Persistence.RavenDb5/SagaAudit/NoImplementationSagaAuditDataStore.cs b/src/ServiceControl.Persistence.RavenDb5/SagaAudit/NoImplementationSagaAuditDataStore.cs deleted file mode 100644 index 0ee0bf02e7..0000000000 --- a/src/ServiceControl.Persistence.RavenDb5/SagaAudit/NoImplementationSagaAuditDataStore.cs +++ /dev/null @@ -1,14 +0,0 @@ -namespace ServiceControl.Persistence.RavenDb -{ - using System; - using System.Threading.Tasks; - using ServiceControl.Persistence.Infrastructure; - using ServiceControl.SagaAudit; - - class NoImplementationSagaAuditDataStore : ISagaAuditDataStore - { - public Task StoreSnapshot(SagaSnapshot sagaSnapshot) => throw new NotImplementedException(); - - public Task> GetSagaById(Guid sagaId) => Task.FromResult(QueryResult.Empty()); - } -} diff --git a/src/ServiceControl.Persistence.RavenDb5/SagaAudit/SagaAuditDeprecationDataStore.cs b/src/ServiceControl.Persistence.RavenDb5/SagaAudit/SagaAuditDeprecationDataStore.cs new file mode 100644 index 0000000000..b3dc3a1fad --- /dev/null +++ b/src/ServiceControl.Persistence.RavenDb5/SagaAudit/SagaAuditDeprecationDataStore.cs @@ -0,0 +1,25 @@ +namespace ServiceControl.Persistence.RavenDb +{ + using System; + using System.Threading.Tasks; + using ServiceControl.Persistence.Infrastructure; + using ServiceControl.SagaAudit; + + class SagaAuditDeprecationDataStore : ISagaAuditDataStore + { + public SagaAuditDeprecationDataStore(SagaAuditDestinationCustomCheck.State customCheckState) + { + this.customCheckState = customCheckState; + } + + public Task StoreSnapshot(SagaSnapshot sagaSnapshot) + { + customCheckState.Fail(sagaSnapshot.Endpoint); + return Task.FromResult(false); + } + + public Task> GetSagaById(Guid sagaId) => Task.FromResult(QueryResult.Empty()); + + readonly SagaAuditDestinationCustomCheck.State customCheckState; + } +} diff --git a/src/ServiceControl.Persistence.RavenDb5/SagaAudit/SagaAuditDestinationCustomCheck.cs b/src/ServiceControl.Persistence.RavenDb5/SagaAudit/SagaAuditDestinationCustomCheck.cs new file mode 100644 index 0000000000..e0045e5ba0 --- /dev/null +++ b/src/ServiceControl.Persistence.RavenDb5/SagaAudit/SagaAuditDestinationCustomCheck.cs @@ -0,0 +1,68 @@ +namespace ServiceControl.SagaAudit +{ + using System; + using System.Collections.Concurrent; + using System.Linq; + using System.Threading.Tasks; + using NServiceBus.CustomChecks; + + class SagaAuditDestinationCustomCheck : CustomCheck + { + readonly State stateHolder; + static readonly TimeSpan retentionTime = TimeSpan.FromHours(24); + + public SagaAuditDestinationCustomCheck(State stateHolder, RavenDBPersisterSettings settings) + : base("Saga Audit Destination", "Health", settings.OverrideCustomCheckRepeatTime ?? TimeSpan.FromMinutes(15)) + { + this.stateHolder = stateHolder; + } + + public override Task PerformCheck() + { + var failedEndpoints = stateHolder.GetFailedEndpoints(); + + if (failedEndpoints.Length == 0) + { + return passResult; + } + + var message = $"In the last 24 hours, the following endpoints have reported saga audit data to the ServiceControl Primary instance. Instead, saga audit data should be sent to the Audit Queue Name configured in the ServiceControl Audit Instance. Affected endpoints: " + + string.Join(", ", failedEndpoints); + + return Task.FromResult(CheckResult.Failed(message)); + } + + static Task passResult = Task.FromResult(CheckResult.Pass); + + public class State + { + readonly ConcurrentDictionary failedEndpoints = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + + public void Fail(string endpointName) + { + var now = DateTime.UtcNow; + var cutoff = now - retentionTime; + + failedEndpoints[endpointName] = DateTime.UtcNow; + + foreach (var key in failedEndpoints.Keys) + { + if (failedEndpoints.TryGetValue(key, out var time) && time < cutoff) + { + failedEndpoints.TryRemove(key, out _); + } + } + } + + public string[] GetFailedEndpoints() + { + var cutoff = DateTime.UtcNow - retentionTime; + return failedEndpoints + .Where(pair => pair.Value > cutoff) + .Select(pair => pair.Key) + .OrderBy(name => name) + .ToArray(); + } + } + } +} \ No newline at end of file diff --git a/src/ServiceControl.Persistence.Tests.RavenDb5/ApprovalFiles/APIApprovals.CustomCheckDetails.approved.txt b/src/ServiceControl.Persistence.Tests.RavenDb5/ApprovalFiles/APIApprovals.CustomCheckDetails.approved.txt index 8c907ba40d..1230037975 100644 --- a/src/ServiceControl.Persistence.Tests.RavenDb5/ApprovalFiles/APIApprovals.CustomCheckDetails.approved.txt +++ b/src/ServiceControl.Persistence.Tests.RavenDb5/ApprovalFiles/APIApprovals.CustomCheckDetails.approved.txt @@ -1,3 +1,4 @@ +Health: Saga Audit Destination ServiceControl Health: Error Database Index Errors ServiceControl Health: Error Database Index Lag ServiceControl Health: Message Ingestion Process diff --git a/src/ServiceControl.Persistence/PersistenceSettings.cs b/src/ServiceControl.Persistence/PersistenceSettings.cs index 94014308b0..352046ec62 100644 --- a/src/ServiceControl.Persistence/PersistenceSettings.cs +++ b/src/ServiceControl.Persistence/PersistenceSettings.cs @@ -1,5 +1,7 @@ namespace ServiceControl.Persistence { + using System; + /// /// Marker interface used to serialize persister settings in REST API /// @@ -10,5 +12,8 @@ public abstract class PersistenceSettings public string DatabasePath { get; set; } public bool EnableFullTextSearchOnBodies { get; set; } = true; + + public TimeSpan? OverrideCustomCheckRepeatTime { get; set; } + } } \ No newline at end of file diff --git a/src/ServiceControl.Persistence/SagaAudit/ISagaAuditDataStore.cs b/src/ServiceControl.Persistence/SagaAudit/ISagaAuditDataStore.cs index 7756340b3d..157d482b1c 100644 --- a/src/ServiceControl.Persistence/SagaAudit/ISagaAuditDataStore.cs +++ b/src/ServiceControl.Persistence/SagaAudit/ISagaAuditDataStore.cs @@ -7,7 +7,7 @@ public interface ISagaAuditDataStore { - Task StoreSnapshot(SagaSnapshot sagaSnapshot); + Task StoreSnapshot(SagaSnapshot sagaSnapshot); Task> GetSagaById(Guid sagaId); } } \ No newline at end of file diff --git a/src/ServiceControl/Infrastructure/NServiceBusFactory.cs b/src/ServiceControl/Infrastructure/NServiceBusFactory.cs index cb1b540b46..7c572c8aab 100644 --- a/src/ServiceControl/Infrastructure/NServiceBusFactory.cs +++ b/src/ServiceControl/Infrastructure/NServiceBusFactory.cs @@ -4,6 +4,7 @@ namespace ServiceBus.Management.Infrastructure using NServiceBus; using NServiceBus.Configuration.AdvancedExtensibility; using ServiceControl.ExternalIntegrations; + using ServiceControl.Infrastructure; using ServiceControl.Infrastructure.Subscriptions; using ServiceControl.Notifications.Email; using ServiceControl.Operations; @@ -36,6 +37,7 @@ public static void Configure(Settings.Settings settings, TransportCustomization var recoverability = configuration.Recoverability(); recoverability.Immediate(c => c.NumberOfRetries(3)); recoverability.Delayed(c => c.NumberOfRetries(0)); + recoverability.AddUnrecoverableException(); configuration.SendFailedMessagesTo(transportSettings.ErrorQueue); recoverability.CustomPolicy(SendEmailNotificationHandler.RecoverabilityPolicy); diff --git a/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs b/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs index 9b592618a6..3aaa61f737 100644 --- a/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs +++ b/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs @@ -1,24 +1,84 @@ namespace ServiceControl.SagaAudit { + using System; + using System.Threading; using System.Threading.Tasks; using EndpointPlugin.Messages.SagaState; + using Newtonsoft.Json.Linq; using NServiceBus; + using NServiceBus.Logging; + using ServiceControl.Connection; + using ServiceControl.Infrastructure; using ServiceControl.Persistence; class SagaUpdatedHandler : IHandleMessages { - public SagaUpdatedHandler(ISagaAuditDataStore store) + public SagaUpdatedHandler(ISagaAuditDataStore sagaAuditStore, IPlatformConnectionBuilder connectionBuilder) { - this.store = store; + this.sagaAuditStore = sagaAuditStore; + this.connectionBuilder = connectionBuilder; } - public Task Handle(SagaUpdatedMessage message, IMessageHandlerContext context) + public async Task Handle(SagaUpdatedMessage message, IMessageHandlerContext context) { var sagaSnapshot = SagaSnapshotFactory.Create(message); - return store.StoreSnapshot(sagaSnapshot); + var supportedByDataStore = await sagaAuditStore.StoreSnapshot(sagaSnapshot); + + if (!supportedByDataStore) + { + if (auditQueueName is null || nextAuditQueueNameRefresh < DateTime.UtcNow) + { + await RefreshAuditQueue(); + } + + if (auditQueueName is null) + { + throw new UnrecoverableException("Could not determine audit queue name to forward saga update message. This message can be replayed after the ServiceControl Audit remote instance is running and accessible."); + } + + await context.ForwardCurrentMessageTo(auditQueueName); + } } - readonly ISagaAuditDataStore store; + async Task RefreshAuditQueue() + { + await semaphore.WaitAsync(); + try + { + if (nextAuditQueueNameRefresh > DateTime.UtcNow) + { + return; + } + + var connectionDetails = await connectionBuilder.BuildPlatformConnection(); + + // First instance is named `SagaAudit`, following instance `SagaAudit1`..`SagaAuditN` + if (connectionDetails.ToDictionary().TryGetValue("SagaAudit", out var sagaAuditObj) && sagaAuditObj is JObject sagaAudit) + { + // Pick any audit queue, assume all instance are based on competing consumer + auditQueueName = sagaAudit["SagaAuditQueue"].Value(); + nextAuditQueueNameRefresh = DateTime.UtcNow.AddMinutes(5); + log.InfoFormat("Refreshed audit queue name '{0}' from ServiceControl Audit instance. Will continue to use this value for forwarding saga update messages for the next 5 minutes.", auditQueueName); + } + } + catch (Exception x) + { + log.WarnFormat("Unable to refresh audit queue name from ServiceControl Audit instance. Will continue to check at most every 15 seconds. Exception message: {0}", x.Message); + nextAuditQueueNameRefresh = DateTime.UtcNow.AddSeconds(15); + } + finally + { + semaphore.Release(); + } + } + + readonly ISagaAuditDataStore sagaAuditStore; + readonly IPlatformConnectionBuilder connectionBuilder; + + static string auditQueueName; + static DateTime nextAuditQueueNameRefresh; + static readonly SemaphoreSlim semaphore = new SemaphoreSlim(1); + static readonly ILog log = LogManager.GetLogger(); } -} \ No newline at end of file +}