From 204d9313397ed8fe72472bddfff29fb25440b497 Mon Sep 17 00:00:00 2001 From: David Boike Date: Mon, 2 Oct 2023 14:22:06 -0500 Subject: [PATCH 01/10] Get multi-instance tests to run on Raven5 --- .../AcceptanceTest.cs | 2 +- .../DatabaseLease.cs | 28 +++++++++++ ...ntrol.MultiInstance.AcceptanceTests.csproj | 2 +- .../SharedDatabaseSetup.cs | 28 +++++++++++ .../SharedEmbeddedServer.cs | 46 +++++++++++++++++++ .../ServiceControlComponentRunner.cs | 28 +++++------ 6 files changed, 118 insertions(+), 16 deletions(-) create mode 100644 src/ServiceControl.MultiInstance.AcceptanceTests/DatabaseLease.cs create mode 100644 src/ServiceControl.MultiInstance.AcceptanceTests/SharedDatabaseSetup.cs create mode 100644 src/ServiceControl.MultiInstance.AcceptanceTests/SharedEmbeddedServer.cs diff --git a/src/ServiceControl.MultiInstance.AcceptanceTests/AcceptanceTest.cs b/src/ServiceControl.MultiInstance.AcceptanceTests/AcceptanceTest.cs index 0a3e140f3d..f0e5d62f6d 100644 --- a/src/ServiceControl.MultiInstance.AcceptanceTests/AcceptanceTest.cs +++ b/src/ServiceControl.MultiInstance.AcceptanceTests/AcceptanceTest.cs @@ -68,7 +68,7 @@ public void Setup() DataStoreConfiguration = new DataStoreConfiguration { - DataStoreTypeName = "RavenDB35" + DataStoreTypeName = "RavenDB5" }; serviceControlRunnerBehavior = new ServiceControlComponentBehavior( diff --git a/src/ServiceControl.MultiInstance.AcceptanceTests/DatabaseLease.cs b/src/ServiceControl.MultiInstance.AcceptanceTests/DatabaseLease.cs new file mode 100644 index 0000000000..b9931e209c --- /dev/null +++ b/src/ServiceControl.MultiInstance.AcceptanceTests/DatabaseLease.cs @@ -0,0 +1,28 @@ +using System; +using System.Threading.Tasks; +using Raven.Client.ServerWide.Operations; +using ServiceBus.Management.Infrastructure.Settings; + +public class DatabaseLease : IAsyncDisposable +{ + public string DatabaseName { get; } = Guid.NewGuid().ToString("n"); + + public void CustomizeSettings(Settings settings) + { + settings.PersisterSpecificSettings = new RavenDBPersisterSettings + { + ErrorRetentionPeriod = TimeSpan.FromDays(10), + ConnectionString = SharedDatabaseSetup.SharedInstance.ServerUrl, + DatabaseName = DatabaseName + }; + } + + public async ValueTask DisposeAsync() + { + var documentStore = await SharedDatabaseSetup.SharedInstance.Connect(); + + // Comment this out temporarily to be able to inspect a database after the test has completed + var deleteDatabasesOperation = new DeleteDatabasesOperation(new DeleteDatabasesOperation.Parameters { DatabaseNames = new[] { DatabaseName }, HardDelete = true }); + await documentStore.Maintenance.Server.SendAsync(deleteDatabasesOperation); + } +} \ No newline at end of file diff --git a/src/ServiceControl.MultiInstance.AcceptanceTests/ServiceControl.MultiInstance.AcceptanceTests.csproj b/src/ServiceControl.MultiInstance.AcceptanceTests/ServiceControl.MultiInstance.AcceptanceTests.csproj index 3fe2bed049..8a16f0f87f 100644 --- a/src/ServiceControl.MultiInstance.AcceptanceTests/ServiceControl.MultiInstance.AcceptanceTests.csproj +++ b/src/ServiceControl.MultiInstance.AcceptanceTests/ServiceControl.MultiInstance.AcceptanceTests.csproj @@ -10,7 +10,7 @@ - + diff --git a/src/ServiceControl.MultiInstance.AcceptanceTests/SharedDatabaseSetup.cs b/src/ServiceControl.MultiInstance.AcceptanceTests/SharedDatabaseSetup.cs new file mode 100644 index 0000000000..a3399d71e8 --- /dev/null +++ b/src/ServiceControl.MultiInstance.AcceptanceTests/SharedDatabaseSetup.cs @@ -0,0 +1,28 @@ +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; +using ServiceControl.Persistence.RavenDb5; + +[SetUpFixture] +public static class SharedDatabaseSetup +{ + public static EmbeddedDatabase SharedInstance { get; private set; } + + // Needs to be in a SetUpFixture otherwise the OneTimeSetUp is invoked for each inherited test fixture + [OneTimeSetUp] + public static async Task SetupSharedEmbeddedServer() + { + using (var cancellation = new CancellationTokenSource(30_000)) + { + SharedInstance = await SharedEmbeddedServer.GetInstance(cancellation.Token); + } + } + + [OneTimeTearDown] + public static void TearDown() => SharedInstance.Dispose(); + + public static DatabaseLease LeaseDatabase() + { + return new DatabaseLease(); + } +} diff --git a/src/ServiceControl.MultiInstance.AcceptanceTests/SharedEmbeddedServer.cs b/src/ServiceControl.MultiInstance.AcceptanceTests/SharedEmbeddedServer.cs new file mode 100644 index 0000000000..463ecf894a --- /dev/null +++ b/src/ServiceControl.MultiInstance.AcceptanceTests/SharedEmbeddedServer.cs @@ -0,0 +1,46 @@ +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Raven.Client.Documents; +using Raven.Client.ServerWide.Operations; +using ServiceControl.Persistence.RavenDb5; +using TestHelper; + +static class SharedEmbeddedServer +{ + public static async Task GetInstance(CancellationToken cancellationToken = default) + { + var basePath = Path.Combine(Path.GetTempPath(), "ServiceControlTests", "Primary.Raven5MultiInstance"); + var dbPath = Path.Combine(basePath, "DB"); + var databasesPath = Path.Combine(dbPath, "Databases"); + + var settings = new RavenDBPersisterSettings + { + DatabasePath = dbPath, + LogPath = Path.Combine(basePath, "Logs"), + LogsMode = "Operations", + ServerUrl = $"http://localhost:{PortUtility.FindAvailablePort(33334)}" + }; + + var instance = EmbeddedDatabase.Start(settings); + + // Make sure that the database is up - this blocks until the cancellation token times out + using (var docStore = await instance.Connect(cancellationToken)) + { + var cleanupDatabases = new DirectoryInfo(databasesPath) + .GetDirectories() + .Select(di => di.Name) + .Where(name => name.Length == 32) + .ToArray(); + + if (cleanupDatabases.Any()) + { + var cleanupOperation = new DeleteDatabasesOperation(new DeleteDatabasesOperation.Parameters { DatabaseNames = cleanupDatabases, HardDelete = true }); + await docStore.Maintenance.Server.SendAsync(cleanupOperation, CancellationToken.None); + } + } + + return instance; + } +} \ No newline at end of file diff --git a/src/ServiceControl.MultiInstance.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs b/src/ServiceControl.MultiInstance.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs index 236df6d894..835cdf0f5a 100644 --- a/src/ServiceControl.MultiInstance.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs +++ b/src/ServiceControl.MultiInstance.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs @@ -23,7 +23,6 @@ namespace ServiceControl.MultiInstance.AcceptanceTests.TestSupport using Particular.ServiceControl; using ServiceBus.Management.Infrastructure.Settings; using ServiceControl.Infrastructure.WebApi; - using TestHelper; class ServiceControlComponentRunner : ComponentRunner, IAcceptanceTestInfrastructureProviderMultiInstance { @@ -46,17 +45,14 @@ public async Task Initialize(RunDescriptor run) { SettingsPerInstance.Clear(); - var startPort = 33334; - - var mainInstancePort = PortUtility.FindAvailablePort(startPort); - var mainInstanceDbPort = PortUtility.FindAvailablePort(mainInstancePort + 1); - var auditInstancePort = PortUtility.FindAvailablePort(mainInstanceDbPort + 1); + var mainInstancePort = portLeases.GetPort(); + var auditInstancePort = portLeases.GetPort(); await InitializeServiceControlAudit(run.ScenarioContext, auditInstancePort); - await InitializeServiceControl(run.ScenarioContext, mainInstancePort, mainInstanceDbPort, auditInstancePort); + await InitializeServiceControl(run.ScenarioContext, mainInstancePort, auditInstancePort); } - async Task InitializeServiceControl(ScenarioContext context, int instancePort, int maintenancePort, int auditInstanceApiPort) + async Task InitializeServiceControl(ScenarioContext context, int instancePort, int auditInstanceApiPort) { var instanceName = Settings.DEFAULT_SERVICE_NAME; typeof(ScenarioContext).GetProperty("CurrentEndpoint", BindingFlags.Static | BindingFlags.NonPublic)?.SetValue(context, instanceName); @@ -66,11 +62,6 @@ async Task InitializeServiceControl(ScenarioContext context, int instancePort, i var settings = new Settings(instanceName, transportToUse.TypeName, dataStoreConfiguration.DataStoreTypeName) { Port = instancePort, - PersisterSpecificSettings = new RavenDBPersisterSettings - { - RunInMemory = true, - DatabaseMaintenancePort = maintenancePort - }, ForwardErrorMessages = false, TransportType = transportToUse.TypeName, TransportConnectionString = transportToUse.ConnectionString, @@ -120,6 +111,8 @@ async Task InitializeServiceControl(ScenarioContext context, int instancePort, i } }; + databaseLease.CustomizeSettings(settings); + customServiceControlSettings(settings); SettingsPerInstance[instanceName] = settings; @@ -250,7 +243,7 @@ async Task InitializeServiceControlAudit(ScenarioContext context, int instancePo var excludedAssemblies = new[] { Path.GetFileName(typeof(Settings).Assembly.CodeBase), // ServiceControl.exe - "ServiceControl.Persistence.RavenDb.dll", + "ServiceControl.Persistence.RavenDb5.dll", typeof(ServiceControlComponentRunner).Assembly.GetName().Name // This project }; @@ -366,6 +359,9 @@ public override async Task Stop() } } + await databaseLease.DisposeAsync(); + portLeases?.Dispose(); + hosts.Clear(); HttpClients.Clear(); portToHandler.Clear(); @@ -390,6 +386,10 @@ HttpClient HttpClientFactory() Action customServiceControlAuditSettings; Action customServiceControlSettings; + static readonly PortPool portPool = new PortPool(33335); + DatabaseLease databaseLease = SharedDatabaseSetup.LeaseDatabase(); + PortLease portLeases = portPool.GetLease(); + class ForwardingHandler : DelegatingHandler { public ForwardingHandler(Dictionary portsToHttpMessageHandlers) From d9a71f672e3506dd3522a45d7db8220f31e6b7b5 Mon Sep 17 00:00:00 2001 From: David Boike Date: Mon, 2 Oct 2023 16:32:20 -0500 Subject: [PATCH 02/10] Forward saga audit messages to audit instance & warn in ServicePulse --- ...hen_sending_saga_audit_to_main_instance.cs | 15 ++- .../EndpointReportingSagaAuditToPrimary.cs | 11 +++ ...ntReportingSagaAuditToPrimaryDefinition.cs | 18 ++++ .../SagaAudit/SagaAuditComponent.cs | 8 +- .../SagaAudit/SagaUpdatedHandler.cs | 92 +++++++++++++++++-- 5 files changed, 133 insertions(+), 11 deletions(-) create mode 100644 src/ServiceControl/SagaAudit/EndpointReportingSagaAuditToPrimary.cs create mode 100644 src/ServiceControl/SagaAudit/EndpointReportingSagaAuditToPrimaryDefinition.cs diff --git a/src/ServiceControl.MultiInstance.AcceptanceTests/SagaAudit/When_sending_saga_audit_to_main_instance.cs b/src/ServiceControl.MultiInstance.AcceptanceTests/SagaAudit/When_sending_saga_audit_to_main_instance.cs index b82b347f40..bd09eb1905 100644 --- a/src/ServiceControl.MultiInstance.AcceptanceTests/SagaAudit/When_sending_saga_audit_to_main_instance.cs +++ b/src/ServiceControl.MultiInstance.AcceptanceTests/SagaAudit/When_sending_saga_audit_to_main_instance.cs @@ -8,6 +8,7 @@ using NServiceBus; using NServiceBus.AcceptanceTesting; using NUnit.Framework; + using ServiceControl.EventLog; using ServiceControl.SagaAudit; using TestSupport; using TestSupport.EndpointTemplates; @@ -55,10 +56,11 @@ public void ConfigTeardown() public async Task Saga_history_can_be_fetched_from_main_instance() { SagaHistory sagaHistory = null; + EventLogItem eventLog = null; var context = await Define() .WithEndpoint(b => b.When((bus, c) => bus.SendLocal(new MessageInitiatingSaga { Id = "Id" }))) - .Done(async c => + .Do("GetSagaHistory", async c => { if (!c.SagaId.HasValue) { @@ -69,9 +71,17 @@ public async Task Saga_history_can_be_fetched_from_main_instance() sagaHistory = result; return result; }) + .Do("GetEventLog", async c => + { + var result = await this.TryGetSingle("/api/eventlogitems/", e => e.EventType == nameof(EndpointReportingSagaAuditToPrimary)); + eventLog = result; + return result; + }) + .Done(c => eventLog != null) .Run(); Assert.NotNull(sagaHistory); + Assert.NotNull(eventLog); Assert.AreEqual(context.SagaId, sagaHistory.SagaId); Assert.AreEqual(typeof(SagaEndpoint.MySaga).FullName, sagaHistory.SagaType); @@ -119,9 +129,10 @@ public class MessageInitiatingSaga : ICommand } - public class MyContext : ScenarioContext + public class MyContext : ScenarioContext, ISequenceContext { public Guid? SagaId { get; set; } + public int Step { get; set; } } } } \ No newline at end of file diff --git a/src/ServiceControl/SagaAudit/EndpointReportingSagaAuditToPrimary.cs b/src/ServiceControl/SagaAudit/EndpointReportingSagaAuditToPrimary.cs new file mode 100644 index 0000000000..0403860d3e --- /dev/null +++ b/src/ServiceControl/SagaAudit/EndpointReportingSagaAuditToPrimary.cs @@ -0,0 +1,11 @@ +namespace ServiceControl.SagaAudit +{ + using System; + using ServiceControl.Infrastructure.DomainEvents; + + public class EndpointReportingSagaAuditToPrimary : IDomainEvent + { + public string EndpointName { get; set; } + public DateTime DetectedAt { get; set; } + } +} \ No newline at end of file diff --git a/src/ServiceControl/SagaAudit/EndpointReportingSagaAuditToPrimaryDefinition.cs b/src/ServiceControl/SagaAudit/EndpointReportingSagaAuditToPrimaryDefinition.cs new file mode 100644 index 0000000000..1cee13e77b --- /dev/null +++ b/src/ServiceControl/SagaAudit/EndpointReportingSagaAuditToPrimaryDefinition.cs @@ -0,0 +1,18 @@ +namespace ServiceControl.SagaAudit +{ + using ServiceControl.EventLog; + + class EndpointReportingSagaAuditToPrimaryDefinition : EventLogMappingDefinition + { + public EndpointReportingSagaAuditToPrimaryDefinition() + { + Severity(EventLog.Severity.Warning); + + Description(m => $"Endpoint {m.EndpointName} is configured to send saga audit data to the primary ServiceControl queue. Instead, saga audit data should be sent to the Audit Queue Name configured in the ServiceControl Audit Instance."); + + RelatesToEndpoint(m => m.EndpointName); + + RaisedAt(m => m.DetectedAt); + } + } +} \ No newline at end of file diff --git a/src/ServiceControl/SagaAudit/SagaAuditComponent.cs b/src/ServiceControl/SagaAudit/SagaAuditComponent.cs index 1380b5470f..7ca09878a0 100644 --- a/src/ServiceControl/SagaAudit/SagaAuditComponent.cs +++ b/src/ServiceControl/SagaAudit/SagaAuditComponent.cs @@ -3,13 +3,17 @@ using Microsoft.Extensions.Hosting; using Particular.ServiceControl; using ServiceBus.Management.Infrastructure.Settings; + using ServiceControl.ExternalIntegrations; class SagaAuditComponent : ServiceControlComponent { public override void Configure(Settings settings, IHostBuilder hostBuilder) { - // TODO: If this component doesn't do anything, should it even exist? - // THEORY: Remove in V5, since then there will be no audit capabilities left in the primary instance + // Forward saga audit messages and warn in ServiceControl 5, remove in 6 + hostBuilder.ConfigureServices(collection => + { + collection.AddEventLogMapping(); + }); } public override void Setup(Settings settings, IComponentInstallationContext context) diff --git a/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs b/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs index 9b592618a6..9c83fac634 100644 --- a/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs +++ b/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs @@ -1,24 +1,102 @@ namespace ServiceControl.SagaAudit { + using System; + using System.Collections.Concurrent; + using System.Threading; using System.Threading.Tasks; using EndpointPlugin.Messages.SagaState; + using Newtonsoft.Json.Linq; using NServiceBus; - using ServiceControl.Persistence; + using ServiceControl.Connection; + using ServiceControl.Infrastructure.DomainEvents; class SagaUpdatedHandler : IHandleMessages { - public SagaUpdatedHandler(ISagaAuditDataStore store) + public SagaUpdatedHandler(IDomainEvents domainEvents, IPlatformConnectionBuilder connectionBuilder) { - this.store = store; + this.domainEvents = domainEvents; + this.connectionBuilder = connectionBuilder; } - public Task Handle(SagaUpdatedMessage message, IMessageHandlerContext context) + public async Task Handle(SagaUpdatedMessage message, IMessageHandlerContext context) { - var sagaSnapshot = SagaSnapshotFactory.Create(message); + await WarnOncePerEnpointPerDay(message); - return store.StoreSnapshot(sagaSnapshot); + if (auditQueueName is null || nextAuditQueueNameRefresh < DateTime.UtcNow) + { + await RefreshAuditQueue(); + } + + if (auditQueueName is null) + { + throw new Exception("Could not determine audit queue name to forward saga update message. The ServiceControl remote audit instance "); + } + + await context.ForwardCurrentMessageTo(auditQueueName); + } + + async Task RefreshAuditQueue() + { + await semaphore.WaitAsync(); + try + { + if (auditQueueName != null && nextAuditQueueNameRefresh > DateTime.UtcNow) + { + return; + } + + var connectionDetails = await connectionBuilder.BuildPlatformConnection(); + + if (connectionDetails.ToDictionary().TryGetValue("SagaAudit", out var sagaAuditObj) && sagaAuditObj is JObject sagaAudit) + { + auditQueueName = sagaAudit["SagaAuditQueue"].Value(); + nextAuditQueueNameRefresh = DateTime.UtcNow.AddMinutes(5); + } + } + catch (Exception) + { + } + finally + { + semaphore.Release(); + } } - readonly ISagaAuditDataStore store; + async Task WarnOncePerEnpointPerDay(SagaUpdatedMessage message) + { + if (nextWarningDates.TryGetValue(message.Endpoint, out var nextWarning) && nextWarning > DateTime.UtcNow) + { + return; + } + + await semaphore.WaitAsync(); + try + { + if (nextWarningDates.TryGetValue(message.Endpoint, out nextWarning) && nextWarning > DateTime.UtcNow) + { + return; + } + + await domainEvents.Raise(new EndpointReportingSagaAuditToPrimary + { + DetectedAt = DateTime.UtcNow, + EndpointName = message.Endpoint + }); + + nextWarningDates[message.Endpoint] = DateTime.UtcNow.AddDays(1); + } + finally + { + semaphore.Release(); + } + } + + readonly IDomainEvents domainEvents; + readonly IPlatformConnectionBuilder connectionBuilder; + + static ConcurrentDictionary nextWarningDates = new ConcurrentDictionary(); + static string auditQueueName; + static DateTime nextAuditQueueNameRefresh; + static readonly SemaphoreSlim semaphore = new SemaphoreSlim(1); } } \ No newline at end of file From 2e61a8007f7aebe2a83185ac17e0d0815e3c68af Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Tue, 3 Oct 2023 09:39:00 +0200 Subject: [PATCH 03/10] Typo + readonly --- src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs b/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs index 9c83fac634..90e27e8d45 100644 --- a/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs +++ b/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs @@ -20,7 +20,7 @@ public SagaUpdatedHandler(IDomainEvents domainEvents, IPlatformConnectionBuilder public async Task Handle(SagaUpdatedMessage message, IMessageHandlerContext context) { - await WarnOncePerEnpointPerDay(message); + await WarnOncePerEndpointPerDay(message); if (auditQueueName is null || nextAuditQueueNameRefresh < DateTime.UtcNow) { @@ -62,7 +62,7 @@ async Task RefreshAuditQueue() } } - async Task WarnOncePerEnpointPerDay(SagaUpdatedMessage message) + async Task WarnOncePerEndpointPerDay(SagaUpdatedMessage message) { if (nextWarningDates.TryGetValue(message.Endpoint, out var nextWarning) && nextWarning > DateTime.UtcNow) { @@ -94,9 +94,9 @@ await domainEvents.Raise(new EndpointReportingSagaAuditToPrimary readonly IDomainEvents domainEvents; readonly IPlatformConnectionBuilder connectionBuilder; - static ConcurrentDictionary nextWarningDates = new ConcurrentDictionary(); + static readonly ConcurrentDictionary nextWarningDates = new ConcurrentDictionary(); static string auditQueueName; static DateTime nextAuditQueueNameRefresh; static readonly SemaphoreSlim semaphore = new SemaphoreSlim(1); } -} \ No newline at end of file +} From 3339bd5665f62d0ad768aab7ad66b13502d9b634 Mon Sep 17 00:00:00 2001 From: David Boike Date: Tue, 3 Oct 2023 13:46:04 -0500 Subject: [PATCH 04/10] Use a custom check and an unrecoverable exception --- .../Infrastructure/NServiceBusFactory.cs | 3 + .../UnrecoverableException.cs | 15 ++++ ...hen_sending_saga_audit_to_main_instance.cs | 12 +++- .../Infrastructure/NServiceBusFactory.cs | 2 + .../Infrastructure/Settings/Settings.cs | 2 + .../EndpointReportingSagaAuditToPrimary.cs | 11 --- ...ntReportingSagaAuditToPrimaryDefinition.cs | 18 ----- .../SagaAudit/SagaAuditComponent.cs | 10 +-- .../SagaAuditDestinationCustomCheck.cs | 69 +++++++++++++++++++ .../SagaAudit/SagaUpdatedHandler.cs | 43 ++---------- 10 files changed, 111 insertions(+), 74 deletions(-) create mode 100644 src/ServiceControl.Infrastructure/UnrecoverableException.cs delete mode 100644 src/ServiceControl/SagaAudit/EndpointReportingSagaAuditToPrimary.cs delete mode 100644 src/ServiceControl/SagaAudit/EndpointReportingSagaAuditToPrimaryDefinition.cs create mode 100644 src/ServiceControl/SagaAudit/SagaAuditDestinationCustomCheck.cs diff --git a/src/ServiceControl.Audit/Infrastructure/NServiceBusFactory.cs b/src/ServiceControl.Audit/Infrastructure/NServiceBusFactory.cs index b6c6cab3d3..337d83e6c2 100644 --- a/src/ServiceControl.Audit/Infrastructure/NServiceBusFactory.cs +++ b/src/ServiceControl.Audit/Infrastructure/NServiceBusFactory.cs @@ -7,6 +7,7 @@ namespace ServiceControl.Audit.Infrastructure using Contracts.MessageFailures; using NServiceBus; using NServiceBus.Configuration.AdvancedExtensibility; + using ServiceControl.Infrastructure; using Settings; using Transports; @@ -63,6 +64,8 @@ public static void Configure(Settings.Settings settings, TransportCustomization { configuration.EnableInstallers(); } + + configuration.Recoverability().AddUnrecoverableException(); } static bool IsExternalContract(Type t) diff --git a/src/ServiceControl.Infrastructure/UnrecoverableException.cs b/src/ServiceControl.Infrastructure/UnrecoverableException.cs new file mode 100644 index 0000000000..179ee74c5b --- /dev/null +++ b/src/ServiceControl.Infrastructure/UnrecoverableException.cs @@ -0,0 +1,15 @@ +namespace ServiceControl.Infrastructure +{ + using System; + + public class UnrecoverableException : Exception + { + public UnrecoverableException(string message) : base(message) + { + } + + public UnrecoverableException(string message, Exception innerException) : base(message, innerException) + { + } + } +} diff --git a/src/ServiceControl.MultiInstance.AcceptanceTests/SagaAudit/When_sending_saga_audit_to_main_instance.cs b/src/ServiceControl.MultiInstance.AcceptanceTests/SagaAudit/When_sending_saga_audit_to_main_instance.cs index bd09eb1905..1851749d38 100644 --- a/src/ServiceControl.MultiInstance.AcceptanceTests/SagaAudit/When_sending_saga_audit_to_main_instance.cs +++ b/src/ServiceControl.MultiInstance.AcceptanceTests/SagaAudit/When_sending_saga_audit_to_main_instance.cs @@ -58,6 +58,12 @@ public async Task Saga_history_can_be_fetched_from_main_instance() SagaHistory sagaHistory = null; EventLogItem eventLog = null; + CustomServiceControlSettings = settings => + { + settings.DisableHealthChecks = false; + settings.OverrideCustomCheckRepeatTime = TimeSpan.FromSeconds(2); + }; + var context = await Define() .WithEndpoint(b => b.When((bus, c) => bus.SendLocal(new MessageInitiatingSaga { Id = "Id" }))) .Do("GetSagaHistory", async c => @@ -73,9 +79,9 @@ public async Task Saga_history_can_be_fetched_from_main_instance() }) .Do("GetEventLog", async c => { - var result = await this.TryGetSingle("/api/eventlogitems/", e => e.EventType == nameof(EndpointReportingSagaAuditToPrimary)); - eventLog = result; - return result; + var result = await this.TryGetMany("/api/eventlogitems/"); + eventLog = result.Items.FirstOrDefault(e => e.Description.Contains("Saga Audit Destination") && e.Description.Contains("endpoints have reported saga audit data to the ServiceControl Primary instance")); + return eventLog != null; }) .Done(c => eventLog != null) .Run(); diff --git a/src/ServiceControl/Infrastructure/NServiceBusFactory.cs b/src/ServiceControl/Infrastructure/NServiceBusFactory.cs index cb1b540b46..7c572c8aab 100644 --- a/src/ServiceControl/Infrastructure/NServiceBusFactory.cs +++ b/src/ServiceControl/Infrastructure/NServiceBusFactory.cs @@ -4,6 +4,7 @@ namespace ServiceBus.Management.Infrastructure using NServiceBus; using NServiceBus.Configuration.AdvancedExtensibility; using ServiceControl.ExternalIntegrations; + using ServiceControl.Infrastructure; using ServiceControl.Infrastructure.Subscriptions; using ServiceControl.Notifications.Email; using ServiceControl.Operations; @@ -36,6 +37,7 @@ public static void Configure(Settings.Settings settings, TransportCustomization var recoverability = configuration.Recoverability(); recoverability.Immediate(c => c.NumberOfRetries(3)); recoverability.Delayed(c => c.NumberOfRetries(0)); + recoverability.AddUnrecoverableException(); configuration.SendFailedMessagesTo(transportSettings.ErrorQueue); recoverability.CustomPolicy(SendEmailNotificationHandler.RecoverabilityPolicy); diff --git a/src/ServiceControl/Infrastructure/Settings/Settings.cs b/src/ServiceControl/Infrastructure/Settings/Settings.cs index 22368d557c..f3a20ebbc9 100644 --- a/src/ServiceControl/Infrastructure/Settings/Settings.cs +++ b/src/ServiceControl/Infrastructure/Settings/Settings.cs @@ -156,6 +156,8 @@ public TimeSpan HeartbeatGracePeriod public bool DisableHealthChecks { get; set; } + public TimeSpan? OverrideCustomCheckRepeatTime { get; set; } + public bool ExposeApi { get; set; } = true; public TransportCustomization LoadTransportCustomization() diff --git a/src/ServiceControl/SagaAudit/EndpointReportingSagaAuditToPrimary.cs b/src/ServiceControl/SagaAudit/EndpointReportingSagaAuditToPrimary.cs deleted file mode 100644 index 0403860d3e..0000000000 --- a/src/ServiceControl/SagaAudit/EndpointReportingSagaAuditToPrimary.cs +++ /dev/null @@ -1,11 +0,0 @@ -namespace ServiceControl.SagaAudit -{ - using System; - using ServiceControl.Infrastructure.DomainEvents; - - public class EndpointReportingSagaAuditToPrimary : IDomainEvent - { - public string EndpointName { get; set; } - public DateTime DetectedAt { get; set; } - } -} \ No newline at end of file diff --git a/src/ServiceControl/SagaAudit/EndpointReportingSagaAuditToPrimaryDefinition.cs b/src/ServiceControl/SagaAudit/EndpointReportingSagaAuditToPrimaryDefinition.cs deleted file mode 100644 index 1cee13e77b..0000000000 --- a/src/ServiceControl/SagaAudit/EndpointReportingSagaAuditToPrimaryDefinition.cs +++ /dev/null @@ -1,18 +0,0 @@ -namespace ServiceControl.SagaAudit -{ - using ServiceControl.EventLog; - - class EndpointReportingSagaAuditToPrimaryDefinition : EventLogMappingDefinition - { - public EndpointReportingSagaAuditToPrimaryDefinition() - { - Severity(EventLog.Severity.Warning); - - Description(m => $"Endpoint {m.EndpointName} is configured to send saga audit data to the primary ServiceControl queue. Instead, saga audit data should be sent to the Audit Queue Name configured in the ServiceControl Audit Instance."); - - RelatesToEndpoint(m => m.EndpointName); - - RaisedAt(m => m.DetectedAt); - } - } -} \ No newline at end of file diff --git a/src/ServiceControl/SagaAudit/SagaAuditComponent.cs b/src/ServiceControl/SagaAudit/SagaAuditComponent.cs index 7ca09878a0..410261ba6b 100644 --- a/src/ServiceControl/SagaAudit/SagaAuditComponent.cs +++ b/src/ServiceControl/SagaAudit/SagaAuditComponent.cs @@ -1,9 +1,10 @@ namespace ServiceControl.SagaAudit { + using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Particular.ServiceControl; using ServiceBus.Management.Infrastructure.Settings; - using ServiceControl.ExternalIntegrations; + using ServiceControl.CustomChecks; class SagaAuditComponent : ServiceControlComponent { @@ -12,12 +13,11 @@ public override void Configure(Settings settings, IHostBuilder hostBuilder) // Forward saga audit messages and warn in ServiceControl 5, remove in 6 hostBuilder.ConfigureServices(collection => { - collection.AddEventLogMapping(); + collection.AddCustomCheck(); + collection.AddSingleton(); }); } - public override void Setup(Settings settings, IComponentInstallationContext context) - { - } + public override void Setup(Settings settings, IComponentInstallationContext context) { } } } \ No newline at end of file diff --git a/src/ServiceControl/SagaAudit/SagaAuditDestinationCustomCheck.cs b/src/ServiceControl/SagaAudit/SagaAuditDestinationCustomCheck.cs new file mode 100644 index 0000000000..ae90fe8f11 --- /dev/null +++ b/src/ServiceControl/SagaAudit/SagaAuditDestinationCustomCheck.cs @@ -0,0 +1,69 @@ +namespace ServiceControl.SagaAudit +{ + using System; + using System.Collections.Concurrent; + using System.Linq; + using System.Threading.Tasks; + using NServiceBus.CustomChecks; + using ServiceBus.Management.Infrastructure.Settings; + + class SagaAuditDestinationCustomCheck : CustomCheck + { + readonly State stateHolder; + static readonly TimeSpan retentionTime = TimeSpan.FromHours(24); + + public SagaAuditDestinationCustomCheck(State stateHolder, Settings settings) + : base("Saga Audit Destination", "Health", settings.OverrideCustomCheckRepeatTime ?? TimeSpan.FromMinutes(15)) + { + this.stateHolder = stateHolder; + } + + public override Task PerformCheck() + { + var failedEndpoints = stateHolder.GetFailedEndpoints(); + + if (failedEndpoints.Length == 0) + { + return passResult; + } + + var message = $"In the last 24 hours, the following endpoints have reported saga audit data to the ServiceControl Primary instance. Instead, saga audit data should be sent to the Audit Queue Name configured in the ServiceControl Audit Instance. Affected endpoints: " + + string.Join(", ", failedEndpoints); + + return Task.FromResult(CheckResult.Failed(message)); + } + + static Task passResult = Task.FromResult(CheckResult.Pass); + + public class State + { + readonly ConcurrentDictionary failedEndpoints = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + + public void Fail(string endpointName) + { + var now = DateTime.UtcNow; + var cutoff = now - retentionTime; + + failedEndpoints[endpointName] = DateTime.UtcNow; + + foreach (var key in failedEndpoints.Keys) + { + if (failedEndpoints.TryGetValue(key, out var time) && time < cutoff) + { + failedEndpoints.TryRemove(key, out _); + } + } + } + + public string[] GetFailedEndpoints() + { + var cutoff = DateTime.UtcNow - retentionTime; + return failedEndpoints + .Where(pair => pair.Value > cutoff) + .Select(pair => pair.Key) + .OrderBy(name => name) + .ToArray(); + } + } + } +} \ No newline at end of file diff --git a/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs b/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs index 90e27e8d45..0c2de20578 100644 --- a/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs +++ b/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs @@ -1,26 +1,25 @@ namespace ServiceControl.SagaAudit { using System; - using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; using EndpointPlugin.Messages.SagaState; using Newtonsoft.Json.Linq; using NServiceBus; using ServiceControl.Connection; - using ServiceControl.Infrastructure.DomainEvents; + using ServiceControl.Infrastructure; class SagaUpdatedHandler : IHandleMessages { - public SagaUpdatedHandler(IDomainEvents domainEvents, IPlatformConnectionBuilder connectionBuilder) + public SagaUpdatedHandler(IPlatformConnectionBuilder connectionBuilder, SagaAuditDestinationCustomCheck.State customCheckState) { - this.domainEvents = domainEvents; this.connectionBuilder = connectionBuilder; + this.customCheckState = customCheckState; } public async Task Handle(SagaUpdatedMessage message, IMessageHandlerContext context) { - await WarnOncePerEndpointPerDay(message); + customCheckState.Fail(message.Endpoint); if (auditQueueName is null || nextAuditQueueNameRefresh < DateTime.UtcNow) { @@ -29,7 +28,7 @@ public async Task Handle(SagaUpdatedMessage message, IMessageHandlerContext cont if (auditQueueName is null) { - throw new Exception("Could not determine audit queue name to forward saga update message. The ServiceControl remote audit instance "); + throw new UnrecoverableException("Could not determine audit queue name to forward saga update message. This message can be replayed after the ServiceControl Audit remote instance is running and accessible."); } await context.ForwardCurrentMessageTo(auditQueueName); @@ -62,39 +61,9 @@ async Task RefreshAuditQueue() } } - async Task WarnOncePerEndpointPerDay(SagaUpdatedMessage message) - { - if (nextWarningDates.TryGetValue(message.Endpoint, out var nextWarning) && nextWarning > DateTime.UtcNow) - { - return; - } - - await semaphore.WaitAsync(); - try - { - if (nextWarningDates.TryGetValue(message.Endpoint, out nextWarning) && nextWarning > DateTime.UtcNow) - { - return; - } - - await domainEvents.Raise(new EndpointReportingSagaAuditToPrimary - { - DetectedAt = DateTime.UtcNow, - EndpointName = message.Endpoint - }); - - nextWarningDates[message.Endpoint] = DateTime.UtcNow.AddDays(1); - } - finally - { - semaphore.Release(); - } - } - - readonly IDomainEvents domainEvents; readonly IPlatformConnectionBuilder connectionBuilder; + readonly SagaAuditDestinationCustomCheck.State customCheckState; - static readonly ConcurrentDictionary nextWarningDates = new ConcurrentDictionary(); static string auditQueueName; static DateTime nextAuditQueueNameRefresh; static readonly SemaphoreSlim semaphore = new SemaphoreSlim(1); From 3b0ba30006e92357d6c50c741b93db4972f1bdb7 Mon Sep 17 00:00:00 2001 From: David Boike Date: Tue, 3 Oct 2023 14:27:23 -0500 Subject: [PATCH 05/10] Logging and approvals --- .../APIApprovals.CustomCheckDetails.approved.txt | 1 + .../APIApprovals.PlatformSampleSettings.approved.txt | 1 + src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs | 9 +++++++-- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.CustomCheckDetails.approved.txt b/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.CustomCheckDetails.approved.txt index bcb0c0f49c..f50a830f63 100644 --- a/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.CustomCheckDetails.approved.txt +++ b/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.CustomCheckDetails.approved.txt @@ -1,3 +1,4 @@ +Health: Saga Audit Destination Health: ServiceControl Primary Instance Health: ServiceControl Remotes ServiceControl Health: Error Message Ingestion diff --git a/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt b/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt index 0edf138ed9..3d6634e263 100644 --- a/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt +++ b/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt @@ -38,5 +38,6 @@ "RemoteInstances": [], "DataSpaceRemainingThreshold": 20, "DisableHealthChecks": false, + "OverrideCustomCheckRepeatTime": null, "ExposeApi": true } \ No newline at end of file diff --git a/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs b/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs index 0c2de20578..7949359319 100644 --- a/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs +++ b/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs @@ -6,6 +6,7 @@ using EndpointPlugin.Messages.SagaState; using Newtonsoft.Json.Linq; using NServiceBus; + using NServiceBus.Logging; using ServiceControl.Connection; using ServiceControl.Infrastructure; @@ -39,7 +40,7 @@ async Task RefreshAuditQueue() await semaphore.WaitAsync(); try { - if (auditQueueName != null && nextAuditQueueNameRefresh > DateTime.UtcNow) + if (nextAuditQueueNameRefresh > DateTime.UtcNow) { return; } @@ -50,10 +51,13 @@ async Task RefreshAuditQueue() { auditQueueName = sagaAudit["SagaAuditQueue"].Value(); nextAuditQueueNameRefresh = DateTime.UtcNow.AddMinutes(5); + log.InfoFormat("Refreshed audit queue name '{0}' from ServiceControl Audit instance. Will continue to use this value for forwarding saga update messages for the next 5 minutes.", auditQueueName); } } - catch (Exception) + catch (Exception x) { + log.WarnFormat("Unable to refresh audit queue name from ServiceControl Audit instance. Will continue to check at most every 15 seconds. Exception message: {0}", x.Message); + nextAuditQueueNameRefresh = DateTime.UtcNow.AddSeconds(15); } finally { @@ -67,5 +71,6 @@ async Task RefreshAuditQueue() static string auditQueueName; static DateTime nextAuditQueueNameRefresh; static readonly SemaphoreSlim semaphore = new SemaphoreSlim(1); + static readonly ILog log = LogManager.GetLogger(); } } From 0ea8891930a9f71ee57ad217b747341e0e98b209 Mon Sep 17 00:00:00 2001 From: David Boike Date: Tue, 3 Oct 2023 14:53:10 -0500 Subject: [PATCH 06/10] Rearrange into Raven5-only implementation --- .../SagaAudit/SagaAuditDataStore.cs | 3 +- .../RavenDbPersistence.cs | 7 ++++- .../NoImplementationSagaAuditDataStore.cs | 14 --------- .../SagaAuditDeprecationDataStore.cs | 25 ++++++++++++++++ .../SagaAuditDestinationCustomCheck.cs | 5 ++-- .../SagaAudit/ISagaAuditDataStore.cs | 2 +- ...IApprovals.CustomCheckDetails.approved.txt | 1 - .../SagaAudit/SagaAuditComponent.cs | 14 ++++----- .../SagaAudit/SagaUpdatedHandler.cs | 30 +++++++++++-------- 9 files changed, 59 insertions(+), 42 deletions(-) delete mode 100644 src/ServiceControl.Persistence.RavenDb5/SagaAudit/NoImplementationSagaAuditDataStore.cs create mode 100644 src/ServiceControl.Persistence.RavenDb5/SagaAudit/SagaAuditDeprecationDataStore.cs rename src/{ServiceControl => ServiceControl.Persistence.RavenDb5}/SagaAudit/SagaAuditDestinationCustomCheck.cs (92%) diff --git a/src/ServiceControl.Persistence.RavenDb/SagaAudit/SagaAuditDataStore.cs b/src/ServiceControl.Persistence.RavenDb/SagaAudit/SagaAuditDataStore.cs index 69af8b1da6..7ccb2f91e6 100644 --- a/src/ServiceControl.Persistence.RavenDb/SagaAudit/SagaAuditDataStore.cs +++ b/src/ServiceControl.Persistence.RavenDb/SagaAudit/SagaAuditDataStore.cs @@ -13,12 +13,13 @@ public SagaAuditDataStore(IDocumentStore store) this.store = store; } - public async Task StoreSnapshot(SagaSnapshot sagaSnapshot) + public async Task StoreSnapshot(SagaSnapshot sagaSnapshot) { using (var session = store.OpenAsyncSession()) { await session.StoreAsync(sagaSnapshot); await session.SaveChangesAsync(); + return true; } } diff --git a/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs b/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs index 7bbe5b8db2..abf7bffcc9 100644 --- a/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs +++ b/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs @@ -14,6 +14,7 @@ using ServiceControl.Persistence.MessageRedirects; using ServiceControl.Persistence.UnitOfWork; using ServiceControl.Recoverability; + using ServiceControl.SagaAudit; class RavenDbPersistence : IPersistence { @@ -67,8 +68,12 @@ public void Configure(IServiceCollection serviceCollection) serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); - serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); + + // Forward saga audit messages and warn in ServiceControl 5, remove in 6 + serviceCollection.AddSingleton(); + serviceCollection.AddCustomCheck(); + serviceCollection.AddSingleton(); } public void ConfigureLifecycle(IServiceCollection serviceCollection) diff --git a/src/ServiceControl.Persistence.RavenDb5/SagaAudit/NoImplementationSagaAuditDataStore.cs b/src/ServiceControl.Persistence.RavenDb5/SagaAudit/NoImplementationSagaAuditDataStore.cs deleted file mode 100644 index 0ee0bf02e7..0000000000 --- a/src/ServiceControl.Persistence.RavenDb5/SagaAudit/NoImplementationSagaAuditDataStore.cs +++ /dev/null @@ -1,14 +0,0 @@ -namespace ServiceControl.Persistence.RavenDb -{ - using System; - using System.Threading.Tasks; - using ServiceControl.Persistence.Infrastructure; - using ServiceControl.SagaAudit; - - class NoImplementationSagaAuditDataStore : ISagaAuditDataStore - { - public Task StoreSnapshot(SagaSnapshot sagaSnapshot) => throw new NotImplementedException(); - - public Task> GetSagaById(Guid sagaId) => Task.FromResult(QueryResult.Empty()); - } -} diff --git a/src/ServiceControl.Persistence.RavenDb5/SagaAudit/SagaAuditDeprecationDataStore.cs b/src/ServiceControl.Persistence.RavenDb5/SagaAudit/SagaAuditDeprecationDataStore.cs new file mode 100644 index 0000000000..b3dc3a1fad --- /dev/null +++ b/src/ServiceControl.Persistence.RavenDb5/SagaAudit/SagaAuditDeprecationDataStore.cs @@ -0,0 +1,25 @@ +namespace ServiceControl.Persistence.RavenDb +{ + using System; + using System.Threading.Tasks; + using ServiceControl.Persistence.Infrastructure; + using ServiceControl.SagaAudit; + + class SagaAuditDeprecationDataStore : ISagaAuditDataStore + { + public SagaAuditDeprecationDataStore(SagaAuditDestinationCustomCheck.State customCheckState) + { + this.customCheckState = customCheckState; + } + + public Task StoreSnapshot(SagaSnapshot sagaSnapshot) + { + customCheckState.Fail(sagaSnapshot.Endpoint); + return Task.FromResult(false); + } + + public Task> GetSagaById(Guid sagaId) => Task.FromResult(QueryResult.Empty()); + + readonly SagaAuditDestinationCustomCheck.State customCheckState; + } +} diff --git a/src/ServiceControl/SagaAudit/SagaAuditDestinationCustomCheck.cs b/src/ServiceControl.Persistence.RavenDb5/SagaAudit/SagaAuditDestinationCustomCheck.cs similarity index 92% rename from src/ServiceControl/SagaAudit/SagaAuditDestinationCustomCheck.cs rename to src/ServiceControl.Persistence.RavenDb5/SagaAudit/SagaAuditDestinationCustomCheck.cs index ae90fe8f11..9bee7d0a4f 100644 --- a/src/ServiceControl/SagaAudit/SagaAuditDestinationCustomCheck.cs +++ b/src/ServiceControl.Persistence.RavenDb5/SagaAudit/SagaAuditDestinationCustomCheck.cs @@ -5,15 +5,14 @@ using System.Linq; using System.Threading.Tasks; using NServiceBus.CustomChecks; - using ServiceBus.Management.Infrastructure.Settings; class SagaAuditDestinationCustomCheck : CustomCheck { readonly State stateHolder; static readonly TimeSpan retentionTime = TimeSpan.FromHours(24); - public SagaAuditDestinationCustomCheck(State stateHolder, Settings settings) - : base("Saga Audit Destination", "Health", settings.OverrideCustomCheckRepeatTime ?? TimeSpan.FromMinutes(15)) + public SagaAuditDestinationCustomCheck(State stateHolder) + : base("Saga Audit Destination", "Health", TimeSpan.FromSeconds(5)) { this.stateHolder = stateHolder; } diff --git a/src/ServiceControl.Persistence/SagaAudit/ISagaAuditDataStore.cs b/src/ServiceControl.Persistence/SagaAudit/ISagaAuditDataStore.cs index 7756340b3d..157d482b1c 100644 --- a/src/ServiceControl.Persistence/SagaAudit/ISagaAuditDataStore.cs +++ b/src/ServiceControl.Persistence/SagaAudit/ISagaAuditDataStore.cs @@ -7,7 +7,7 @@ public interface ISagaAuditDataStore { - Task StoreSnapshot(SagaSnapshot sagaSnapshot); + Task StoreSnapshot(SagaSnapshot sagaSnapshot); Task> GetSagaById(Guid sagaId); } } \ No newline at end of file diff --git a/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.CustomCheckDetails.approved.txt b/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.CustomCheckDetails.approved.txt index f50a830f63..bcb0c0f49c 100644 --- a/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.CustomCheckDetails.approved.txt +++ b/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.CustomCheckDetails.approved.txt @@ -1,4 +1,3 @@ -Health: Saga Audit Destination Health: ServiceControl Primary Instance Health: ServiceControl Remotes ServiceControl Health: Error Message Ingestion diff --git a/src/ServiceControl/SagaAudit/SagaAuditComponent.cs b/src/ServiceControl/SagaAudit/SagaAuditComponent.cs index 410261ba6b..1380b5470f 100644 --- a/src/ServiceControl/SagaAudit/SagaAuditComponent.cs +++ b/src/ServiceControl/SagaAudit/SagaAuditComponent.cs @@ -1,23 +1,19 @@ namespace ServiceControl.SagaAudit { - using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Particular.ServiceControl; using ServiceBus.Management.Infrastructure.Settings; - using ServiceControl.CustomChecks; class SagaAuditComponent : ServiceControlComponent { public override void Configure(Settings settings, IHostBuilder hostBuilder) { - // Forward saga audit messages and warn in ServiceControl 5, remove in 6 - hostBuilder.ConfigureServices(collection => - { - collection.AddCustomCheck(); - collection.AddSingleton(); - }); + // TODO: If this component doesn't do anything, should it even exist? + // THEORY: Remove in V5, since then there will be no audit capabilities left in the primary instance } - public override void Setup(Settings settings, IComponentInstallationContext context) { } + public override void Setup(Settings settings, IComponentInstallationContext context) + { + } } } \ No newline at end of file diff --git a/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs b/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs index 7949359319..0bd6139abb 100644 --- a/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs +++ b/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs @@ -9,30 +9,36 @@ using NServiceBus.Logging; using ServiceControl.Connection; using ServiceControl.Infrastructure; + using ServiceControl.Persistence; class SagaUpdatedHandler : IHandleMessages { - public SagaUpdatedHandler(IPlatformConnectionBuilder connectionBuilder, SagaAuditDestinationCustomCheck.State customCheckState) + public SagaUpdatedHandler(ISagaAuditDataStore sagaAuditStore, IPlatformConnectionBuilder connectionBuilder) { + this.sagaAuditStore = sagaAuditStore; this.connectionBuilder = connectionBuilder; - this.customCheckState = customCheckState; } public async Task Handle(SagaUpdatedMessage message, IMessageHandlerContext context) { - customCheckState.Fail(message.Endpoint); + var sagaSnapshot = SagaSnapshotFactory.Create(message); - if (auditQueueName is null || nextAuditQueueNameRefresh < DateTime.UtcNow) - { - await RefreshAuditQueue(); - } + var supportedByDataStore = await sagaAuditStore.StoreSnapshot(sagaSnapshot); - if (auditQueueName is null) + if (!supportedByDataStore) { - throw new UnrecoverableException("Could not determine audit queue name to forward saga update message. This message can be replayed after the ServiceControl Audit remote instance is running and accessible."); - } + if (auditQueueName is null || nextAuditQueueNameRefresh < DateTime.UtcNow) + { + await RefreshAuditQueue(); + } - await context.ForwardCurrentMessageTo(auditQueueName); + if (auditQueueName is null) + { + throw new UnrecoverableException("Could not determine audit queue name to forward saga update message. This message can be replayed after the ServiceControl Audit remote instance is running and accessible."); + } + + await context.ForwardCurrentMessageTo(auditQueueName); + } } async Task RefreshAuditQueue() @@ -65,8 +71,8 @@ async Task RefreshAuditQueue() } } + readonly ISagaAuditDataStore sagaAuditStore; readonly IPlatformConnectionBuilder connectionBuilder; - readonly SagaAuditDestinationCustomCheck.State customCheckState; static string auditQueueName; static DateTime nextAuditQueueNameRefresh; From 42638d60bb9f660d5d330b9215d4bd3e5c48223a Mon Sep 17 00:00:00 2001 From: David Boike Date: Tue, 3 Oct 2023 15:01:33 -0500 Subject: [PATCH 07/10] Persistence-based config for custom check repeat --- .../SagaAudit/When_sending_saga_audit_to_main_instance.cs | 2 +- .../SagaAudit/SagaAuditDestinationCustomCheck.cs | 4 ++-- src/ServiceControl.Persistence/PersistenceSettings.cs | 5 +++++ .../APIApprovals.PlatformSampleSettings.approved.txt | 1 - src/ServiceControl/Infrastructure/Settings/Settings.cs | 2 -- 5 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/ServiceControl.MultiInstance.AcceptanceTests/SagaAudit/When_sending_saga_audit_to_main_instance.cs b/src/ServiceControl.MultiInstance.AcceptanceTests/SagaAudit/When_sending_saga_audit_to_main_instance.cs index 1851749d38..e6eab56c05 100644 --- a/src/ServiceControl.MultiInstance.AcceptanceTests/SagaAudit/When_sending_saga_audit_to_main_instance.cs +++ b/src/ServiceControl.MultiInstance.AcceptanceTests/SagaAudit/When_sending_saga_audit_to_main_instance.cs @@ -61,7 +61,7 @@ public async Task Saga_history_can_be_fetched_from_main_instance() CustomServiceControlSettings = settings => { settings.DisableHealthChecks = false; - settings.OverrideCustomCheckRepeatTime = TimeSpan.FromSeconds(2); + settings.PersisterSpecificSettings.OverrideCustomCheckRepeatTime = TimeSpan.FromSeconds(2); }; var context = await Define() diff --git a/src/ServiceControl.Persistence.RavenDb5/SagaAudit/SagaAuditDestinationCustomCheck.cs b/src/ServiceControl.Persistence.RavenDb5/SagaAudit/SagaAuditDestinationCustomCheck.cs index 9bee7d0a4f..e0045e5ba0 100644 --- a/src/ServiceControl.Persistence.RavenDb5/SagaAudit/SagaAuditDestinationCustomCheck.cs +++ b/src/ServiceControl.Persistence.RavenDb5/SagaAudit/SagaAuditDestinationCustomCheck.cs @@ -11,8 +11,8 @@ class SagaAuditDestinationCustomCheck : CustomCheck readonly State stateHolder; static readonly TimeSpan retentionTime = TimeSpan.FromHours(24); - public SagaAuditDestinationCustomCheck(State stateHolder) - : base("Saga Audit Destination", "Health", TimeSpan.FromSeconds(5)) + public SagaAuditDestinationCustomCheck(State stateHolder, RavenDBPersisterSettings settings) + : base("Saga Audit Destination", "Health", settings.OverrideCustomCheckRepeatTime ?? TimeSpan.FromMinutes(15)) { this.stateHolder = stateHolder; } diff --git a/src/ServiceControl.Persistence/PersistenceSettings.cs b/src/ServiceControl.Persistence/PersistenceSettings.cs index 94014308b0..352046ec62 100644 --- a/src/ServiceControl.Persistence/PersistenceSettings.cs +++ b/src/ServiceControl.Persistence/PersistenceSettings.cs @@ -1,5 +1,7 @@ namespace ServiceControl.Persistence { + using System; + /// /// Marker interface used to serialize persister settings in REST API /// @@ -10,5 +12,8 @@ public abstract class PersistenceSettings public string DatabasePath { get; set; } public bool EnableFullTextSearchOnBodies { get; set; } = true; + + public TimeSpan? OverrideCustomCheckRepeatTime { get; set; } + } } \ No newline at end of file diff --git a/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt b/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt index 3d6634e263..0edf138ed9 100644 --- a/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt +++ b/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt @@ -38,6 +38,5 @@ "RemoteInstances": [], "DataSpaceRemainingThreshold": 20, "DisableHealthChecks": false, - "OverrideCustomCheckRepeatTime": null, "ExposeApi": true } \ No newline at end of file diff --git a/src/ServiceControl/Infrastructure/Settings/Settings.cs b/src/ServiceControl/Infrastructure/Settings/Settings.cs index f3a20ebbc9..22368d557c 100644 --- a/src/ServiceControl/Infrastructure/Settings/Settings.cs +++ b/src/ServiceControl/Infrastructure/Settings/Settings.cs @@ -156,8 +156,6 @@ public TimeSpan HeartbeatGracePeriod public bool DisableHealthChecks { get; set; } - public TimeSpan? OverrideCustomCheckRepeatTime { get; set; } - public bool ExposeApi { get; set; } = true; public TransportCustomization LoadTransportCustomization() From 4006ba2b4220f7164eb490f7973bc3c9e194935b Mon Sep 17 00:00:00 2001 From: David Boike Date: Tue, 3 Oct 2023 15:52:25 -0500 Subject: [PATCH 08/10] Fix custom check test --- .../ApprovalFiles/APIApprovals.CustomCheckDetails.approved.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ServiceControl.Persistence.Tests.RavenDb5/ApprovalFiles/APIApprovals.CustomCheckDetails.approved.txt b/src/ServiceControl.Persistence.Tests.RavenDb5/ApprovalFiles/APIApprovals.CustomCheckDetails.approved.txt index 8c907ba40d..1230037975 100644 --- a/src/ServiceControl.Persistence.Tests.RavenDb5/ApprovalFiles/APIApprovals.CustomCheckDetails.approved.txt +++ b/src/ServiceControl.Persistence.Tests.RavenDb5/ApprovalFiles/APIApprovals.CustomCheckDetails.approved.txt @@ -1,3 +1,4 @@ +Health: Saga Audit Destination ServiceControl Health: Error Database Index Errors ServiceControl Health: Error Database Index Lag ServiceControl Health: Message Ingestion Process From 768309f1da5ba551d606e92174bd993c620d2a40 Mon Sep 17 00:00:00 2001 From: David Boike Date: Wed, 4 Oct 2023 08:01:02 -0500 Subject: [PATCH 09/10] Apply suggestions from code review Co-authored-by: Ramon Smits --- src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs b/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs index 0bd6139abb..48d8df84ba 100644 --- a/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs +++ b/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs @@ -53,8 +53,10 @@ async Task RefreshAuditQueue() var connectionDetails = await connectionBuilder.BuildPlatformConnection(); + // First instance is named `SagaAudit`, following instance `SagaAudit1`..`SagaAuditN` if (connectionDetails.ToDictionary().TryGetValue("SagaAudit", out var sagaAuditObj) && sagaAuditObj is JObject sagaAudit) { + // Pick any audit queue, assume all instance are based on competing consumer auditQueueName = sagaAudit["SagaAuditQueue"].Value(); nextAuditQueueNameRefresh = DateTime.UtcNow.AddMinutes(5); log.InfoFormat("Refreshed audit queue name '{0}' from ServiceControl Audit instance. Will continue to use this value for forwarding saga update messages for the next 5 minutes.", auditQueueName); From f4bfef3b8e123247d173a72bcb00b5af00da1c58 Mon Sep 17 00:00:00 2001 From: David Boike Date: Wed, 4 Oct 2023 08:13:07 -0500 Subject: [PATCH 10/10] no tabs --- src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs b/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs index 48d8df84ba..3aaa61f737 100644 --- a/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs +++ b/src/ServiceControl/SagaAudit/SagaUpdatedHandler.cs @@ -53,10 +53,10 @@ async Task RefreshAuditQueue() var connectionDetails = await connectionBuilder.BuildPlatformConnection(); - // First instance is named `SagaAudit`, following instance `SagaAudit1`..`SagaAuditN` + // First instance is named `SagaAudit`, following instance `SagaAudit1`..`SagaAuditN` if (connectionDetails.ToDictionary().TryGetValue("SagaAudit", out var sagaAuditObj) && sagaAuditObj is JObject sagaAudit) { - // Pick any audit queue, assume all instance are based on competing consumer + // Pick any audit queue, assume all instance are based on competing consumer auditQueueName = sagaAudit["SagaAuditQueue"].Value(); nextAuditQueueNameRefresh = DateTime.UtcNow.AddMinutes(5); log.InfoFormat("Refreshed audit queue name '{0}' from ServiceControl Audit instance. Will continue to use this value for forwarding saga update messages for the next 5 minutes.", auditQueueName);