From fdfa82dbd07c4cfa2dceef605919deb42cd494ec Mon Sep 17 00:00:00 2001 From: David Boike Date: Fri, 29 Sep 2023 09:53:56 -0500 Subject: [PATCH 1/6] Try storing message body attachments on FailedMessage documents --- .../MessageBodyIdGenerator.cs | 9 --------- .../RavenAttachmentsBodyStorage.cs | 18 +++++++++--------- ...RavenDbRecoverabilityIngestionUnitOfWork.cs | 18 +++--------------- 3 files changed, 12 insertions(+), 33 deletions(-) delete mode 100644 src/ServiceControl.Persistence.RavenDb5/DocumentIdGenerators/MessageBodyIdGenerator.cs diff --git a/src/ServiceControl.Persistence.RavenDb5/DocumentIdGenerators/MessageBodyIdGenerator.cs b/src/ServiceControl.Persistence.RavenDb5/DocumentIdGenerators/MessageBodyIdGenerator.cs deleted file mode 100644 index 1fc5da71b1..0000000000 --- a/src/ServiceControl.Persistence.RavenDb5/DocumentIdGenerators/MessageBodyIdGenerator.cs +++ /dev/null @@ -1,9 +0,0 @@ -static class MessageBodyIdGenerator -{ - const string CollectionName = "MessageBodies"; - - public static string MakeDocumentId(string messageUniqueId) - { - return $"{CollectionName}/{messageUniqueId}"; - } -} \ No newline at end of file diff --git a/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs b/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs index 94bb31a7cd..a1d0b08868 100644 --- a/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs +++ b/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs @@ -4,7 +4,6 @@ using System.Threading.Tasks; using Raven.Client.Documents; using Raven.Client.Documents.Commands.Batches; - using Sparrow.Json.Parsing; class RavenAttachmentsBodyStorage : IBodyStorage { @@ -18,24 +17,25 @@ public RavenAttachmentsBodyStorage(IDocumentStore documentStore) // TODO: This method is only used in tests and not by ServiceControl itself! But in the Raven3.5 persister, it IS used! // It should probably be removed and tests should use the RavenDbRecoverabilityIngestionUnitOfWork - public async Task Store(string messageId, string contentType, int bodySize, Stream bodyStream) + public async Task Store(string uniqueId, string contentType, int bodySize, Stream bodyStream) { - var documentId = MessageBodyIdGenerator.MakeDocumentId(messageId); - - var emptyDoc = new DynamicJsonValue(); - var putOwnerDocumentCmd = new PutCommandData(documentId, null, emptyDoc); + // In RavenDB 5 persistence, the ID must be the UniqueID representing MessageID+Endpoint so that we can + // load the body from the FailedMessage/{UniqueId} document. + var documentId = FailedMessageIdGenerator.MakeDocumentId(uniqueId); var stream = bodyStream; var putAttachmentCmd = new PutAttachmentCommandData(documentId, "body", stream, contentType, changeVector: null); using var session = documentStore.OpenAsyncSession(); - session.Advanced.Defer(new ICommandData[] { putOwnerDocumentCmd, putAttachmentCmd }); + session.Advanced.Defer(putAttachmentCmd); await session.SaveChangesAsync(); } - public async Task TryFetch(string messageId) + public async Task TryFetch(string uniqueId) { - var documentId = MessageBodyIdGenerator.MakeDocumentId(messageId); + // In RavenDB 5 persistence, the ID must be the UniqueID representing MessageID+Endpoint so that we can + // load the body from the FailedMessage/{UniqueId} document. + var documentId = FailedMessageIdGenerator.MakeDocumentId(uniqueId); using var session = documentStore.OpenAsyncSession(); diff --git a/src/ServiceControl.Persistence.RavenDb5/UnitOfWork/RavenDbRecoverabilityIngestionUnitOfWork.cs b/src/ServiceControl.Persistence.RavenDb5/UnitOfWork/RavenDbRecoverabilityIngestionUnitOfWork.cs index 63152ec2fd..039021d81b 100644 --- a/src/ServiceControl.Persistence.RavenDb5/UnitOfWork/RavenDbRecoverabilityIngestionUnitOfWork.cs +++ b/src/ServiceControl.Persistence.RavenDb5/UnitOfWork/RavenDbRecoverabilityIngestionUnitOfWork.cs @@ -13,7 +13,6 @@ using ServiceControl.Persistence.Infrastructure; using ServiceControl.Persistence.UnitOfWork; using ServiceControl.Recoverability; - using Sparrow.Json.Parsing; class RavenDbRecoverabilityIngestionUnitOfWork : IRecoverabilityIngestionUnitOfWork { @@ -32,13 +31,12 @@ public Task RecordFailedProcessingAttempt( List groups) { var uniqueMessageId = context.Headers.UniqueId(); - var bodyId = processingAttempt.Headers.MessageId(); var contentType = GetContentType(context.Headers, "text/xml"); var bodySize = context.Body?.Length ?? 0; processingAttempt.MessageMetadata.Add("ContentType", contentType); processingAttempt.MessageMetadata.Add("ContentLength", bodySize); - processingAttempt.MessageMetadata.Add("BodyUrl", $"/messages/{bodyId}/body"); + processingAttempt.MessageMetadata.Add("BodyUrl", $"/messages/{uniqueMessageId}/body"); if (doFullTextIndexing) { @@ -143,22 +141,12 @@ ICommandData CreateFailedMessagesPatchCommand(string uniqueMessageId, FailedMess void AddStoreBodyCommands(MessageContext context, string contentType) { - var messageId = context.Headers.MessageId(); - var documentId = MessageBodyIdGenerator.MakeDocumentId(messageId); - - var emptyDoc = new DynamicJsonValue - { - ["@metadata"] = new DynamicJsonValue - { - ["@collection"] = "MessageBodies" - } - }; - var putOwnerDocumentCmd = new PutCommandData(documentId, null, emptyDoc); + var uniqueId = context.Headers.UniqueId(); + var documentId = FailedMessageIdGenerator.MakeDocumentId(uniqueId); var stream = Memory.Manager.GetStream(context.Body); var putAttachmentCmd = new PutAttachmentCommandData(documentId, "body", stream, contentType, changeVector: null); - parentUnitOfWork.AddCommand(putOwnerDocumentCmd); parentUnitOfWork.AddCommand(putAttachmentCmd); } From e7d09129259b34a947af2260e205b4674fcad319 Mon Sep 17 00:00:00 2001 From: David Boike Date: Fri, 29 Sep 2023 10:37:59 -0500 Subject: [PATCH 2/6] Rewrite body storage test to use normal ingestion --- .../RavenAttachmentsBodyStorage.cs | 19 ++------- .../RavenAttachmentsBodyStorageTests.cs | 40 +++++++++++++++++-- 2 files changed, 40 insertions(+), 19 deletions(-) diff --git a/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs b/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs index a1d0b08868..1c439892e5 100644 --- a/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs +++ b/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs @@ -1,9 +1,9 @@ namespace ServiceControl.Operations.BodyStorage.RavenAttachments { + using System; using System.IO; using System.Threading.Tasks; using Raven.Client.Documents; - using Raven.Client.Documents.Commands.Batches; class RavenAttachmentsBodyStorage : IBodyStorage { @@ -15,21 +15,8 @@ public RavenAttachmentsBodyStorage(IDocumentStore documentStore) this.documentStore = documentStore; } - // TODO: This method is only used in tests and not by ServiceControl itself! But in the Raven3.5 persister, it IS used! - // It should probably be removed and tests should use the RavenDbRecoverabilityIngestionUnitOfWork - public async Task Store(string uniqueId, string contentType, int bodySize, Stream bodyStream) - { - // In RavenDB 5 persistence, the ID must be the UniqueID representing MessageID+Endpoint so that we can - // load the body from the FailedMessage/{UniqueId} document. - var documentId = FailedMessageIdGenerator.MakeDocumentId(uniqueId); - - var stream = bodyStream; - var putAttachmentCmd = new PutAttachmentCommandData(documentId, "body", stream, contentType, changeVector: null); - - using var session = documentStore.OpenAsyncSession(); - session.Advanced.Defer(putAttachmentCmd); - await session.SaveChangesAsync(); - } + public Task Store(string uniqueId, string contentType, int bodySize, Stream bodyStream) + => throw new NotImplementedException("Only included for interface compatibility with Raven3.5 persister implementation. Raven5 tests should use IIngestionUnitOfWorkFactory to store failed messages/bodies."); public async Task TryFetch(string uniqueId) { diff --git a/src/ServiceControl.Persistence.Tests.RavenDb5/BodyStorage/RavenAttachmentsBodyStorageTests.cs b/src/ServiceControl.Persistence.Tests.RavenDb5/BodyStorage/RavenAttachmentsBodyStorageTests.cs index a087ef29ef..72bcf2be79 100644 --- a/src/ServiceControl.Persistence.Tests.RavenDb5/BodyStorage/RavenAttachmentsBodyStorageTests.cs +++ b/src/ServiceControl.Persistence.Tests.RavenDb5/BodyStorage/RavenAttachmentsBodyStorageTests.cs @@ -1,9 +1,15 @@ namespace ServiceControl.UnitTests.BodyStorage { using System; - using System.IO; + using System.Collections.Generic; + using System.Runtime.Remoting.Contexts; + using System.Threading; using System.Threading.Tasks; + using NServiceBus; + using NServiceBus.Transport; using NUnit.Framework; + using ServiceControl.MessageFailures; + using ServiceControl.Persistence.UnitOfWork; [TestFixture] sealed class RavenAttachmentsBodyStorageTests : PersistenceTestBase @@ -13,11 +19,39 @@ public async Task Attachments_with_ids_that_contain_backslash_should_be_readable { var messageId = "3f0240a7-9b2e-4e2a-ab39-6114932adad1\\2055783"; var contentType = "NotImportant"; + var endpointName = "EndpointName"; var body = BitConverter.GetBytes(0xDEADBEEF); + var ingestionFactory = GetRequiredService(); - await BodyStorage.Store(messageId, contentType, body.Length, new MemoryStream(body)); + var headers = new Dictionary + { + [Headers.MessageId] = messageId, + [Headers.ProcessingEndpoint] = endpointName, + [Headers.ContentType] = contentType + }; - var retrieved = await BodyStorage.TryFetch(messageId); + using (var cancellationSource = new CancellationTokenSource()) + using (var uow = await ingestionFactory.StartNew()) + { + var context = new MessageContext(messageId, headers, body, new TransportTransaction(), cancellationSource, new NServiceBus.Extensibility.ContextBag()); + var processingAttempt = new FailedMessage.ProcessingAttempt + { + MessageId = messageId, + FailureDetails = new Contracts.Operations.FailureDetails + { + AddressOfFailingEndpoint = endpointName + }, + Headers = headers + }; + var groups = new List(); + + await uow.Recoverability.RecordFailedProcessingAttempt(context, processingAttempt, groups); + await uow.Complete(); + } + + var uniqueMessageId = headers.UniqueId(); + + var retrieved = await BodyStorage.TryFetch(uniqueMessageId); Assert.IsNotNull(retrieved); Assert.True(retrieved.HasResult); Assert.AreEqual(contentType, retrieved.ContentType); From e69dcdf2472a143bad9731f14d633e4a75cb32f4 Mon Sep 17 00:00:00 2001 From: David Boike Date: Fri, 29 Sep 2023 12:08:39 -0500 Subject: [PATCH 3/6] Make RetryProcessor know that Raven5 will always store the body the same way --- .../RavenDBPersisterSettings.cs | 2 ++ .../RavenDbPersistence.cs | 1 + .../ErrorMessagesDataStore.cs | 36 ++++++++++++++++--- .../RavenAttachmentsBodyStorage.cs | 2 +- .../RavenDBPersisterSettings.cs | 2 ++ .../RavenDbPersistence.cs | 1 + .../TestPersistenceImpl.cs | 5 ++- .../EnsureSettingsInContainer.cs | 22 ++++++++++++ .../PersistenceTestBase.cs | 3 ++ .../RetryStateTests.cs | 10 +++--- .../TestPersistence.cs | 3 ++ .../PersistenceSettings.cs | 1 + .../Recoverability/Retrying/RetryProcessor.cs | 15 ++++---- 13 files changed, 84 insertions(+), 19 deletions(-) create mode 100644 src/ServiceControl.Persistence.Tests/EnsureSettingsInContainer.cs diff --git a/src/ServiceControl.Persistence.RavenDb/RavenDBPersisterSettings.cs b/src/ServiceControl.Persistence.RavenDb/RavenDBPersisterSettings.cs index a15e1e4e3f..5618a1202a 100644 --- a/src/ServiceControl.Persistence.RavenDb/RavenDBPersisterSettings.cs +++ b/src/ServiceControl.Persistence.RavenDb/RavenDBPersisterSettings.cs @@ -20,5 +20,7 @@ class RavenDBPersisterSettings : PersistenceSettings public TimeSpan? AuditRetentionPeriod { get; set; } public int ExternalIntegrationsDispatchingBatchSize { get; set; } = 100; + public override bool MessageBodiesAlwaysStoredInFailedMessage => false; + public const int DatabaseMaintenancePortDefault = 33334; } \ No newline at end of file diff --git a/src/ServiceControl.Persistence.RavenDb/RavenDbPersistence.cs b/src/ServiceControl.Persistence.RavenDb/RavenDbPersistence.cs index fe5e51bf71..20a04f894d 100644 --- a/src/ServiceControl.Persistence.RavenDb/RavenDbPersistence.cs +++ b/src/ServiceControl.Persistence.RavenDb/RavenDbPersistence.cs @@ -33,6 +33,7 @@ public void Configure(IServiceCollection serviceCollection) return; } + serviceCollection.AddSingleton(settings); serviceCollection.AddSingleton(settings); serviceCollection.AddSingleton(documentStore); diff --git a/src/ServiceControl.Persistence.RavenDb5/ErrorMessagesDataStore.cs b/src/ServiceControl.Persistence.RavenDb5/ErrorMessagesDataStore.cs index ecc19fb28f..0a01e5a31c 100644 --- a/src/ServiceControl.Persistence.RavenDb5/ErrorMessagesDataStore.cs +++ b/src/ServiceControl.Persistence.RavenDb5/ErrorMessagesDataStore.cs @@ -2,6 +2,7 @@ { using System; using System.Collections.Generic; + using System.IO; using System.Linq; using System.Threading.Tasks; using Editing; @@ -19,17 +20,20 @@ using ServiceControl.MessageFailures; using ServiceControl.MessageFailures.Api; using ServiceControl.Operations; + using ServiceControl.Operations.BodyStorage; using ServiceControl.Persistence.Infrastructure; using ServiceControl.Recoverability; class ErrorMessagesDataStore : IErrorMessageDataStore { readonly IDocumentStore documentStore; + readonly IBodyStorage bodyStorage; readonly TimeSpan eventsRetentionPeriod; - public ErrorMessagesDataStore(IDocumentStore documentStore, RavenDBPersisterSettings settings) + public ErrorMessagesDataStore(IDocumentStore documentStore, IBodyStorage bodyStorage, RavenDBPersisterSettings settings) { this.documentStore = documentStore; + this.bodyStorage = bodyStorage; eventsRetentionPeriod = settings.EventsRetentionPeriod; } @@ -730,13 +734,37 @@ public async Task GetRetryPendingMessages(DateTime from, DateTime to, } } } - return ids.ToArray(); } - public Task FetchFromFailedMessage(string uniqueMessageId) + public async Task FetchFromFailedMessage(string uniqueMessageId) { - throw new NotSupportedException("Body not stored embedded"); + byte[] body = null; + var result = await bodyStorage.TryFetch(uniqueMessageId); + + if (result == null) + { + throw new InvalidOperationException("IBodyStorage.TryFetch result cannot be null"); + } + + if (result.HasResult) + { + using (result.Stream) // Not strictly required for MemoryStream but might be different behavior in future .NET versions + { + // Unfortunately we can't use the buffer manager here yet because core doesn't allow to set the length property so usage of GetBuffer is not possible + // furthermore call ToArray would neglect many of the benefits of the recyclable stream + // RavenDB always returns a memory stream in ver. 3.5 so there is no need to pretend we need to do buffered reads since the memory is anyway fully allocated already + // this assumption might change when we stop supporting RavenDB 3.5 but right now this is the most memory efficient way to do things + // https://github.com/microsoft/Microsoft.IO.RecyclableMemoryStream#getbuffer-and-toarray + using (var memoryStream = new MemoryStream()) + { + await result.Stream.CopyToAsync(memoryStream); + + body = memoryStream.ToArray(); + } + } + } + return body; } public async Task StoreEventLogItem(EventLogItem logItem) diff --git a/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs b/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs index 1c439892e5..50fd035b10 100644 --- a/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs +++ b/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs @@ -7,7 +7,7 @@ class RavenAttachmentsBodyStorage : IBodyStorage { - const string AttachmentName = "body"; + public const string AttachmentName = "body"; readonly IDocumentStore documentStore; public RavenAttachmentsBodyStorage(IDocumentStore documentStore) diff --git a/src/ServiceControl.Persistence.RavenDb5/RavenDBPersisterSettings.cs b/src/ServiceControl.Persistence.RavenDb5/RavenDBPersisterSettings.cs index 5f4e33a76c..b01120e995 100644 --- a/src/ServiceControl.Persistence.RavenDb5/RavenDBPersisterSettings.cs +++ b/src/ServiceControl.Persistence.RavenDb5/RavenDBPersisterSettings.cs @@ -27,4 +27,6 @@ class RavenDBPersisterSettings : PersistenceSettings public const int DatabaseMaintenancePortDefault = 33334; public const int ExpirationProcessTimerInSecondsDefault = 600; public const string LogsModeDefault = "Information"; + + public override bool MessageBodiesAlwaysStoredInFailedMessage => true; } \ No newline at end of file diff --git a/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs b/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs index abf7bffcc9..0e2d4e201f 100644 --- a/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs +++ b/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs @@ -30,6 +30,7 @@ public void Configure(IServiceCollection serviceCollection) return; } + serviceCollection.AddSingleton(settings); serviceCollection.AddSingleton(settings); serviceCollection.AddSingleton(); diff --git a/src/ServiceControl.Persistence.Tests.RavenDb5/TestPersistenceImpl.cs b/src/ServiceControl.Persistence.Tests.RavenDb5/TestPersistenceImpl.cs index 86d9fce004..d7c6113d4f 100644 --- a/src/ServiceControl.Persistence.Tests.RavenDb5/TestPersistenceImpl.cs +++ b/src/ServiceControl.Persistence.Tests.RavenDb5/TestPersistenceImpl.cs @@ -17,7 +17,6 @@ sealed class TestPersistenceImpl : TestPersistence { readonly string databaseName; - readonly RavenDBPersisterSettings settings; IDocumentStore documentStore; public TestPersistenceImpl() @@ -27,7 +26,7 @@ public TestPersistenceImpl() TestContext.Out.WriteLine($"Test Database Name: {databaseName}"); - settings = new RavenDBPersisterSettings + Settings = new RavenDBPersisterSettings { AuditRetentionPeriod = retentionPeriod, ErrorRetentionPeriod = retentionPeriod, @@ -39,7 +38,7 @@ public TestPersistenceImpl() public override void Configure(IServiceCollection services) { - var persistence = new RavenDbPersistenceConfiguration().Create(settings); + var persistence = new RavenDbPersistenceConfiguration().Create(Settings); PersistenceHostBuilderExtensions.CreatePersisterLifecyle(services, persistence); services.AddHostedService(p => new FakeServiceToExtractDocumentStore(this, p.GetRequiredService())); } diff --git a/src/ServiceControl.Persistence.Tests/EnsureSettingsInContainer.cs b/src/ServiceControl.Persistence.Tests/EnsureSettingsInContainer.cs new file mode 100644 index 0000000000..9773b566ea --- /dev/null +++ b/src/ServiceControl.Persistence.Tests/EnsureSettingsInContainer.cs @@ -0,0 +1,22 @@ +namespace ServiceControl.PersistenceTests +{ + using NUnit.Framework; + using ServiceControl.Persistence; + + public sealed class EnsureSettingsInContainer : PersistenceTestBase + { + [Test] + public void CheckForBothTypes() + { + // Persistence implementation must register singleton as base type as some compoennts need to inject that + var baseSettings = GetRequiredService(); + + var actualType = baseSettings.GetType(); + Assert.AreNotEqual(actualType, typeof(PersistenceSettings)); + + // Persistence implementation must also register the same singleton as the persister-specific type + var settingsAsActualType = GetRequiredService(actualType); + Assert.AreSame(baseSettings, settingsAsActualType); + } + } +} \ No newline at end of file diff --git a/src/ServiceControl.Persistence.Tests/PersistenceTestBase.cs b/src/ServiceControl.Persistence.Tests/PersistenceTestBase.cs index cf247993f0..89fc75d67d 100644 --- a/src/ServiceControl.Persistence.Tests/PersistenceTestBase.cs +++ b/src/ServiceControl.Persistence.Tests/PersistenceTestBase.cs @@ -56,7 +56,10 @@ public async Task TearDown() host.Dispose(); } + protected PersistenceSettings PersistenceSettings => testPersistence.Settings; + protected T GetRequiredService() => host.Services.GetRequiredService(); + protected object GetRequiredService(Type serviceType) => host.Services.GetRequiredService(serviceType); protected Action RegisterServices { get; set; } diff --git a/src/ServiceControl.Persistence.Tests/RetryStateTests.cs b/src/ServiceControl.Persistence.Tests/RetryStateTests.cs index 593bb12f37..08e49a6cc5 100644 --- a/src/ServiceControl.Persistence.Tests/RetryStateTests.cs +++ b/src/ServiceControl.Persistence.Tests/RetryStateTests.cs @@ -59,7 +59,7 @@ public async Task When_a_group_is_prepared_with_three_batches_and_SC_is_restarte await CreateAFailedMessageAndMarkAsPartOfRetryBatch(retryManager, "Test-group", true, 2001); var sender = new TestSender(); - var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(BodyStorage, ErrorStore), ErrorStore, domainEvents, "TestEndpoint"), retryManager); + var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(BodyStorage, ErrorStore), ErrorStore, domainEvents, "TestEndpoint"), retryManager, PersistenceSettings); // Needs index RetryBatches_ByStatus_ReduceInitialBatchSize CompleteDatabaseOperation(); @@ -73,7 +73,7 @@ public async Task When_a_group_is_prepared_with_three_batches_and_SC_is_restarte await documentManager.RebuildRetryOperationState(); - processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(BodyStorage, ErrorStore), ErrorStore, domainEvents, "TestEndpoint"), retryManager); + processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(BodyStorage, ErrorStore), ErrorStore, domainEvents, "TestEndpoint"), retryManager, PersistenceSettings); await processor.ProcessBatches(sender); @@ -92,7 +92,7 @@ public async Task When_a_group_is_forwarded_the_status_is_Completed() var sender = new TestSender(); var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(BodyStorage, ErrorStore), ErrorStore, domainEvents, "TestEndpoint"); - var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager); + var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager, PersistenceSettings); await processor.ProcessBatches(sender); // mark ready await processor.ProcessBatches(sender); @@ -122,7 +122,7 @@ public async Task When_there_is_one_poison_message_it_is_removed_from_batch_and_ }; var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(BodyStorage, ErrorStore), ErrorStore, domainEvents, "TestEndpoint"); - var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager); + var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager, PersistenceSettings); bool c; do @@ -159,7 +159,7 @@ public async Task When_a_group_has_one_batch_out_of_two_forwarded_the_status_is_ var sender = new TestSender(); - var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(returnToSender, ErrorStore, domainEvents, "TestEndpoint"), retryManager); + var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(returnToSender, ErrorStore, domainEvents, "TestEndpoint"), retryManager, PersistenceSettings); CompleteDatabaseOperation(); diff --git a/src/ServiceControl.Persistence.Tests/TestPersistence.cs b/src/ServiceControl.Persistence.Tests/TestPersistence.cs index 85208e3799..71f18f291c 100644 --- a/src/ServiceControl.Persistence.Tests/TestPersistence.cs +++ b/src/ServiceControl.Persistence.Tests/TestPersistence.cs @@ -3,9 +3,12 @@ using System.Diagnostics; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; + using ServiceControl.Persistence; abstract class TestPersistence { + public PersistenceSettings Settings { get; protected set; } + public abstract void Configure(IServiceCollection services); public abstract void CompleteDatabaseOperation(); diff --git a/src/ServiceControl.Persistence/PersistenceSettings.cs b/src/ServiceControl.Persistence/PersistenceSettings.cs index 352046ec62..ab621c7a5c 100644 --- a/src/ServiceControl.Persistence/PersistenceSettings.cs +++ b/src/ServiceControl.Persistence/PersistenceSettings.cs @@ -15,5 +15,6 @@ public abstract class PersistenceSettings public TimeSpan? OverrideCustomCheckRepeatTime { get; set; } + public abstract bool MessageBodiesAlwaysStoredInFailedMessage { get; } } } \ No newline at end of file diff --git a/src/ServiceControl/Recoverability/Retrying/RetryProcessor.cs b/src/ServiceControl/Recoverability/Retrying/RetryProcessor.cs index 84a7fbe6df..b15f3bee08 100644 --- a/src/ServiceControl/Recoverability/Retrying/RetryProcessor.cs +++ b/src/ServiceControl/Recoverability/Retrying/RetryProcessor.cs @@ -17,13 +17,14 @@ namespace ServiceControl.Recoverability class RetryProcessor { - public RetryProcessor(IRetryBatchesDataStore store, IDomainEvents domainEvents, ReturnToSenderDequeuer returnToSender, RetryingManager retryingManager) + public RetryProcessor(IRetryBatchesDataStore store, IDomainEvents domainEvents, ReturnToSenderDequeuer returnToSender, RetryingManager retryingManager, PersistenceSettings settings) { this.store = store; this.returnToSender = returnToSender; this.retryingManager = retryingManager; this.domainEvents = domainEvents; corruptedReplyToHeaderStrategy = new CorruptedReplyToHeaderStrategy(RuntimeEnvironment.MachineName); + messageBodiesAlwaysStoredInFailedMessage = settings.MessageBodiesAlwaysStoredInFailedMessage; } Task Enqueue(IDispatchMessages sender, TransportOperations outgoingMessages) @@ -332,7 +333,7 @@ TransportOperation ToTransportOperation(FailedMessage message, string stagingId) headersToRetryWith["ServiceControl.Retry.UniqueMessageId"] = message.UniqueMessageId; headersToRetryWith["ServiceControl.Retry.StagingId"] = stagingId; headersToRetryWith["ServiceControl.Retry.Attempt.MessageId"] = attempt.MessageId; - if (attempt.MessageMetadata.ContainsKey("Body") || attempt.Body != null) + if (messageBodiesAlwaysStoredInFailedMessage || attempt.MessageMetadata.ContainsKey("Body") || attempt.Body != null) { headersToRetryWith["ServiceControl.Retry.BodyOnFailedMessage"] = null; } @@ -343,10 +344,12 @@ TransportOperation ToTransportOperation(FailedMessage message, string stagingId) return new TransportOperation(transportMessage, new UnicastAddressTag(returnToSender.InputAddress)); } - IDomainEvents domainEvents; - IRetryBatchesDataStore store; - ReturnToSenderDequeuer returnToSender; - RetryingManager retryingManager; + readonly IDomainEvents domainEvents; + readonly IRetryBatchesDataStore store; + readonly ReturnToSenderDequeuer returnToSender; + readonly RetryingManager retryingManager; + readonly bool messageBodiesAlwaysStoredInFailedMessage; + MessageRedirectsCollection redirects; bool isRecoveringFromPrematureShutdown = true; CorruptedReplyToHeaderStrategy corruptedReplyToHeaderStrategy; From 03f500a4b6b17fc524563b689ffe50aa9a31df8d Mon Sep 17 00:00:00 2001 From: David Boike Date: Fri, 29 Sep 2023 15:21:08 -0500 Subject: [PATCH 4/6] Allow IBodyStorage to query first by unique id, and if that fails, do an index lookup by MessageId and return that --- .../RavenAttachmentsBodyStorage.cs | 41 ++++++++++++++++--- .../TestPersistenceImpl.cs | 9 ++-- .../RavenAttachmentsBodyStorageTests.cs | 33 +++++++++++++-- 3 files changed, 69 insertions(+), 14 deletions(-) diff --git a/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs b/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs index 50fd035b10..8acde22ac4 100644 --- a/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs +++ b/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs @@ -2,8 +2,12 @@ { using System; using System.IO; + using System.Linq; using System.Threading.Tasks; using Raven.Client.Documents; + using Raven.Client.Documents.Session; + using ServiceControl.MessageFailures; + using ServiceControl.MessageFailures.Api; class RavenAttachmentsBodyStorage : IBodyStorage { @@ -18,14 +22,41 @@ public RavenAttachmentsBodyStorage(IDocumentStore documentStore) public Task Store(string uniqueId, string contentType, int bodySize, Stream bodyStream) => throw new NotImplementedException("Only included for interface compatibility with Raven3.5 persister implementation. Raven5 tests should use IIngestionUnitOfWorkFactory to store failed messages/bodies."); - public async Task TryFetch(string uniqueId) + public async Task TryFetch(string bodyId) { - // In RavenDB 5 persistence, the ID must be the UniqueID representing MessageID+Endpoint so that we can - // load the body from the FailedMessage/{UniqueId} document. - var documentId = FailedMessageIdGenerator.MakeDocumentId(uniqueId); - using var session = documentStore.OpenAsyncSession(); + // BodyId could be a MessageID or a UniqueID, but if a UniqueID then it will be a DeterministicGuid of MessageID and endpoint name and be Guid-parseable + // This is preferred, then we know we're getting the correct message body that is attached to the FailedMessage document + if (Guid.TryParse(bodyId, out _)) + { + var result = await ResultForUniqueId(session, bodyId); + if (result != null) + { + return result; + } + } + + // See if we can look up a FailedMessage by MessageID + var query = session.Query() + .Where(msg => msg.MessageId == bodyId, true) + .OfType() + .Select(msg => msg.UniqueMessageId); + + var uniqueId = await query.FirstOrDefaultAsync(); + + if (uniqueId != null) + { + return await ResultForUniqueId(session, uniqueId); + } + + return null; + } + + async Task ResultForUniqueId(IAsyncDocumentSession session, string uniqueId) + { + var documentId = FailedMessageIdGenerator.MakeDocumentId(uniqueId); + var result = await session.Advanced.Attachments.GetAsync(documentId, AttachmentName); if (result == null) diff --git a/src/ServiceControl.Persistence.Tests.RavenDb/TestPersistenceImpl.cs b/src/ServiceControl.Persistence.Tests.RavenDb/TestPersistenceImpl.cs index ad71aa9658..23e2ca0be6 100644 --- a/src/ServiceControl.Persistence.Tests.RavenDb/TestPersistenceImpl.cs +++ b/src/ServiceControl.Persistence.Tests.RavenDb/TestPersistenceImpl.cs @@ -14,10 +14,9 @@ sealed class TestPersistenceImpl : TestPersistence { - readonly RavenDBPersisterSettings settings = CreateSettings(); IDocumentStore documentStore; - static RavenDBPersisterSettings CreateSettings() + public TestPersistenceImpl() { var retentionPeriod = TimeSpan.FromMinutes(1); @@ -36,12 +35,12 @@ static RavenDBPersisterSettings CreateSettings() settings.ExposeRavenDB = true; } - return settings; + Settings = settings; } public override void Configure(IServiceCollection services) { - var persistence = new RavenDbPersistenceConfiguration().Create(CreateSettings()); + var persistence = new RavenDbPersistenceConfiguration().Create(Settings); PersistenceHostBuilderExtensions.CreatePersisterLifecyle(services, persistence); services.AddHostedService(p => new Wrapper(this, p.GetRequiredService())); } @@ -70,7 +69,7 @@ public override void BlockToInspectDatabase() return; } - var url = $"http://localhost:{settings.DatabaseMaintenancePort}/studio/index.html#databases/documents?&database=%3Csystem%3E"; + var url = $"http://localhost:{(Settings as RavenDBPersisterSettings).DatabaseMaintenancePort}/studio/index.html#databases/documents?&database=%3Csystem%3E"; if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { diff --git a/src/ServiceControl.Persistence.Tests.RavenDb5/BodyStorage/RavenAttachmentsBodyStorageTests.cs b/src/ServiceControl.Persistence.Tests.RavenDb5/BodyStorage/RavenAttachmentsBodyStorageTests.cs index 72bcf2be79..0cc85d9a12 100644 --- a/src/ServiceControl.Persistence.Tests.RavenDb5/BodyStorage/RavenAttachmentsBodyStorageTests.cs +++ b/src/ServiceControl.Persistence.Tests.RavenDb5/BodyStorage/RavenAttachmentsBodyStorageTests.cs @@ -2,21 +2,33 @@ { using System; using System.Collections.Generic; - using System.Runtime.Remoting.Contexts; using System.Threading; using System.Threading.Tasks; using NServiceBus; using NServiceBus.Transport; using NUnit.Framework; using ServiceControl.MessageFailures; + using ServiceControl.Operations; using ServiceControl.Persistence.UnitOfWork; [TestFixture] sealed class RavenAttachmentsBodyStorageTests : PersistenceTestBase { [Test] - public async Task Attachments_with_ids_that_contain_backslash_should_be_readable() + public async Task QueryByUniqueId() { + await RunTest(headers => headers.UniqueId()); + } + + [Test] + public async Task QueryByMessageId() + { + await RunTest(headers => headers.MessageId()); + } + + async Task RunTest(Func, string> getIdToQuery) + { + // Contains a backslash, like an old MSMQ message id, to ensure that message ids like this are usable var messageId = "3f0240a7-9b2e-4e2a-ab39-6114932adad1\\2055783"; var contentType = "NotImportant"; var endpointName = "EndpointName"; @@ -37,6 +49,17 @@ public async Task Attachments_with_ids_that_contain_backslash_should_be_readable var processingAttempt = new FailedMessage.ProcessingAttempt { MessageId = messageId, + MessageMetadata = new Dictionary + { + ["MessageId"] = messageId, + ["TimeSent"] = DateTime.UtcNow, + ["ReceivingEndpoint"] = new EndpointDetails + { + Name = endpointName, + Host = "Host", + HostId = Guid.NewGuid() + } + }, FailureDetails = new Contracts.Operations.FailureDetails { AddressOfFailingEndpoint = endpointName @@ -49,9 +72,11 @@ public async Task Attachments_with_ids_that_contain_backslash_should_be_readable await uow.Complete(); } - var uniqueMessageId = headers.UniqueId(); + CompleteDatabaseOperation(); + + var fetchById = getIdToQuery(headers); - var retrieved = await BodyStorage.TryFetch(uniqueMessageId); + var retrieved = await BodyStorage.TryFetch(fetchById); Assert.IsNotNull(retrieved); Assert.True(retrieved.HasResult); Assert.AreEqual(contentType, retrieved.ContentType); From 0765da5f919e775d72c4c95d546eeb3406088ee4 Mon Sep 17 00:00:00 2001 From: David Boike Date: Wed, 4 Oct 2023 07:56:45 -0500 Subject: [PATCH 5/6] Apply suggestions from code review Co-authored-by: Ramon Smits --- .../TestPersistenceImpl.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ServiceControl.Persistence.Tests.RavenDb/TestPersistenceImpl.cs b/src/ServiceControl.Persistence.Tests.RavenDb/TestPersistenceImpl.cs index 23e2ca0be6..97541d563f 100644 --- a/src/ServiceControl.Persistence.Tests.RavenDb/TestPersistenceImpl.cs +++ b/src/ServiceControl.Persistence.Tests.RavenDb/TestPersistenceImpl.cs @@ -69,7 +69,8 @@ public override void BlockToInspectDatabase() return; } - var url = $"http://localhost:{(Settings as RavenDBPersisterSettings).DatabaseMaintenancePort}/studio/index.html#databases/documents?&database=%3Csystem%3E"; + var databaseMaintenanceUrl = ((RavenDBPersisterSettings)Settings).DatabaseMaintenanceUrl + var url = databaseMaintenanceUrl + $"/studio/index.html#databases/documents?&database=%3Csystem%3E"; if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { From 99a8e8b27c05fd20179891e31ea22b0527439095 Mon Sep 17 00:00:00 2001 From: David Boike Date: Wed, 4 Oct 2023 08:11:25 -0500 Subject: [PATCH 6/6] semicolon --- .../TestPersistenceImpl.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ServiceControl.Persistence.Tests.RavenDb/TestPersistenceImpl.cs b/src/ServiceControl.Persistence.Tests.RavenDb/TestPersistenceImpl.cs index 97541d563f..18ef6b0983 100644 --- a/src/ServiceControl.Persistence.Tests.RavenDb/TestPersistenceImpl.cs +++ b/src/ServiceControl.Persistence.Tests.RavenDb/TestPersistenceImpl.cs @@ -69,7 +69,7 @@ public override void BlockToInspectDatabase() return; } - var databaseMaintenanceUrl = ((RavenDBPersisterSettings)Settings).DatabaseMaintenanceUrl + var databaseMaintenanceUrl = ((RavenDBPersisterSettings)Settings).DatabaseMaintenanceUrl; var url = databaseMaintenanceUrl + $"/studio/index.html#databases/documents?&database=%3Csystem%3E"; if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) @@ -99,4 +99,4 @@ public override Task TearDown() return Task.CompletedTask; } } -} \ No newline at end of file +}