diff --git a/src/ServiceControl.AcceptanceTests.RavenDB5/Recoverability/MessageFailures/FailedErrorsController.cs b/src/ServiceControl.AcceptanceTests.RavenDB5/Recoverability/MessageFailures/FailedErrorsController.cs index 668755a855..401e3f890d 100644 --- a/src/ServiceControl.AcceptanceTests.RavenDB5/Recoverability/MessageFailures/FailedErrorsController.cs +++ b/src/ServiceControl.AcceptanceTests.RavenDB5/Recoverability/MessageFailures/FailedErrorsController.cs @@ -1,6 +1,5 @@ namespace ServiceControl.AcceptanceTests.RavenDB.Recoverability.MessageFailures { - using System; using System.Net; using System.Net.Http; using System.Threading; @@ -9,7 +8,6 @@ using Infrastructure.WebApi; using Operations; using Raven.Client.Documents; - using Raven.Client.Documents.Operations; public class FailedErrorsCountReponse { @@ -52,23 +50,6 @@ public async Task ImportFailedErrors(CancellationToken canc return Request.CreateResponse(HttpStatusCode.OK); } - [Route("failederrors/forcecleanerrors")] - [HttpPost] - public Task ForceErrorMessageCleanerRun() - { - // TODO: Is there a way to force the Raven5 expiration to happen? Or does it just happen? Won't be able to tell until we redesign that. - - // May not even need WaitForIndexes given Raven5 implementation isn't index-based - WaitForIndexes(store); - - return Task.FromResult(Request.CreateResponse(HttpStatusCode.OK)); - } - - static void WaitForIndexes(IDocumentStore store) - { - SpinWait.SpinUntil(() => store.Maintenance.Send(new GetStatisticsOperation()).StaleIndexes.Length == 0, TimeSpan.FromSeconds(10)); - } - readonly IDocumentStore store; readonly ImportFailedErrors importFailedErrors; } diff --git a/src/ServiceControl.AcceptanceTests.RavenDB5/Recoverability/MessageFailures/When_a_failed_message_is_retried.cs b/src/ServiceControl.AcceptanceTests.RavenDB5/Recoverability/MessageFailures/When_a_failed_message_is_retried.cs index 5a7a9415d5..1571f796ca 100644 --- a/src/ServiceControl.AcceptanceTests.RavenDB5/Recoverability/MessageFailures/When_a_failed_message_is_retried.cs +++ b/src/ServiceControl.AcceptanceTests.RavenDB5/Recoverability/MessageFailures/When_a_failed_message_is_retried.cs @@ -184,10 +184,11 @@ await Define() { if (ctx.Retried) { - // trigger cleanup - await this.Post("/api/failederrors/forcecleanerrors"); - + // Note: In RavenDB 3.5 there was a call to /api/failederrors/forcecleanerrors implemented in test-only FailedErrorsController + // that manually ran the Expiration bundle, but RavenDB 5 uses built-in expiration so you can't do that. The test still + // appears to pass, however. failedMessageRetries = await this.TryGet("/api/failedmessageretries/count"); + return failedMessageRetries.Count == 0; } diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_a_retry_fails_to_be_sent.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_a_retry_fails_to_be_sent.cs index 425bce9038..82a879a3e2 100644 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_a_retry_fails_to_be_sent.cs +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_a_retry_fails_to_be_sent.cs @@ -11,7 +11,6 @@ using NServiceBus.Settings; using NServiceBus.Transport; using NUnit.Framework; - using Operations.BodyStorage; using ServiceControl.MessageFailures; using ServiceControl.MessageFailures.Api; using ServiceControl.Persistence; @@ -26,7 +25,7 @@ public async Task SubsequentBatchesShouldBeProcessed() { FailedMessage decomissionedFailure = null, successfullyRetried = null; - CustomConfiguration = config => config.RegisterComponents(components => components.ConfigureComponent(b => new FakeReturnToSender(b.Build(), b.Build(), b.Build()), DependencyLifecycle.SingleInstance)); + CustomConfiguration = config => config.RegisterComponents(components => components.ConfigureComponent(b => new FakeReturnToSender(b.Build(), b.Build()), DependencyLifecycle.SingleInstance)); await Define() .WithEndpoint(b => b.DoNotFailOnErrorMessages() @@ -148,7 +147,7 @@ public class MessageThatWillFail : ICommand public class FakeReturnToSender : ReturnToSender { - public FakeReturnToSender(IBodyStorage bodyStorage, IErrorMessageDataStore errorMessageStore, MyContext myContext) : base(bodyStorage, errorMessageStore) + public FakeReturnToSender(IErrorMessageDataStore errorMessageStore, MyContext myContext) : base(errorMessageStore) { this.myContext = myContext; } diff --git a/src/ServiceControl.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs b/src/ServiceControl.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs index e661b70b1d..882e7aca69 100644 --- a/src/ServiceControl.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs +++ b/src/ServiceControl.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs @@ -1,7 +1,6 @@ namespace ServiceControl.AcceptanceTests.TestSupport { using System; - using System.Configuration; using System.IO; using System.Net.Http; using System.Net.Http.Headers; @@ -201,7 +200,7 @@ HttpClient HttpClientFactory() { if (Handler == null) { - throw new InvalidOperationException("Handler field not yet initialized"); // TODO: This method is invoked before `Initialize` completes which is strange and should be looked into as that seems like a race condition + throw new InvalidOperationException("Handler field not yet initialized"); } var httpClient = new HttpClient(Handler); httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json")); diff --git a/src/ServiceControl.Audit.Persistence.RavenDb5/EmbeddedDatabase.cs b/src/ServiceControl.Audit.Persistence.RavenDb5/EmbeddedDatabase.cs index 8565b0e66f..a39c7746af 100644 --- a/src/ServiceControl.Audit.Persistence.RavenDb5/EmbeddedDatabase.cs +++ b/src/ServiceControl.Audit.Persistence.RavenDb5/EmbeddedDatabase.cs @@ -32,17 +32,21 @@ public EmbeddedDatabase(DatabaseConfiguration configuration) return (localRavenLicense, null); } - //TODO: refactor this to extract the folder name to a constant - localRavenLicense = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Persisters", "RavenDB5", licenseFileName); + const string Persisters = "Persisters"; + const string RavenDB5 = "RavenDB5"; + + var persisterDirectory = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, Persisters, RavenDB5); + + localRavenLicense = Path.Combine(persisterDirectory, licenseFileName); if (!File.Exists(localRavenLicense)) { throw new Exception($"RavenDB license not found. Make sure the RavenDB license file, '{licenseFileName}', " + - $"is stored in the '{AppDomain.CurrentDomain.BaseDirectory}' folder or in the 'Persisters/RavenDB5' subfolder."); + $"is stored in the '{AppDomain.CurrentDomain.BaseDirectory}' folder or in the '{Persisters}/{RavenDB5}' subfolder."); } // By default RavenDB 5 searches its binaries in the RavenDBServer right below the BaseDirectory. // If we're loading from Persisters/RavenDB5 we also have to signal RavenDB where are binaries - var serverDirectory = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Persisters", "RavenDB5", "RavenDBServer"); + var serverDirectory = Path.Combine(persisterDirectory, "RavenDBServer"); return (localRavenLicense, serverDirectory); } diff --git a/src/ServiceControl.Audit.UnitTests/PersistenceManifestCanLoadFromRoot.cs b/src/ServiceControl.Audit.UnitTests/PersistenceManifestCanLoadFromRoot.cs deleted file mode 100644 index 4184c93341..0000000000 --- a/src/ServiceControl.Audit.UnitTests/PersistenceManifestCanLoadFromRoot.cs +++ /dev/null @@ -1,25 +0,0 @@ -namespace ServiceControl.Audit.UnitTests -{ - using NUnit.Framework; - using ServiceControl.Audit.Persistence; - - [TestFixture] - public class PersistenceManifestCanLoadFromRoot - { - const string persistenceName = "InMemory"; - const string persistenceType = "ServiceControl.Audit.Persistence.InMemory.InMemoryPersistenceConfiguration, ServiceControl.Audit.Persistence.InMemory"; - - // TODO: Not really sure why this test was here in the first place. Before removing Raven35 code it was testing that "RavenDB35" could be turned - // into the full persistence type name. But at that time, these UnitTests had references to both the InMemory and Raven35 persistence, and not to - // Raven5 persistence. So with the removal of Raven35 code, it couldn't work. It has been rewritten to test that the InMemory one can work, since - // that is the only persister still present in the UnitTests, but I still wonder why it's here as it seems like more of an installer testing concern. - - [Test] - public void Should_find_persistence_type_by_name() - { - var _persistenceType = PersistenceManifestLibrary.Find(persistenceName); - - Assert.AreEqual(persistenceType, _persistenceType); - } - } -} diff --git a/src/ServiceControl.Persistence.RavenDb5/DatabaseSetup.cs b/src/ServiceControl.Persistence.RavenDb5/DatabaseSetup.cs index 03840a5784..b72e9186f9 100644 --- a/src/ServiceControl.Persistence.RavenDb5/DatabaseSetup.cs +++ b/src/ServiceControl.Persistence.RavenDb5/DatabaseSetup.cs @@ -1,7 +1,5 @@ namespace ServiceControl.Persistence.RavenDb5 { - using System; - using System.Linq; using System.Threading; using System.Threading.Tasks; using Raven.Client.Documents; @@ -39,30 +37,7 @@ await documentStore.Maintenance.Server } } - var indexTypes = typeof(DatabaseSetup).Assembly.GetTypes() - .Where(t => typeof(IAbstractIndexCreationTask).IsAssignableFrom(t)) - .ToList(); - - //TODO: Handle full text search - if necessary add Where clause to query above to remove the two variants - //if (settings.EnableFullTextSearch) - //{ - // indexList.Add(new MessagesViewIndexWithFullTextSearch()); - // await documentStore.Maintenance.SendAsync(new DeleteIndexOperation("MessagesViewIndex"), cancellationToken); - //} - //else - //{ - // indexList.Add(new MessagesViewIndex()); - // await documentStore.Maintenance - // .SendAsync(new DeleteIndexOperation("MessagesViewIndexWithFullTextSearch"), cancellationToken); - //} - - var indexList = indexTypes - .Select(t => Activator.CreateInstance(t)) - .OfType(); - - // If no full-text vs not full-text index is required, this can all be simplified using the assembly-based override - // await IndexCreation.CreateIndexesAsync(typeof(DatabaseSetup).Assembly, documentStore, null, null, cancellationToken); - await IndexCreation.CreateIndexesAsync(indexList, documentStore, null, null, cancellationToken); + await IndexCreation.CreateIndexesAsync(typeof(DatabaseSetup).Assembly, documentStore, null, null, cancellationToken); var expirationConfig = new ExpirationConfiguration { diff --git a/src/ServiceControl.Persistence.RavenDb5/EmbeddedDatabase.cs b/src/ServiceControl.Persistence.RavenDb5/EmbeddedDatabase.cs index da3b1bafd2..7cb42cb5d6 100644 --- a/src/ServiceControl.Persistence.RavenDb5/EmbeddedDatabase.cs +++ b/src/ServiceControl.Persistence.RavenDb5/EmbeddedDatabase.cs @@ -32,12 +32,16 @@ public class EmbeddedDatabase : IDisposable return (localRavenLicense, null); } - //TODO: refactor this to extract the folder name to a constant - localRavenLicense = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Persisters", "RavenDB5", licenseFileName); + const string Persisters = "Persisters"; + const string RavenDB5 = "RavenDB5"; + + var persisterDirectory = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, Persisters, RavenDB5); + + localRavenLicense = Path.Combine(persisterDirectory, licenseFileName); if (!File.Exists(localRavenLicense)) { throw new Exception($"RavenDB license not found. Make sure the RavenDB license file, '{licenseFileName}', " + - $"is stored in the '{AppDomain.CurrentDomain.BaseDirectory}' folder or in the 'Persisters/RavenDB5' subfolder."); + $"is stored in the '{AppDomain.CurrentDomain.BaseDirectory}' folder or in the '{Persisters}/{RavenDB5}' subfolder."); } // By default RavenDB 5 searches its binaries in the RavenDBServer right below the BaseDirectory. @@ -138,12 +142,6 @@ public async Task Connect(CancellationToken cancellationToken = } }; - //TODO: copied from Audit. In Audit FindClrType so I guess this is not needed. Confirm and remove - //if (configuration.FindClrType != null) - //{ - // dbOptions.Conventions.FindClrType += configuration.FindClrType; - //} - var store = await EmbeddedServer.Instance.GetDocumentStoreAsync(dbOptions, cancellationToken); var databaseSetup = new DatabaseSetup(configuration); diff --git a/src/ServiceControl.Persistence.RavenDb5/ErrorMessagesDataStore.cs b/src/ServiceControl.Persistence.RavenDb5/ErrorMessagesDataStore.cs index bf6804c5c2..0bf2444a21 100644 --- a/src/ServiceControl.Persistence.RavenDb5/ErrorMessagesDataStore.cs +++ b/src/ServiceControl.Persistence.RavenDb5/ErrorMessagesDataStore.cs @@ -154,15 +154,6 @@ SortInfo sortInfo } } - // TODO: There seem to be several copies of this operation in here - public async Task FailedMessageFetch(string failedMessageId) - { - using (var session = documentStore.OpenAsyncSession()) - { - return await session.LoadAsync(FailedMessageIdGenerator.MakeDocumentId(failedMessageId)); - } - } - public async Task FailedMessageMarkAsArchived(string failedMessageId) { using (var session = documentStore.OpenAsyncSession()) @@ -352,20 +343,15 @@ public async Task> ErrorsSummary() } } - public async Task ErrorBy(Guid failedMessageId) - { - using (var session = documentStore.OpenAsyncSession()) - { - var message = await session.LoadAsync(FailedMessageIdGenerator.MakeDocumentId(failedMessageId.ToString())); - return message; - } - } + public Task ErrorBy(Guid failedMessageId) => ErrorByDocumentId(FailedMessageIdGenerator.MakeDocumentId(failedMessageId.ToString())); - public async Task ErrorBy(string failedMessageId) + public Task ErrorBy(string failedMessageId) => ErrorByDocumentId(FailedMessageIdGenerator.MakeDocumentId(failedMessageId)); + + async Task ErrorByDocumentId(string documentId) { using (var session = documentStore.OpenAsyncSession()) { - var message = await session.LoadAsync(FailedMessageIdGenerator.MakeDocumentId(failedMessageId)); + var message = await session.LoadAsync(documentId); return message; } } @@ -586,29 +572,33 @@ class DocumentPatchResult public string Document { get; set; } } - public async Task<(string[] ids, int count)> UnArchiveMessagesByRange(DateTime from, DateTime to, DateTime cutOff) + public async Task UnArchiveMessagesByRange(DateTime from, DateTime to) { - // TODO: Make sure this new implementation actually works, not going to delete the old implementation (commented below) until then - var patch = new PatchByQueryOperation(new IndexQuery + const int Unresolved = (int)FailedMessageStatus.Unresolved; + const int Archived = (int)FailedMessageStatus.Archived; + + var indexName = new FailedMessageViewIndex().IndexName; + var query = new IndexQuery { + // Set based args are treated differently ($name) than other places (args.name)! + // https://ravendb.net/docs/article-page/5.4/csharp/client-api/operations/patching/set-based + // Removing a property in a patch // https://ravendb.net/docs/article-page/5.4/Csharp/client-api/operations/patching/single-document#remove-property - - Query = $@"from index '{new FailedMessageViewIndex().IndexName} as msg - where msg.LastModified >= args.From and msg.LastModified <= args.To - where msg.Status == args.Archived + Query = $@"from index '{indexName}' as msg + where msg.Status == {Archived} and msg.LastModified >= $from and msg.LastModified <= $to update {{ - msg.Status = args.Unresolved - {ExpirationManager.DeleteExpirationFieldScript} + msg.Status = {Unresolved}; + {ExpirationManager.DeleteExpirationFieldExpression}; }}", - QueryParameters = + QueryParameters = new Parameters { - { "From", from }, - { "To", to }, - { "Unresolved", (int)FailedMessageStatus.Unresolved }, - { "Archived", (int)FailedMessageStatus.Archived }, + { "from", from.Ticks }, + { "to", to.Ticks } } - }, new QueryOperationOptions + }; + + var patch = new PatchByQueryOperation(query, new QueryOperationOptions { AllowStale = true, RetrieveDetails = true @@ -622,44 +612,10 @@ class DocumentPatchResult .Select(d => d.Id) .ToArray(); - // TODO: Are we *really* returning an array AND the length of the same array? - return (ids, ids.Length); - - // var options = new BulkOperationOptions - // { - // AllowStale = true - // }; - - // var result = await documentStore.AsyncDatabaseCommands.UpdateByIndexAsync( - // new FailedMessageViewIndex().IndexName, - // new IndexQuery - // { - // Query = string.Format(CultureInfo.InvariantCulture, "LastModified:[{0} TO {1}] AND Status:{2}", from.Ticks, to.Ticks, (int)FailedMessageStatus.Archived), - // Cutoff = cutOff - // }, new ScriptedPatchRequest - // { - // Script = @" - //if(this.Status === archivedStatus) { - // this.Status = unresolvedStatus; - //} - //", - // Values = - // { - // {"archivedStatus", (int)FailedMessageStatus.Archived}, - // {"unresolvedStatus", (int)FailedMessageStatus.Unresolved} - // } - // }, options); - - // var patchedDocumentIds = (await result.WaitForCompletionAsync()) - // .JsonDeserialization(); - - // return ( - // patchedDocumentIds.Select(x => FailedMessageIdGenerator.GetMessageIdFromDocumentId(x.Document)).ToArray(), - // patchedDocumentIds.Length - // ); - } - - public async Task<(string[] ids, int count)> UnArchiveMessages(IEnumerable failedMessageIds) + return ids; + } + + public async Task UnArchiveMessages(IEnumerable failedMessageIds) { Dictionary failedMessages; @@ -683,10 +639,8 @@ class DocumentPatchResult await session.SaveChangesAsync(); } - return ( - failedMessages.Values.Select(x => x.UniqueMessageId).ToArray(), // TODO: (ramon) I don't think we can use Keys here as UniqueMessageId is something different than failedMessageId right? - failedMessages.Count - ); + // Return the unique IDs - the dictionary keys are document ids with a prefix + return failedMessages.Values.Select(x => x.UniqueMessageId).ToArray(); } public async Task RevertRetry(string messageUniqueId) diff --git a/src/ServiceControl.Persistence.RavenDb5/ExpirationManager.cs b/src/ServiceControl.Persistence.RavenDb5/ExpirationManager.cs index 59ad5a6f61..ea464acaa0 100644 --- a/src/ServiceControl.Persistence.RavenDb5/ExpirationManager.cs +++ b/src/ServiceControl.Persistence.RavenDb5/ExpirationManager.cs @@ -9,7 +9,7 @@ class ExpirationManager { - public const string DeleteExpirationFieldScript = "; delete msg['@metadata']['@expires']"; + public const string DeleteExpirationFieldExpression = "delete msg['@metadata']['@expires']"; readonly TimeSpan errorRetentionPeriod; readonly TimeSpan eventsRetentionPeriod; diff --git a/src/ServiceControl.Persistence.RavenDb5/Extensions.cs b/src/ServiceControl.Persistence.RavenDb5/Extensions.cs deleted file mode 100644 index 39734a5337..0000000000 --- a/src/ServiceControl.Persistence.RavenDb5/Extensions.cs +++ /dev/null @@ -1,29 +0,0 @@ -namespace ServiceControl.Infrastructure.RavenDB -{ - using System; - using Raven.Client.Documents.Conventions; - - static class Extensions - { - //public static void Query(this DocumentDatabase db, string index, IndexQuery query, Action onItem, TState state, CancellationToken cancellationToken = default) - //{ - // var results = db.Queries.Query(index, query, cancellationToken); - // foreach (var doc in results.Results) - // { - // onItem(doc, state); - // } - //} - - // TODO: This polyfill of RavenDB 3.5 is a guess based loosely on https://github.com/ravendb/ravendb/blob/v3.5/Raven.Client.Lightweight/Document/DocumentConvention.cs#L151 - public static string DefaultFindFullDocumentKeyFromNonStringIdentifier(this DocumentConventions conventions, T id, Type collectionType, bool allowNull) - { - if (allowNull && id.Equals(default(T))) - { - return null; - } - - var collectionName = conventions.FindCollectionName(collectionType); - return $"{collectionName}{conventions.IdentityPartsSeparator}{id}"; - } - } -} \ No newline at end of file diff --git a/src/ServiceControl.Persistence.RavenDb5/ExternalIntegrationRequestsDataStore.cs b/src/ServiceControl.Persistence.RavenDb5/ExternalIntegrationRequestsDataStore.cs index 0ea74e7346..c7493a5dee 100644 --- a/src/ServiceControl.Persistence.RavenDb5/ExternalIntegrationRequestsDataStore.cs +++ b/src/ServiceControl.Persistence.RavenDb5/ExternalIntegrationRequestsDataStore.cs @@ -10,25 +10,29 @@ using Microsoft.Extensions.Hosting; using NServiceBus; using NServiceBus.Logging; - using ServiceBus.Management.Infrastructure.Extensions; using Raven.Client.Documents; using Raven.Client.Documents.Changes; + using ServiceBus.Management.Infrastructure.Extensions; class ExternalIntegrationRequestsDataStore : IExternalIntegrationRequestsDataStore , IHostedService , IAsyncDisposable { + public ExternalIntegrationRequestsDataStore(RavenDBPersisterSettings settings, IDocumentStore documentStore, CriticalError criticalError) { this.settings = settings; this.documentStore = documentStore; + var timeToWait = TimeSpan.FromMinutes(5); + var delayAfterFailure = TimeSpan.FromSeconds(20); + circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker( "EventDispatcher", - TimeSpan.FromMinutes(5), // TODO: Shouldn't be magic value but coming from settings + timeToWait, ex => criticalError.Raise("Repeated failures when dispatching external integration events.", ex), - TimeSpan.FromSeconds(20) // TODO: Shouldn't be magic value but coming from settings + delayAfterFailure ); } @@ -45,7 +49,7 @@ public async Task StoreDispatchRequest(IEnumerable TryDispatchEventBatch() if (awaitingDispatching.Count == 0) { - //TODO: this should ensure we query again if the result is potentially stale - // if ☝️ is not true we will need to use/parse the ChangeVector when document is written and compare to ResultEtag + // Should ensure we query again if the result is potentially stale + // If ☝️ is not true we will need to use/parse the ChangeVector when document is written and compare to ResultEtag return stats.IsStale; } diff --git a/src/ServiceControl.Persistence.RavenDb5/FailedMessages/FailedMessageReclassifier.cs b/src/ServiceControl.Persistence.RavenDb5/FailedMessages/FailedMessageReclassifier.cs deleted file mode 100644 index ba986ed298..0000000000 --- a/src/ServiceControl.Persistence.RavenDb5/FailedMessages/FailedMessageReclassifier.cs +++ /dev/null @@ -1,148 +0,0 @@ -namespace ServiceControl.Persistence.RavenDb -{ - using System; - using System.Collections.Generic; - using System.Linq; - using System.Threading; - using System.Threading.Tasks; - using Microsoft.Extensions.Hosting; - using Newtonsoft.Json.Linq; - using NServiceBus.Logging; - using Raven.Client.Documents; - using Raven.Client.Documents.Operations; - using Raven.Client.Exceptions; - using ServiceControl.MessageFailures; - using ServiceControl.MessageFailures.Api; - using ServiceControl.Persistence.Infrastructure; - using ServiceControl.Recoverability; - - class FailedMessageReclassifier : IReclassifyFailedMessages - { - readonly IDocumentStore store; - readonly IEnumerable classifiers; - - public FailedMessageReclassifier(IDocumentStore store, IHostApplicationLifetime applicationLifetime, IEnumerable classifiers) - { - this.store = store; - this.classifiers = classifiers; - - applicationLifetime?.ApplicationStopping.Register(() => { abort = true; }); - } - - public async Task ReclassifyFailedMessages(bool force) - { - logger.Info("Reclassification of failures started."); - - var failedMessagesReclassified = 0; - var currentBatch = new List>(); - - using (var session = store.OpenAsyncSession()) - { - ReclassifyErrorSettings settings = null; - - if (!force) - { - settings = await session.LoadAsync(ReclassifyErrorSettings.IdentifierCase); - - if (settings != null && settings.ReclassificationDone) - { - logger.Info("Skipping reclassification of failures as classification has already been done."); - return 0; - } - } - - var query = session.Query() - .Where(f => f.Status == FailedMessageStatus.Unresolved); - - var totalMessagesReclassified = 0; - - await using (var stream = await session.Advanced.StreamAsync(query.OfType())) - { - while (!abort && await stream.MoveNextAsync()) - { - currentBatch.Add(Tuple.Create(stream.Current.Document.Id, new ClassifiableMessageDetails(stream.Current.Document))); - - if (currentBatch.Count == BatchSize) - { - failedMessagesReclassified += ReclassifyBatch(store, currentBatch, classifiers); - currentBatch.Clear(); - - totalMessagesReclassified += BatchSize; - logger.Info($"Reclassification of batch of {BatchSize} failed messages completed. Total messages reclassified: {totalMessagesReclassified}"); - } - } - } - - if (currentBatch.Any()) - { - ReclassifyBatch(store, currentBatch, classifiers); - } - - logger.Info($"Reclassification of failures ended. Reclassified {failedMessagesReclassified} messages"); - - settings ??= new ReclassifyErrorSettings(); - - settings.ReclassificationDone = true; - await session.StoreAsync(settings); - await session.SaveChangesAsync(); - - return failedMessagesReclassified; - } - } - - int ReclassifyBatch(IDocumentStore store, IEnumerable> docs, IEnumerable classifiers) - { - var failedMessagesReclassified = 0; - - Parallel.ForEach(docs, doc => - { - var failureGroups = GetClassificationGroups(doc.Item2, classifiers).Select(JObject.FromObject); - - try - { - store.Operations.Send(new PatchOperation(doc.Item1, null, new PatchRequest - { - Script = @"this.FailureGroups = args.Value", - Values = - { - { "Value", new JArray(failureGroups) } - } - })); - - Interlocked.Increment(ref failedMessagesReclassified); - } - catch (ConcurrencyException) - { - // Ignore concurrency exceptions - } - }); - - return failedMessagesReclassified; - } - - IEnumerable GetClassificationGroups(ClassifiableMessageDetails details, IEnumerable classifiers) - { - foreach (var classifier in classifiers) - { - var classification = classifier.ClassifyFailure(details); - if (classification == null) - { - continue; - } - - var id = DeterministicGuid.MakeId(classifier.Name, classification).ToString(); - - yield return new FailedMessage.FailureGroup - { - Id = id, - Title = classification, - Type = classifier.Name - }; - } - } - - readonly ILog logger = LogManager.GetLogger(); - const int BatchSize = 1000; - bool abort; - } -} diff --git a/src/ServiceControl.Persistence.RavenDb5/Indexes/KnownEndpointIndex.cs b/src/ServiceControl.Persistence.RavenDb5/Indexes/KnownEndpointIndex.cs index b26d85cc0f..148764224f 100644 --- a/src/ServiceControl.Persistence.RavenDb5/Indexes/KnownEndpointIndex.cs +++ b/src/ServiceControl.Persistence.RavenDb5/Indexes/KnownEndpointIndex.cs @@ -16,7 +16,6 @@ from message in messages EndpointDetails_Host = message.EndpointDetails.Host, message.HostDisplayName, message.Monitored, - message.HasTemporaryId }; } } diff --git a/src/ServiceControl.Persistence.RavenDb5/Indexes/MessagesViewIndex.cs b/src/ServiceControl.Persistence.RavenDb5/Indexes/MessagesViewIndex.cs index fda14cfa79..5d2573b3c6 100644 --- a/src/ServiceControl.Persistence.RavenDb5/Indexes/MessagesViewIndex.cs +++ b/src/ServiceControl.Persistence.RavenDb5/Indexes/MessagesViewIndex.cs @@ -7,7 +7,6 @@ namespace ServiceControl.Persistence using ServiceControl.MessageFailures; using ServiceControl.Operations; - // TODO: Consider renaming to FailedMessageIndex public class MessagesViewIndex : AbstractIndexCreationTask { public MessagesViewIndex() diff --git a/src/ServiceControl.Persistence.RavenDb5/Infrastructure/Migrations/IDataMigration.cs b/src/ServiceControl.Persistence.RavenDb5/Infrastructure/Migrations/IDataMigration.cs deleted file mode 100644 index 25fe4a0511..0000000000 --- a/src/ServiceControl.Persistence.RavenDb5/Infrastructure/Migrations/IDataMigration.cs +++ /dev/null @@ -1,10 +0,0 @@ -namespace ServiceControl.Infrastructure.RavenDB -{ - using System.Threading.Tasks; - using Raven.Client.Documents; - - interface IDataMigration - { - Task Migrate(IDocumentStore store); - } -} \ No newline at end of file diff --git a/src/ServiceControl.Persistence.RavenDb5/Infrastructure/Migrations/PurgeKnownEndpointsWithTemporaryIdsThatAreDuplicateDataMigration.cs b/src/ServiceControl.Persistence.RavenDb5/Infrastructure/Migrations/PurgeKnownEndpointsWithTemporaryIdsThatAreDuplicateDataMigration.cs deleted file mode 100644 index cec8ef0598..0000000000 --- a/src/ServiceControl.Persistence.RavenDb5/Infrastructure/Migrations/PurgeKnownEndpointsWithTemporaryIdsThatAreDuplicateDataMigration.cs +++ /dev/null @@ -1,38 +0,0 @@ -namespace ServiceControl.Infrastructure.RavenDB -{ - using System.Linq; - using System.Threading.Tasks; - using Raven.Client.Documents; - using Raven.Client.Documents.Commands; - using ServiceControl.Persistence; - using ServiceControl.Persistence.RavenDb; - - // TODO: I don't know if we can delete this because no prior Raven5 database will exist, or if it's an ongoing need to purge these things on every startup - class PurgeKnownEndpointsWithTemporaryIdsThatAreDuplicateDataMigration : IDataMigration - { - public Task Migrate(IDocumentStore store) - { - using (var session = store.OpenSession()) - { - var endpoints = session.Query().ToList(); - - foreach (var knownEndpoints in endpoints.GroupBy(e => e.EndpointDetails.Host + e.EndpointDetails.Name)) - { - var fixedIdsCount = knownEndpoints.Count(e => !e.HasTemporaryId); - - //If we have knowEndpoints with non temp ids, we should delete all temp ids ones. - if (fixedIdsCount > 0) - { - foreach (var key in knownEndpoints.Where(e => e.HasTemporaryId)) - { - string documentId = RavenDbMonitoringDataStore.MakeDocumentId(key.EndpointDetails.GetDeterministicId()); - session.Advanced.RequestExecutor.Execute(new DeleteDocumentCommand(documentId, null), session.Advanced.Context); - } - } - } - } - - return Task.CompletedTask; - } - } -} \ No newline at end of file diff --git a/src/ServiceControl.Persistence.RavenDb5/RavenDBPersisterSettings.cs b/src/ServiceControl.Persistence.RavenDb5/RavenDBPersisterSettings.cs index a7d52a5026..92426b4916 100644 --- a/src/ServiceControl.Persistence.RavenDb5/RavenDBPersisterSettings.cs +++ b/src/ServiceControl.Persistence.RavenDb5/RavenDBPersisterSettings.cs @@ -31,6 +31,4 @@ class RavenDBPersisterSettings : PersistenceSettings public const int DatabasePortDefault = 33334; public const int ExpirationProcessTimerInSecondsDefault = 600; public const string LogsModeDefault = "Operations"; - - public override bool MessageBodiesAlwaysStoredInFailedMessage => true; } \ No newline at end of file diff --git a/src/ServiceControl.Persistence.RavenDb5/RavenDbExternalPersistenceLifecycle.cs b/src/ServiceControl.Persistence.RavenDb5/RavenDbExternalPersistenceLifecycle.cs index 280f8c93ea..55be95b117 100644 --- a/src/ServiceControl.Persistence.RavenDb5/RavenDbExternalPersistenceLifecycle.cs +++ b/src/ServiceControl.Persistence.RavenDb5/RavenDbExternalPersistenceLifecycle.cs @@ -36,12 +36,6 @@ public async Task Initialize(CancellationToken cancellationToken) } }; - //TODO: copied from Audit, not sure if needed (never assigned). Check and remove - //if (settings.FindClrType != null) - //{ - // store.Conventions.FindClrType += settings.FindClrType; - //} - store.Initialize(); documentStore = store; diff --git a/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs b/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs index fd20779ba7..4333ead248 100644 --- a/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs +++ b/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs @@ -66,7 +66,6 @@ public void Configure(IServiceCollection serviceCollection) serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); - serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); diff --git a/src/ServiceControl.Persistence.RavenDb5/RepeatedFailuresOverTimeCircuitBreaker.cs b/src/ServiceControl.Persistence.RavenDb5/RepeatedFailuresOverTimeCircuitBreaker.cs index 09cb5b0700..288bcebf01 100644 --- a/src/ServiceControl.Persistence.RavenDb5/RepeatedFailuresOverTimeCircuitBreaker.cs +++ b/src/ServiceControl.Persistence.RavenDb5/RepeatedFailuresOverTimeCircuitBreaker.cs @@ -14,12 +14,14 @@ public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBe this.triggerAction = triggerAction; this.timeToWaitBeforeTriggering = timeToWaitBeforeTriggering; + Logger.DebugFormat("RepeatedFailuresOverTime circuit breaker {0} that will trigger after {1} ", name, timeToWaitBeforeTriggering, delayAfterFailure); timer = new Timer(CircuitBreakerTriggered); } public void Dispose() { - //Injected + timer.Dispose(); + GC.SuppressFinalize(this); } public void Success() @@ -63,12 +65,12 @@ void CircuitBreakerTriggered(object state) long failureCount; Exception lastException; - string name; - Timer timer; - TimeSpan timeToWaitBeforeTriggering; - Action triggerAction; + readonly string name; + readonly Timer timer; + readonly TimeSpan timeToWaitBeforeTriggering; + readonly Action triggerAction; - static TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1); - static ILog Logger = LogManager.GetLogger(); + static readonly TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1); + static readonly ILog Logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/ServiceControl.Persistence.RavenDb5/RetryDocumentDataStore.cs b/src/ServiceControl.Persistence.RavenDb5/RetryDocumentDataStore.cs index c651ba837e..ff0ee7ff85 100644 --- a/src/ServiceControl.Persistence.RavenDb5/RetryDocumentDataStore.cs +++ b/src/ServiceControl.Persistence.RavenDb5/RetryDocumentDataStore.cs @@ -24,9 +24,7 @@ public RetryDocumentDataStore(IDocumentStore store) this.store = store; } - public async Task StageRetryByUniqueMessageIds(string batchDocumentId, string requestId, RetryType retryType, string[] messageIds, - DateTime startTime, - DateTime? last = null, string originator = null, string batchName = null, string classifier = null) + public async Task StageRetryByUniqueMessageIds(string batchDocumentId, string[] messageIds) { var commands = new ICommandData[messageIds.Length]; @@ -89,20 +87,13 @@ await session.StoreAsync(new RetryBatch return batchDocumentId; } - public async Task>> QueryOrphanedBatches(string retrySessionId, DateTime cutoff) + public async Task>> QueryOrphanedBatches(string retrySessionId) { using (var session = store.OpenAsyncSession()) { var orphanedBatches = await session .Query() - // TODO: Cutoff no longer exists but guidance isn't clear how to handle this: - // https://ravendb.net/docs/article-page/5.4/Csharp/indexes/stale-indexes - // https://ravendb.net/docs/article-page/5.4/csharp/client-api/session/querying/how-to-customize-query#waitfornonstaleresults - - //.Customize(c => c.BeforeQueryExecuted(index => index.Cutoff = cutoff)) - .Customize(c => c.WaitForNonStaleResults()) // (ramon) I think this is valid as at start orphaned batches should be retrieved based on non-stale results I would assume? - .Where(b => b.Status == RetryBatchStatus.MarkingDocuments && b.RetrySessionId != retrySessionId) .Statistics(out var stats) .ToListAsync(); @@ -140,18 +131,8 @@ static ICommandData CreateFailedMessageRetryDocument(string batchDocumentId, str static ILog log = LogManager.GetLogger(typeof(RetryDocumentDataStore)); - // TODO: Verify Stream queries in this file, which were the result of joining overly-complex IndexBasedBulkRetryRequest - // which was in this file, as well as the FailedMessages_UniqueMessageIdAndTimeOfFailures transformer, since transformers - // are not supported in RavenDB 5. I don't know what all the other properties of IndexBasedBulkRetryRequest were ever for, - // since they weren't used in this class. I also don't know what the other comments that were in each streaming query method - // were for either. - public async Task GetBatchesForAll(DateTime cutoff, Func callback) { - // StartRetryForIndex("All", RetryType.All, DateTime.UtcNow, originator: "all messages"); - //public void StartRetryForIndex(string requestId, RetryType retryType, DateTime startTime, Expression> filter = null, string originator = null, string classifier = null) - //StartRetryForIndex(endpoint, RetryType.AllForEndpoint, DateTime.UtcNow, m => m.ReceivingEndpointName == endpoint, $"all messages for endpoint {endpoint}"); - using (var session = store.OpenAsyncSession()) { var query = session.Query() @@ -175,9 +156,6 @@ public async Task GetBatchesForAll(DateTime cutoff, Func public async Task GetBatchesForEndpoint(DateTime cutoff, string endpoint, Func callback) { - //ForIndex - //StartRetryForIndex(endpoint, RetryType.AllForEndpoint, DateTime.UtcNow, m => m.ReceivingEndpointName == endpoint, $"all messages for endpoint {endpoint}"); - using (var session = store.OpenAsyncSession()) { var query = session.Query() @@ -202,9 +180,6 @@ public async Task GetBatchesForEndpoint(DateTime cutoff, string endpoint, Func callback) { - //ForIndex - //StartRetryForIndex(failedQueueAddress, RetryType.ByQueueAddress, DateTime.UtcNow, m => m.QueueAddress == failedQueueAddress && m.Status == status, ); - using (var session = store.OpenAsyncSession()) { var query = session.Query() @@ -229,8 +204,6 @@ public async Task GetBatchesForFailedQueueAddress(DateTime cutoff, string failed public async Task GetBatchesForFailureGroup(string groupId, string groupTitle, string groupType, DateTime cutoff, Func callback) { - //retries.StartRetryForIndex(message.GroupId, RetryType.FailureGroup, started, x => x.FailureGroupId == message.GroupId, originator, group?.Type); - using (var session = store.OpenAsyncSession()) { var query = session.Query() diff --git a/src/ServiceControl.Persistence.Tests.RavenDb5/Recoverability/RetryConfirmationProcessorTests.cs b/src/ServiceControl.Persistence.Tests.RavenDb5/Recoverability/RetryConfirmationProcessorTests.cs index 76fe530aac..5ff7630cbb 100644 --- a/src/ServiceControl.Persistence.Tests.RavenDb5/Recoverability/RetryConfirmationProcessorTests.cs +++ b/src/ServiceControl.Persistence.Tests.RavenDb5/Recoverability/RetryConfirmationProcessorTests.cs @@ -35,10 +35,8 @@ await ErrorMessageDataStore.StoreFailedMessagesForTestsOnly( } ); - // TODO: Strange... I just commented these lines and the tests are still green.... - - //var retryDocumentCommands = RetryDocumentDataStore.CreateFailedMessageRetryDocument(Guid.NewGuid().ToString(), MessageId); - //await GetRequiredService().AsyncDatabaseCommands.BatchAsync(new[] { retryDocumentCommands }); + var batchDocumentId = Guid.NewGuid().ToString(); + await RetryDocumentDataStore.StageRetryByUniqueMessageIds(batchDocumentId, new[] { MessageId }); } [Test] diff --git a/src/ServiceControl.Persistence.Tests.RavenDb5/Recoverability/ReturnToSenderDequeuerTests.cs b/src/ServiceControl.Persistence.Tests.RavenDb5/Recoverability/ReturnToSenderDequeuerTests.cs index 074edd62d9..eb003c5e7b 100644 --- a/src/ServiceControl.Persistence.Tests.RavenDb5/Recoverability/ReturnToSenderDequeuerTests.cs +++ b/src/ServiceControl.Persistence.Tests.RavenDb5/Recoverability/ReturnToSenderDequeuerTests.cs @@ -2,16 +2,21 @@ { using System; using System.Collections.Generic; - using System.IO; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; + using EventLog; + using MessageFailures; + using MessageFailures.Api; using Microsoft.Extensions.DependencyInjection; using NServiceBus.Extensibility; using NServiceBus.Transport; using NUnit.Framework; - using ServiceControl.Operations.BodyStorage; + using Persistence; + using Persistence.Infrastructure; + using ServiceControl.CompositeViews.Messages; + using ServiceControl.Operations; using ServiceControl.Recoverability; [TestFixture] @@ -44,7 +49,7 @@ public async Task It_removes_staging_id_header() }; var message = CreateMessage(Guid.NewGuid().ToString(), headers); - await new ReturnToSender(new FakeBodyStorage(), null).HandleMessage(message, sender, "error"); + await new ReturnToSender(null).HandleMessage(message, sender, "error"); Assert.IsFalse(sender.Message.Headers.ContainsKey("ServiceControl.Retry.StagingId")); } @@ -59,10 +64,11 @@ public async Task It_fetches_the_body_from_storage_if_provided() ["ServiceControl.Retry.StagingId"] = "SomeId", ["ServiceControl.TargetEndpointAddress"] = "TargetEndpoint", ["ServiceControl.Retry.Attempt.MessageId"] = "MessageBodyId", + ["ServiceControl.Retry.UniqueMessageId"] = "MessageBodyId" }; var message = CreateMessage(Guid.NewGuid().ToString(), headers); - await new ReturnToSender(new FakeBodyStorage(), null).HandleMessage(message, sender, "error"); + await new ReturnToSender(new FakeErrorMessageDataStore()).HandleMessage(message, sender, "error"); Assert.AreEqual("MessageBodyId", Encoding.UTF8.GetString(sender.Message.Body)); } @@ -80,7 +86,7 @@ public async Task It_uses_retry_to_if_provided() }; var message = CreateMessage(Guid.NewGuid().ToString(), headers); - await new ReturnToSender(new FakeBodyStorage(), null).HandleMessage(message, sender, "error"); + await new ReturnToSender(null).HandleMessage(message, sender, "error"); Assert.AreEqual("Proxy", sender.Destination); Assert.AreEqual("TargetEndpoint", sender.Message.Headers["ServiceControl.TargetEndpointAddress"]); @@ -98,7 +104,7 @@ public async Task It_sends_directly_to_target_if_retry_to_is_not_provided() }; var message = CreateMessage(Guid.NewGuid().ToString(), headers); - await new ReturnToSender(new FakeBodyStorage(), null).HandleMessage(message, sender, "error"); + await new ReturnToSender(null).HandleMessage(message, sender, "error"); Assert.AreEqual("TargetEndpoint", sender.Destination); Assert.IsFalse(sender.Message.Headers.ContainsKey("ServiceControl.TargetEndpointAddress")); @@ -119,7 +125,7 @@ public async Task It_restores_body_id_and_target_addres_after_failure() try { - await new ReturnToSender(new FakeBodyStorage(), null).HandleMessage(message, sender, "error"); + await new ReturnToSender(null).HandleMessage(message, sender, "error"); } catch (Exception) { @@ -153,22 +159,81 @@ public Task Dispatch(TransportOperations outgoingMessages, TransportTransaction } } - class FakeBodyStorage : IBodyStorage + class FakeErrorMessageDataStore : IErrorMessageDataStore { - public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream) - { + public Task FetchFromFailedMessage(string bodyId) => Task.FromResult(Encoding.UTF8.GetBytes(bodyId)); + + public Task>> GetAllMessages(PagingInfo pagingInfo, SortInfo sortInfo, bool includeSystemMessages) => throw new NotImplementedException(); + + public Task>> GetAllMessagesForEndpoint(string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, + bool includeSystemMessages) => throw new NotImplementedException(); - } - public Task TryFetch(string bodyId) - { - var stream = new MemoryStream(Encoding.UTF8.GetBytes(bodyId)); //Echo back the body ID. - return Task.FromResult(new MessageBodyStreamResult - { - HasResult = true, - Stream = stream - }); - } + public Task>> GetAllMessagesByConversation(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, + bool includeSystemMessages) => + throw new NotImplementedException(); + + public Task>> GetAllMessagesForSearch(string searchTerms, PagingInfo pagingInfo, SortInfo sortInfo) => throw new NotImplementedException(); + + public Task FailedMessageMarkAsArchived(string failedMessageId) => throw new NotImplementedException(); + + public Task FailedMessagesFetch(Guid[] ids) => throw new NotImplementedException(); + + public Task StoreFailedErrorImport(FailedErrorImport failure) => throw new NotImplementedException(); + + public Task CreateEditFailedMessageManager() => throw new NotImplementedException(); + + public Task> GetFailureGroupView(string groupId, string status, string modified) => throw new NotImplementedException(); + + public Task> GetFailureGroupsByClassifier(string classifier) => throw new NotImplementedException(); + + public Task>> ErrorGet(string status, string modified, string queueAddress, PagingInfo pagingInfo, SortInfo sortInfo) => throw new NotImplementedException(); + + public Task ErrorsHead(string status, string modified, string queueAddress) => throw new NotImplementedException(); + + public Task>> ErrorsByEndpointName(string status, string endpointName, string modified, PagingInfo pagingInfo, + SortInfo sortInfo) => + throw new NotImplementedException(); + + public Task> ErrorsSummary() => throw new NotImplementedException(); + + public Task ErrorBy(Guid failedMessageId) => throw new NotImplementedException(); + + public Task ErrorLastBy(Guid failedMessageId) => throw new NotImplementedException(); + + public Task ErrorBy(string failedMessageId) => throw new NotImplementedException(); + + public Task CreateNotificationsManager() => throw new NotImplementedException(); + + public Task EditComment(string groupId, string comment) => throw new NotImplementedException(); + + public Task DeleteComment(string groupId) => throw new NotImplementedException(); + + public Task>> GetGroupErrors(string groupId, string status, string modified, SortInfo sortInfo, PagingInfo pagingInfo) => throw new NotImplementedException(); + + public Task GetGroupErrorsCount(string groupId, string status, string modified) => throw new NotImplementedException(); + + public Task>> GetGroup(string groupId, string status, string modified) => throw new NotImplementedException(); + + public Task MarkMessageAsResolved(string failedMessageId) => throw new NotImplementedException(); + + public Task ProcessPendingRetries(DateTime periodFrom, DateTime periodTo, string queueAddress, Func processCallback) => throw new NotImplementedException(); + + public Task UnArchiveMessagesByRange(DateTime from, DateTime to) => throw new NotImplementedException(); + + public Task UnArchiveMessages(IEnumerable failedMessageIds) => throw new NotImplementedException(); + + public Task RevertRetry(string messageUniqueId) => throw new NotImplementedException(); + + public Task RemoveFailedMessageRetryDocument(string uniqueMessageId) => throw new NotImplementedException(); + + public Task GetRetryPendingMessages(DateTime from, DateTime to, string queueAddress) => throw new NotImplementedException(); + + public Task StoreEventLogItem(EventLogItem logItem) => throw new NotImplementedException(); + + public Task>> SearchEndpointMessages(string endpointName, string searchKeyword, PagingInfo pagingInfo, SortInfo sortInfo) => throw new NotImplementedException(); + + public Task StoreFailedMessagesForTestsOnly(params FailedMessage[] failedMessages) => throw new NotImplementedException(); } } } \ No newline at end of file diff --git a/src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTests.cs b/src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTests.cs index ab70fcdf1c..637e342f05 100644 --- a/src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTests.cs +++ b/src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTests.cs @@ -55,7 +55,7 @@ public async Task Should_discard_edit_if_edited_message_not_unresolved(FailedMes var message = CreateEditMessage(failedMessageId); await handler.Handle(message, new TestableMessageHandlerContext()); - var failedMessage = await ErrorMessageDataStore.FailedMessageFetch(failedMessageId); + var failedMessage = await ErrorMessageDataStore.ErrorBy(failedMessageId); var editFailedMessagesManager = await ErrorMessageDataStore.CreateEditFailedMessageManager(); var editOperation = await editFailedMessagesManager.GetCurrentEditingMessageId(failedMessageId); diff --git a/src/ServiceControl.Persistence.Tests/RetryStateTests.cs b/src/ServiceControl.Persistence.Tests/RetryStateTests.cs index 08e49a6cc5..47edcf2ccc 100644 --- a/src/ServiceControl.Persistence.Tests/RetryStateTests.cs +++ b/src/ServiceControl.Persistence.Tests/RetryStateTests.cs @@ -59,7 +59,7 @@ public async Task When_a_group_is_prepared_with_three_batches_and_SC_is_restarte await CreateAFailedMessageAndMarkAsPartOfRetryBatch(retryManager, "Test-group", true, 2001); var sender = new TestSender(); - var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(BodyStorage, ErrorStore), ErrorStore, domainEvents, "TestEndpoint"), retryManager, PersistenceSettings); + var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint"), retryManager); // Needs index RetryBatches_ByStatus_ReduceInitialBatchSize CompleteDatabaseOperation(); @@ -73,7 +73,7 @@ public async Task When_a_group_is_prepared_with_three_batches_and_SC_is_restarte await documentManager.RebuildRetryOperationState(); - processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(BodyStorage, ErrorStore), ErrorStore, domainEvents, "TestEndpoint"), retryManager, PersistenceSettings); + processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint"), retryManager); await processor.ProcessBatches(sender); @@ -91,8 +91,8 @@ public async Task When_a_group_is_forwarded_the_status_is_Completed() var sender = new TestSender(); - var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(BodyStorage, ErrorStore), ErrorStore, domainEvents, "TestEndpoint"); - var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager, PersistenceSettings); + var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint"); + var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager); await processor.ProcessBatches(sender); // mark ready await processor.ProcessBatches(sender); @@ -121,8 +121,8 @@ public async Task When_there_is_one_poison_message_it_is_removed_from_batch_and_ } }; - var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(BodyStorage, ErrorStore), ErrorStore, domainEvents, "TestEndpoint"); - var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager, PersistenceSettings); + var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint"); + var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager); bool c; do @@ -155,11 +155,11 @@ public async Task When_a_group_has_one_batch_out_of_two_forwarded_the_status_is_ await CreateAFailedMessageAndMarkAsPartOfRetryBatch(retryManager, "Test-group", true, 1001); - var returnToSender = new ReturnToSender(BodyStorage, ErrorStore); + var returnToSender = new ReturnToSender(ErrorStore); var sender = new TestSender(); - var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(returnToSender, ErrorStore, domainEvents, "TestEndpoint"), retryManager, PersistenceSettings); + var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(returnToSender, ErrorStore, domainEvents, "TestEndpoint"), retryManager); CompleteDatabaseOperation(); diff --git a/src/ServiceControl.Persistence/ExternalIntegrations/ExternalIntegrationDispatchRequest.cs b/src/ServiceControl.Persistence/ExternalIntegrations/ExternalIntegrationDispatchRequest.cs index fc7ce898e9..14a15dafa0 100644 --- a/src/ServiceControl.Persistence/ExternalIntegrations/ExternalIntegrationDispatchRequest.cs +++ b/src/ServiceControl.Persistence/ExternalIntegrations/ExternalIntegrationDispatchRequest.cs @@ -3,6 +3,6 @@ namespace ServiceControl.ExternalIntegrations public class ExternalIntegrationDispatchRequest { public string Id { get; set; } - public object DispatchContext; // TODO: This is of type object, do we want the persister API to explicitly do something with this? Maybe instead of object use the specific types? Alternatively already have SC do serialization on this field so the storage engine does not need to? + public object DispatchContext; } } \ No newline at end of file diff --git a/src/ServiceControl.Persistence/IErrorMessageDatastore.cs b/src/ServiceControl.Persistence/IErrorMessageDatastore.cs index 865d19e208..8892531973 100644 --- a/src/ServiceControl.Persistence/IErrorMessageDatastore.cs +++ b/src/ServiceControl.Persistence/IErrorMessageDatastore.cs @@ -17,7 +17,6 @@ public interface IErrorMessageDataStore Task>> GetAllMessagesForEndpoint(string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, bool includeSystemMessages); Task>> GetAllMessagesByConversation(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, bool includeSystemMessages); Task>> GetAllMessagesForSearch(string searchTerms, PagingInfo pagingInfo, SortInfo sortInfo); - Task FailedMessageFetch(string failedMessageId); Task FailedMessageMarkAsArchived(string failedMessageId); Task FailedMessagesFetch(Guid[] ids); Task StoreFailedErrorImport(FailedErrorImport failure); @@ -29,7 +28,7 @@ public interface IErrorMessageDataStore Task>> ErrorGet(string status, string modified, string queueAddress, PagingInfo pagingInfo, SortInfo sortInfo); Task ErrorsHead(string status, string modified, string queueAddress); Task>> ErrorsByEndpointName(string status, string endpointName, string modified, PagingInfo pagingInfo, SortInfo sortInfo); - Task> ErrorsSummary(); // TODO: Must not be object + Task> ErrorsSummary(); // GetErrorByIdController Task ErrorBy(Guid failedMessageId); @@ -44,7 +43,6 @@ public interface IErrorMessageDataStore // FailureGroupsController Task EditComment(string groupId, string comment); Task DeleteComment(string groupId); - // Task GetAllGroups([FromUri] string classifierFilter = null, string classifier = "Exception Type and Stack Trace"); TODO: Analyze what to do with the `GroupFetcher` dependency Task>> GetGroupErrors(string groupId, string status, string modified, SortInfo sortInfo, PagingInfo pagingInfo); Task GetGroupErrorsCount(string groupId, string status, string modified); @@ -54,13 +52,13 @@ public interface IErrorMessageDataStore Task MarkMessageAsResolved(string failedMessageId); // MessageFailureResolvedHandler - Task ProcessPendingRetries(DateTime periodFrom, DateTime periodTo, string queueAddress, Func processCallback); // TODO: Passing a callback is there to not change behavior of original implementation. + Task ProcessPendingRetries(DateTime periodFrom, DateTime periodTo, string queueAddress, Func processCallback); // UnArchiveMessagesByRangeHandler - Task<(string[] ids, int count)> UnArchiveMessagesByRange(DateTime from, DateTime to, DateTime cutOff); + Task UnArchiveMessagesByRange(DateTime from, DateTime to); // UnArchiveMessagesHandler - Task<(string[] ids, int count)> UnArchiveMessages(IEnumerable failedMessageIds); + Task UnArchiveMessages(IEnumerable failedMessageIds); // ReturnToSenderDequeuer.CaptureIfMessageSendingFails Task RevertRetry(string messageUniqueId); diff --git a/src/ServiceControl.Persistence/IRetryDocumentDataStore.cs b/src/ServiceControl.Persistence/IRetryDocumentDataStore.cs index 3562304783..0071812cd7 100644 --- a/src/ServiceControl.Persistence/IRetryDocumentDataStore.cs +++ b/src/ServiceControl.Persistence/IRetryDocumentDataStore.cs @@ -1,17 +1,15 @@ namespace ServiceControl.Persistence { - using System.Threading.Tasks; using System; using System.Collections.Generic; + using System.Threading.Tasks; using Infrastructure; using ServiceControl.MessageFailures; using ServiceControl.Recoverability; public interface IRetryDocumentDataStore { - Task StageRetryByUniqueMessageIds(string batchDocumentId, string requestId, RetryType retryType, string[] messageIds, - DateTime startTime, DateTime? last = null, string originator = null, string batchName = null, - string classifier = null); + Task StageRetryByUniqueMessageIds(string batchDocumentId, string[] messageIds); Task MoveBatchToStaging(string batchDocumentId); @@ -19,7 +17,7 @@ Task CreateBatchDocument(string retrySessionId, string requestId, RetryT string[] failedMessageRetryIds, string originator, DateTime startTime, DateTime? last = null, string batchName = null, string classifier = null); - Task>> QueryOrphanedBatches(string retrySessionId, DateTime cutoff); + Task>> QueryOrphanedBatches(string retrySessionId); Task> QueryAvailableBatches(); // RetriesGateway diff --git a/src/ServiceControl.Persistence/KnownEndpoint.cs b/src/ServiceControl.Persistence/KnownEndpoint.cs index 15227f7ea2..1fd5f16ae7 100644 --- a/src/ServiceControl.Persistence/KnownEndpoint.cs +++ b/src/ServiceControl.Persistence/KnownEndpoint.cs @@ -7,7 +7,6 @@ public class KnownEndpoint public string HostDisplayName { get; set; } public bool Monitored { get; set; } public EndpointDetails EndpointDetails { get; set; } - public bool HasTemporaryId { get; set; } public const string CollectionName = "KnownEndpoints"; } diff --git a/src/ServiceControl.Persistence/PersistenceSettings.cs b/src/ServiceControl.Persistence/PersistenceSettings.cs index ab621c7a5c..d0475801bb 100644 --- a/src/ServiceControl.Persistence/PersistenceSettings.cs +++ b/src/ServiceControl.Persistence/PersistenceSettings.cs @@ -14,7 +14,5 @@ public abstract class PersistenceSettings public bool EnableFullTextSearchOnBodies { get; set; } = true; public TimeSpan? OverrideCustomCheckRepeatTime { get; set; } - - public abstract bool MessageBodiesAlwaysStoredInFailedMessage { get; } } } \ No newline at end of file diff --git a/src/ServiceControl.Persistence/Recoverability/IReclassifyFailedMessages.cs b/src/ServiceControl.Persistence/Recoverability/IReclassifyFailedMessages.cs deleted file mode 100644 index 7366b4837d..0000000000 --- a/src/ServiceControl.Persistence/Recoverability/IReclassifyFailedMessages.cs +++ /dev/null @@ -1,14 +0,0 @@ -namespace ServiceControl.Persistence -{ - using System.Threading.Tasks; - - /// - /// Only RavenDB 3.5 should need a working implementation of this interface because reclassification is only valid - /// for older data stored in previous versions of ServiceControl. Newer instances that start with other persistence - /// should never need to do this, and can be implemented as a no-op. - /// - public interface IReclassifyFailedMessages - { - Task ReclassifyFailedMessages(bool force); - } -} diff --git a/src/ServiceControl.Persistence/RetryBatchGroup.cs b/src/ServiceControl.Persistence/RetryBatchGroup.cs index cb53629017..01dee5b1e3 100644 --- a/src/ServiceControl.Persistence/RetryBatchGroup.cs +++ b/src/ServiceControl.Persistence/RetryBatchGroup.cs @@ -6,9 +6,6 @@ public class RetryBatchGroup { public string RequestId { get; set; } - // [Raven.Imports.Newtonsoft.Json.JsonProperty(NullValueHandling = NullValueHandling.Ignore)] //default to RetryType.Unknown for backwards compatability - // TODO: Need to fix ethe JsonProperty, maybe RavenDB has a method to specify metatdata or use a mapper/transformation - // THEORY: RetryType.Unknown is value 0 so it should default to that anyway public RetryType RetryType { get; set; } public bool HasStagingBatches { get; set; } diff --git a/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.HttpApiRoutes.approved.txt b/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.HttpApiRoutes.approved.txt index ea1023f850..4b293bcce8 100644 --- a/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.HttpApiRoutes.approved.txt +++ b/src/ServiceControl.UnitTests/ApprovalFiles/APIApprovals.HttpApiRoutes.approved.txt @@ -62,7 +62,6 @@ POST /recoverability/groups/{groupId}/errors/archive => ServiceControl.Recoverab POST /recoverability/groups/{groupId}/errors/retry => ServiceControl.Recoverability.API.FailureGroupsRetryController:ArchiveGroupErrors(String groupId) POST /recoverability/groups/{groupId}/errors/unarchive => ServiceControl.Recoverability.API.FailureGroupsUnarchiveController:UnarchiveGroupErrors(String groupId) GET /recoverability/groups/id/{groupId} => ServiceControl.Recoverability.API.FailureGroupsController:GetGroup(String groupId) -POST /recoverability/groups/reclassify => ServiceControl.Recoverability.API.FailureGroupsController:ReclassifyErrors() GET /recoverability/history => ServiceControl.Recoverability.API.FailureGroupsController:GetRetryHistory() DELETE /recoverability/unacknowledgedgroups/{groupId} => ServiceControl.Recoverability.API.UnacknowledgedGroupsController:AcknowledgeOperation(String groupId) HEAD /redirect => ServiceControl.MessageRedirects.Api.MessageRedirectsController:CountRedirects() diff --git a/src/ServiceControl/App.config b/src/ServiceControl/App.config index 7022771ace..9cd12c53e7 100644 --- a/src/ServiceControl/App.config +++ b/src/ServiceControl/App.config @@ -26,6 +26,7 @@ These settings are only here so that we can debug ServiceControl while developin + + diff --git a/src/ServiceControl/ExternalIntegrations/RepeatedFailuresOverTimeCircuitBreaker.cs b/src/ServiceControl/ExternalIntegrations/RepeatedFailuresOverTimeCircuitBreaker.cs index 09cb5b0700..ad14c5c97e 100644 --- a/src/ServiceControl/ExternalIntegrations/RepeatedFailuresOverTimeCircuitBreaker.cs +++ b/src/ServiceControl/ExternalIntegrations/RepeatedFailuresOverTimeCircuitBreaker.cs @@ -19,7 +19,8 @@ public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBe public void Dispose() { - //Injected + timer.Dispose(); + GC.SuppressFinalize(this); } public void Success() @@ -63,12 +64,12 @@ void CircuitBreakerTriggered(object state) long failureCount; Exception lastException; - string name; - Timer timer; - TimeSpan timeToWaitBeforeTriggering; - Action triggerAction; + readonly string name; + readonly Timer timer; + readonly TimeSpan timeToWaitBeforeTriggering; + readonly Action triggerAction; - static TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1); - static ILog Logger = LogManager.GetLogger(); + static readonly TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1); + static readonly ILog Logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/ServiceControl/MessageFailures/Api/UnArchiveMessagesController.cs b/src/ServiceControl/MessageFailures/Api/UnArchiveMessagesController.cs index e525e03de6..14ad4d57a0 100644 --- a/src/ServiceControl/MessageFailures/Api/UnArchiveMessagesController.cs +++ b/src/ServiceControl/MessageFailures/Api/UnArchiveMessagesController.cs @@ -53,8 +53,7 @@ public async Task Unarchive(string from, string to) await messageSession.SendLocal(new UnArchiveMessagesByRange { From = fromDateTime, - To = toDateTime, - CutOff = DateTime.UtcNow + To = toDateTime }); return StatusCode(HttpStatusCode.Accepted); diff --git a/src/ServiceControl/MessageFailures/Handlers/ArchiveMessageHandler.cs b/src/ServiceControl/MessageFailures/Handlers/ArchiveMessageHandler.cs index f0f5bc746d..b75b0bec29 100644 --- a/src/ServiceControl/MessageFailures/Handlers/ArchiveMessageHandler.cs +++ b/src/ServiceControl/MessageFailures/Handlers/ArchiveMessageHandler.cs @@ -19,7 +19,7 @@ public async Task Handle(ArchiveMessage message, IMessageHandlerContext context) { var failedMessageId = message.FailedMessageId; - var failedMessage = await dataStore.FailedMessageFetch(failedMessageId); + var failedMessage = await dataStore.ErrorBy(failedMessageId); if (failedMessage.Status != FailedMessageStatus.Archived) { diff --git a/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesByRangeHandler.cs b/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesByRangeHandler.cs index b581e53037..04543de5b9 100644 --- a/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesByRangeHandler.cs +++ b/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesByRangeHandler.cs @@ -17,16 +17,12 @@ public UnArchiveMessagesByRangeHandler(IErrorMessageDataStore dataStore, IDomain public async Task Handle(UnArchiveMessagesByRange message, IMessageHandlerContext context) { - var (ids, count) = await dataStore.UnArchiveMessagesByRange( - message.From, - message.To, - message.CutOff - ); + var ids = await dataStore.UnArchiveMessagesByRange(message.From, message.To); await domainEvents.Raise(new FailedMessagesUnArchived { DocumentIds = ids, - MessagesCount = count + MessagesCount = ids.Length }); } diff --git a/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesHandler.cs b/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesHandler.cs index a37ed23b18..d3e8fa95f6 100644 --- a/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesHandler.cs +++ b/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesHandler.cs @@ -17,12 +17,12 @@ public UnArchiveMessagesHandler(IErrorMessageDataStore store, IDomainEvents doma public async Task Handle(UnArchiveMessages messages, IMessageHandlerContext context) { - var (ids, count) = await store.UnArchiveMessages(messages.FailedMessageIds); + var ids = await store.UnArchiveMessages(messages.FailedMessageIds); await domainEvents.Raise(new FailedMessagesUnArchived { DocumentIds = ids, - MessagesCount = count + MessagesCount = ids.Length }); } diff --git a/src/ServiceControl/MessageFailures/InternalMessages/ReclassificationOfErrorMessageComplete.cs b/src/ServiceControl/MessageFailures/InternalMessages/ReclassificationOfErrorMessageComplete.cs deleted file mode 100644 index 428c67c01c..0000000000 --- a/src/ServiceControl/MessageFailures/InternalMessages/ReclassificationOfErrorMessageComplete.cs +++ /dev/null @@ -1,10 +0,0 @@ -namespace ServiceControl.MessageFailures.InternalMessages -{ - using Infrastructure.DomainEvents; - - // TODO: Only used by Raven3.5 storage engine - public class ReclassificationOfErrorMessageComplete : IDomainEvent - { - public int NumberofMessageReclassified { get; set; } - } -} \ No newline at end of file diff --git a/src/ServiceControl/MessageFailures/InternalMessages/ReclassifyErrors.cs b/src/ServiceControl/MessageFailures/InternalMessages/ReclassifyErrors.cs deleted file mode 100644 index 27cf4d71e2..0000000000 --- a/src/ServiceControl/MessageFailures/InternalMessages/ReclassifyErrors.cs +++ /dev/null @@ -1,11 +0,0 @@ -namespace ServiceControl.MessageFailures.InternalMessages -{ - using System; - using NServiceBus; - - [Obsolete("Only used by legacy RavenDB35 storage engine")] // TODO: how to deal with this domain event - class ReclassifyErrors : ICommand - { - public bool Force { get; set; } - } -} \ No newline at end of file diff --git a/src/ServiceControl/MessageFailures/InternalMessages/UnArchiveMessagesByRange.cs b/src/ServiceControl/MessageFailures/InternalMessages/UnArchiveMessagesByRange.cs index 4a56f1c97f..ec108d5cc3 100644 --- a/src/ServiceControl/MessageFailures/InternalMessages/UnArchiveMessagesByRange.cs +++ b/src/ServiceControl/MessageFailures/InternalMessages/UnArchiveMessagesByRange.cs @@ -7,6 +7,5 @@ class UnArchiveMessagesByRange : ICommand { public DateTime To { get; set; } public DateTime From { get; set; } - public DateTime CutOff { get; set; } } } \ No newline at end of file diff --git a/src/ServiceControl/Recoverability/API/FailureGroupsController.cs b/src/ServiceControl/Recoverability/API/FailureGroupsController.cs index 80377b5fec..d6a31f7925 100644 --- a/src/ServiceControl/Recoverability/API/FailureGroupsController.cs +++ b/src/ServiceControl/Recoverability/API/FailureGroupsController.cs @@ -1,6 +1,5 @@ namespace ServiceControl.Recoverability.API { - using System; using System.Collections.Generic; using System.Linq; using System.Net; @@ -8,8 +7,6 @@ using System.Threading.Tasks; using System.Web.Http; using Infrastructure.WebApi; - using MessageFailures.InternalMessages; - using NServiceBus; using Persistence.Infrastructure; using ServiceControl.Persistence; @@ -17,14 +14,12 @@ class FailureGroupsController : ApiController { public FailureGroupsController( IEnumerable classifiers, - IMessageSession bus, GroupFetcher groupFetcher, IErrorMessageDataStore dataStore, IRetryHistoryDataStore retryStore ) { this.classifiers = classifiers; - this.bus = bus; this.groupFetcher = groupFetcher; this.dataStore = dataStore; this.retryStore = retryStore; @@ -43,19 +38,6 @@ public HttpResponseMessage GetSupportedClassifiers() .WithTotalCount(result.Length); } - [Obsolete("Only used by legacy RavenDB35 storage engine")] // TODO: how to deal with this domain event - [Route("recoverability/groups/reclassify")] - [HttpPost] - public async Task ReclassifyErrors() - { - await bus.SendLocal(new ReclassifyErrors - { - Force = true - }); - - return Content(HttpStatusCode.Accepted, string.Empty); - } - [Route("recoverability/groups/{groupid}/comment")] [HttpPost] public async Task EditComment(string groupId, string comment) @@ -145,7 +127,6 @@ public async Task GetGroup(string groupId) } readonly IEnumerable classifiers; - readonly IMessageSession bus; readonly GroupFetcher groupFetcher; readonly IErrorMessageDataStore dataStore; readonly IRetryHistoryDataStore retryStore; diff --git a/src/ServiceControl/Recoverability/EventLog/ReclassificationOfErrorMessageCompleteDefinition.cs b/src/ServiceControl/Recoverability/EventLog/ReclassificationOfErrorMessageCompleteDefinition.cs deleted file mode 100644 index a2840a8400..0000000000 --- a/src/ServiceControl/Recoverability/EventLog/ReclassificationOfErrorMessageCompleteDefinition.cs +++ /dev/null @@ -1,13 +0,0 @@ -namespace ServiceControl.Recoverability.EventLog -{ - using MessageFailures.InternalMessages; - using ServiceControl.EventLog; - - class ReclassificationOfErrorMessageCompleteDefinition : EventLogMappingDefinition - { - public ReclassificationOfErrorMessageCompleteDefinition() - { - Description(m => $"Reclassification of {m.NumberofMessageReclassified} error messages without existing classification complete."); - } - } -} \ No newline at end of file diff --git a/src/ServiceControl/Recoverability/Grouping/Groupers/ReclassifyErrorsHandler.cs b/src/ServiceControl/Recoverability/Grouping/Groupers/ReclassifyErrorsHandler.cs deleted file mode 100644 index 6c618595f0..0000000000 --- a/src/ServiceControl/Recoverability/Grouping/Groupers/ReclassifyErrorsHandler.cs +++ /dev/null @@ -1,50 +0,0 @@ -namespace ServiceControl.Recoverability -{ - using System; - using System.Threading; - using System.Threading.Tasks; - using Infrastructure.DomainEvents; - using MessageFailures.InternalMessages; - using NServiceBus; - using ServiceControl.Persistence; - - [Obsolete("Only used by legacy RavenDB35 storage engine")] // TODO: how to deal with this domain event - class ReclassifyErrorsHandler : IHandleMessages - { - public ReclassifyErrorsHandler(IReclassifyFailedMessages reclassifier, IDomainEvents domainEvents) - { - this.reclassifier = reclassifier; - this.domainEvents = domainEvents; - } - - public async Task Handle(ReclassifyErrors message, IMessageHandlerContext context) - { - if (Interlocked.Exchange(ref executing, 1) != 0) - { - // Prevent more then one execution at a time - return; - } - - try - { - var failedMessagesReclassified = await reclassifier.ReclassifyFailedMessages(message.Force); - - if (failedMessagesReclassified > 0) - { - await domainEvents.Raise(new ReclassificationOfErrorMessageComplete - { - NumberofMessageReclassified = failedMessagesReclassified - }); - } - } - finally - { - Interlocked.Exchange(ref executing, 0); - } - } - - IDomainEvents domainEvents; - IReclassifyFailedMessages reclassifier; - static int executing; - } -} \ No newline at end of file diff --git a/src/ServiceControl/Recoverability/RecoverabilityComponent.cs b/src/ServiceControl/Recoverability/RecoverabilityComponent.cs index 96f91c7667..7a4393bea9 100644 --- a/src/ServiceControl/Recoverability/RecoverabilityComponent.cs +++ b/src/ServiceControl/Recoverability/RecoverabilityComponent.cs @@ -103,7 +103,6 @@ public override void Configure(Settings settings, IHostBuilder hostBuilder) collection.AddEventLogMapping(); collection.AddEventLogMapping(); collection.AddEventLogMapping(); - collection.AddEventLogMapping(); }); } @@ -221,14 +220,13 @@ public AdoptOrphanBatchesFromPreviousSessionHostedService(RetryDocumentManager r { this.retryDocumentManager = retryDocumentManager; this.scheduler = scheduler; - startTime = DateTime.UtcNow; } internal async Task AdoptOrphanedBatchesAsync() { - var hasMoreWorkToDo = await retryDocumentManager.AdoptOrphanedBatches(startTime); + var moreWorkRemaining = await retryDocumentManager.AdoptOrphanedBatches(); - return hasMoreWorkToDo; + return moreWorkRemaining; } public Task StartAsync(CancellationToken cancellationToken) @@ -247,7 +245,6 @@ public Task StopAsync(CancellationToken cancellationToken) } TimerJob timer; - readonly DateTime startTime; readonly IAsyncTimer scheduler; readonly RetryDocumentManager retryDocumentManager; static readonly ILog log = LogManager.GetLogger(); diff --git a/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSender.cs b/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSender.cs index 97f195b191..a054ab4fbc 100644 --- a/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSender.cs +++ b/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSender.cs @@ -2,20 +2,17 @@ namespace ServiceControl.Recoverability { using System; using System.Collections.Generic; - using System.IO; using System.Threading.Tasks; using NServiceBus.Logging; using NServiceBus.Routing; using NServiceBus.Transport; - using Operations.BodyStorage; using ServiceControl.Persistence; class ReturnToSender { - public ReturnToSender(IBodyStorage bodyStorage, IErrorMessageDataStore errorMessageStore) + public ReturnToSender(IErrorMessageDataStore errorMessageStore) { this.errorMessageStore = errorMessageStore; - this.bodyStorage = bodyStorage; } public virtual async Task HandleMessage(MessageContext message, IDispatchMessages sender, string errorQueueTransportAddress) @@ -34,15 +31,7 @@ public virtual async Task HandleMessage(MessageContext message, IDispatchMessage if (outgoingHeaders.TryGetValue("ServiceControl.Retry.Attempt.MessageId", out var attemptMessageId)) { - if (outgoingHeaders.Remove("ServiceControl.Retry.BodyOnFailedMessage")) - { - body = await FetchFromFailedMessage(outgoingHeaders, messageId, attemptMessageId); - } - else - { - body = await FetchFromBodyStore(attemptMessageId, messageId); - } - + body = await FetchFromFailedMessage(outgoingHeaders, messageId, attemptMessageId); outgoingHeaders.Remove("ServiceControl.Retry.Attempt.MessageId"); } else @@ -86,60 +75,19 @@ async Task FetchFromFailedMessage(Dictionary outgoingHea var uniqueMessageId = outgoingHeaders["ServiceControl.Retry.UniqueMessageId"]; byte[] body = await errorMessageStore.FetchFromFailedMessage(uniqueMessageId); - // TODO: Weird that none of these logged parameters are actually used in the attempt to load the thing if (body == null) { - Log.WarnFormat("{0}: Message Body not found on index for attempt Id {1}", messageId, attemptMessageId); + Log.WarnFormat("{0}: Message Body not found in failed message with unique id {1} for attempt Id {1}", messageId, uniqueMessageId, attemptMessageId); } else if (Log.IsDebugEnabled) { - Log.DebugFormat("{0}: Body size: {1} bytes retrieved from index", messageId, body.LongLength); - } - - return body; - } - - async Task FetchFromBodyStore(string attemptMessageId, string messageId) - { - byte[] body = null; - var result = await bodyStorage.TryFetch(attemptMessageId); - - if (result == null) - { - throw new InvalidOperationException("IBodyStorage.TryFetch result cannot be null"); + Log.DebugFormat("{0}: Body size: {1} bytes retrieved from failed message attachment", messageId, body.LongLength); } - if (result.HasResult) - { - using (result.Stream) // Not strictly required for MemoryStream but might be different behavior in future .NET versions - { - // Unfortunately we can't use the buffer manager here yet because core doesn't allow to set the length property so usage of GetBuffer is not possible - // furthermore call ToArray would neglect many of the benefits of the recyclable stream - // RavenDB always returns a memory stream in ver. 3.5 so there is no need to pretend we need to do buffered reads since the memory is anyway fully allocated already - // this assumption might change when we stop supporting RavenDB 3.5 but right now this is the most memory efficient way to do things - // https://github.com/microsoft/Microsoft.IO.RecyclableMemoryStream#getbuffer-and-toarray - using (var memoryStream = new MemoryStream()) - { - await result.Stream.CopyToAsync(memoryStream); - - body = memoryStream.ToArray(); - } - } - - if (Log.IsDebugEnabled) - { - Log.DebugFormat("{0}: Body size: {1} bytes retrieved from attachment store", messageId, body.LongLength); - } - } - else - { - Log.WarnFormat("{0}: Message Body not found in attachment store for attempt Id {1}", messageId, attemptMessageId); - } return body; } static readonly byte[] EmptyBody = Array.Empty(); - readonly IBodyStorage bodyStorage; static readonly ILog Log = LogManager.GetLogger(typeof(ReturnToSender)); readonly IErrorMessageDataStore errorMessageStore; } diff --git a/src/ServiceControl/Recoverability/Retrying/RetriesGateway.cs b/src/ServiceControl/Recoverability/Retrying/RetriesGateway.cs index 245375c71f..c1ddb84baf 100644 --- a/src/ServiceControl/Recoverability/Retrying/RetriesGateway.cs +++ b/src/ServiceControl/Recoverability/Retrying/RetriesGateway.cs @@ -58,8 +58,7 @@ async Task StageRetryByUniqueMessageIds(string requestId, RetryType retryType, s Log.Info($"Created Batch '{batchDocumentId}' with {messageIds.Length} messages for '{batchName}'."); - await store.StageRetryByUniqueMessageIds(batchDocumentId, requestId, retryType, messageIds, startTime, last, - originator, batchName, classifier); + await store.StageRetryByUniqueMessageIds(batchDocumentId, messageIds); await MoveBatchToStaging(batchDocumentId); diff --git a/src/ServiceControl/Recoverability/Retrying/RetryDocumentManager.cs b/src/ServiceControl/Recoverability/Retrying/RetryDocumentManager.cs index ee4c8b6a3a..0393db8045 100644 --- a/src/ServiceControl/Recoverability/Retrying/RetryDocumentManager.cs +++ b/src/ServiceControl/Recoverability/Retrying/RetryDocumentManager.cs @@ -16,9 +16,9 @@ public RetryDocumentManager(IHostApplicationLifetime applicationLifetime, IRetry this.operationManager = operationManager; } - public async Task AdoptOrphanedBatches(DateTime cutoff) + public async Task AdoptOrphanedBatches() { - var orphanedBatches = await store.QueryOrphanedBatches(RetrySessionId, cutoff); + var orphanedBatches = await store.QueryOrphanedBatches(RetrySessionId); log.Info($"Found {orphanedBatches.Results.Count} orphaned retry batches from previous sessions."); @@ -71,7 +71,6 @@ public async Task RebuildRetryOperationState() readonly RetryingManager operationManager; readonly IRetryDocumentDataStore store; bool abort; - // TODO: Uplift this into DI? Meant to differentiate between ServiceControl.exe process runs, so likely doesn't matter. public static string RetrySessionId = Guid.NewGuid().ToString(); static readonly ILog log = LogManager.GetLogger(typeof(RetryDocumentManager)); diff --git a/src/ServiceControl/Recoverability/Retrying/RetryProcessor.cs b/src/ServiceControl/Recoverability/Retrying/RetryProcessor.cs index 78bbca18bf..65cd2a59f4 100644 --- a/src/ServiceControl/Recoverability/Retrying/RetryProcessor.cs +++ b/src/ServiceControl/Recoverability/Retrying/RetryProcessor.cs @@ -17,14 +17,13 @@ namespace ServiceControl.Recoverability class RetryProcessor { - public RetryProcessor(IRetryBatchesDataStore store, IDomainEvents domainEvents, ReturnToSenderDequeuer returnToSender, RetryingManager retryingManager, PersistenceSettings settings) + public RetryProcessor(IRetryBatchesDataStore store, IDomainEvents domainEvents, ReturnToSenderDequeuer returnToSender, RetryingManager retryingManager) { this.store = store; this.returnToSender = returnToSender; this.retryingManager = retryingManager; this.domainEvents = domainEvents; corruptedReplyToHeaderStrategy = new CorruptedReplyToHeaderStrategy(RuntimeEnvironment.MachineName); - messageBodiesAlwaysStoredInFailedMessage = settings.MessageBodiesAlwaysStoredInFailedMessage; } Task Enqueue(IDispatchMessages sender, TransportOperations outgoingMessages) @@ -334,10 +333,6 @@ TransportOperation ToTransportOperation(FailedMessage message, string stagingId) headersToRetryWith["ServiceControl.Retry.UniqueMessageId"] = message.UniqueMessageId; headersToRetryWith["ServiceControl.Retry.StagingId"] = stagingId; headersToRetryWith["ServiceControl.Retry.Attempt.MessageId"] = attempt.MessageId; - if (messageBodiesAlwaysStoredInFailedMessage || attempt.MessageMetadata.ContainsKey("Body") || attempt.Body != null) - { - headersToRetryWith["ServiceControl.Retry.BodyOnFailedMessage"] = null; - } corruptedReplyToHeaderStrategy.FixCorruptedReplyToHeader(headersToRetryWith); @@ -349,7 +344,6 @@ TransportOperation ToTransportOperation(FailedMessage message, string stagingId) readonly IRetryBatchesDataStore store; readonly ReturnToSenderDequeuer returnToSender; readonly RetryingManager retryingManager; - readonly bool messageBodiesAlwaysStoredInFailedMessage; MessageRedirectsCollection redirects; bool isRecoveringFromPrematureShutdown = true; diff --git a/src/ServiceControl/SagaAudit/SagaAuditComponent.cs b/src/ServiceControl/SagaAudit/SagaAuditComponent.cs deleted file mode 100644 index 1380b5470f..0000000000 --- a/src/ServiceControl/SagaAudit/SagaAuditComponent.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace ServiceControl.SagaAudit -{ - using Microsoft.Extensions.Hosting; - using Particular.ServiceControl; - using ServiceBus.Management.Infrastructure.Settings; - - class SagaAuditComponent : ServiceControlComponent - { - public override void Configure(Settings settings, IHostBuilder hostBuilder) - { - // TODO: If this component doesn't do anything, should it even exist? - // THEORY: Remove in V5, since then there will be no audit capabilities left in the primary instance - } - - public override void Setup(Settings settings, IComponentInstallationContext context) - { - } - } -} \ No newline at end of file diff --git a/src/ServiceControl/ServiceControlMainInstance.cs b/src/ServiceControl/ServiceControlMainInstance.cs index f3649231f9..fd0429eae9 100644 --- a/src/ServiceControl/ServiceControlMainInstance.cs +++ b/src/ServiceControl/ServiceControlMainInstance.cs @@ -5,7 +5,6 @@ namespace Particular.ServiceControl using global::ServiceControl.ExternalIntegrations; using global::ServiceControl.Monitoring; using global::ServiceControl.Recoverability; - using global::ServiceControl.SagaAudit; static class ServiceControlMainInstance { @@ -14,7 +13,6 @@ static class ServiceControlMainInstance new EventLogComponent(), new ExternalIntegrationsComponent(), new RecoverabilityComponent(), - new SagaAuditComponent(), new HeartbeatMonitoringComponent(), new CustomChecksComponent() }; diff --git a/src/ServiceControlInstaller.Engine.UnitTests/Configuration/AuditInstanceTests.cs b/src/ServiceControlInstaller.Engine.UnitTests/Configuration/AuditInstanceTests.cs index 139339d31e..354eb9a77e 100644 --- a/src/ServiceControlInstaller.Engine.UnitTests/Configuration/AuditInstanceTests.cs +++ b/src/ServiceControlInstaller.Engine.UnitTests/Configuration/AuditInstanceTests.cs @@ -13,8 +13,7 @@ [TestFixture] class AuditInstanceTests : InstallationFixture { - // TODO: Revisit this test once installer work is done to make Raven35 instances non-upgradeable, and remove Ignore attribute - [Test, Ignore("Revisit when installer makes Raven35 instances non-upgradeable")] + [Test] public void Should_default_to_raven35_when_no_config_entry_exists() { var newInstance = ServiceControlAuditNewInstance.CreateWithPersistence(ZipFileFolder.FullName, "RavenDB35"); @@ -41,13 +40,13 @@ public void Should_default_to_raven35_when_no_config_entry_exists() instance.Reload(); - var persisterFilePath = Path.Combine(InstallPath, "ServiceControl.Audit.Persistence.RavenDb.dll"); - - //delete the persitence dll to make sure it gets re-installed - File.Delete(persisterFilePath); - instance.UpgradeFiles(ZipFilePath); - FileAssert.Exists(persisterFilePath); + FileAssert.DoesNotExist(Path.Combine(InstallPath, "ServiceControl.Audit.Persistence.RavenDb.dll")); + FileAssert.DoesNotExist(Path.Combine(InstallPath, "ServiceControl.Audit.Persistence.RavenDb5.dll")); + + var manifestFilePath = Path.Combine(InstallPath, "persistence.manifest"); + var manifestText = File.ReadAllText(manifestFilePath); + StringAssert.Contains("RavenDB 3.5", manifestText); } [Test]