diff --git a/src/ServiceControl.Persistence.RavenDb/RavenDBPersisterSettings.cs b/src/ServiceControl.Persistence.RavenDb/RavenDBPersisterSettings.cs index a15e1e4e3f..5618a1202a 100644 --- a/src/ServiceControl.Persistence.RavenDb/RavenDBPersisterSettings.cs +++ b/src/ServiceControl.Persistence.RavenDb/RavenDBPersisterSettings.cs @@ -20,5 +20,7 @@ class RavenDBPersisterSettings : PersistenceSettings public TimeSpan? AuditRetentionPeriod { get; set; } public int ExternalIntegrationsDispatchingBatchSize { get; set; } = 100; + public override bool MessageBodiesAlwaysStoredInFailedMessage => false; + public const int DatabaseMaintenancePortDefault = 33334; } \ No newline at end of file diff --git a/src/ServiceControl.Persistence.RavenDb/RavenDbPersistence.cs b/src/ServiceControl.Persistence.RavenDb/RavenDbPersistence.cs index fe5e51bf71..20a04f894d 100644 --- a/src/ServiceControl.Persistence.RavenDb/RavenDbPersistence.cs +++ b/src/ServiceControl.Persistence.RavenDb/RavenDbPersistence.cs @@ -33,6 +33,7 @@ public void Configure(IServiceCollection serviceCollection) return; } + serviceCollection.AddSingleton(settings); serviceCollection.AddSingleton(settings); serviceCollection.AddSingleton(documentStore); diff --git a/src/ServiceControl.Persistence.RavenDb5/DocumentIdGenerators/MessageBodyIdGenerator.cs b/src/ServiceControl.Persistence.RavenDb5/DocumentIdGenerators/MessageBodyIdGenerator.cs deleted file mode 100644 index 1fc5da71b1..0000000000 --- a/src/ServiceControl.Persistence.RavenDb5/DocumentIdGenerators/MessageBodyIdGenerator.cs +++ /dev/null @@ -1,9 +0,0 @@ -static class MessageBodyIdGenerator -{ - const string CollectionName = "MessageBodies"; - - public static string MakeDocumentId(string messageUniqueId) - { - return $"{CollectionName}/{messageUniqueId}"; - } -} \ No newline at end of file diff --git a/src/ServiceControl.Persistence.RavenDb5/ErrorMessagesDataStore.cs b/src/ServiceControl.Persistence.RavenDb5/ErrorMessagesDataStore.cs index ecc19fb28f..0a01e5a31c 100644 --- a/src/ServiceControl.Persistence.RavenDb5/ErrorMessagesDataStore.cs +++ b/src/ServiceControl.Persistence.RavenDb5/ErrorMessagesDataStore.cs @@ -2,6 +2,7 @@ { using System; using System.Collections.Generic; + using System.IO; using System.Linq; using System.Threading.Tasks; using Editing; @@ -19,17 +20,20 @@ using ServiceControl.MessageFailures; using ServiceControl.MessageFailures.Api; using ServiceControl.Operations; + using ServiceControl.Operations.BodyStorage; using ServiceControl.Persistence.Infrastructure; using ServiceControl.Recoverability; class ErrorMessagesDataStore : IErrorMessageDataStore { readonly IDocumentStore documentStore; + readonly IBodyStorage bodyStorage; readonly TimeSpan eventsRetentionPeriod; - public ErrorMessagesDataStore(IDocumentStore documentStore, RavenDBPersisterSettings settings) + public ErrorMessagesDataStore(IDocumentStore documentStore, IBodyStorage bodyStorage, RavenDBPersisterSettings settings) { this.documentStore = documentStore; + this.bodyStorage = bodyStorage; eventsRetentionPeriod = settings.EventsRetentionPeriod; } @@ -730,13 +734,37 @@ public async Task GetRetryPendingMessages(DateTime from, DateTime to, } } } - return ids.ToArray(); } - public Task FetchFromFailedMessage(string uniqueMessageId) + public async Task FetchFromFailedMessage(string uniqueMessageId) { - throw new NotSupportedException("Body not stored embedded"); + byte[] body = null; + var result = await bodyStorage.TryFetch(uniqueMessageId); + + if (result == null) + { + throw new InvalidOperationException("IBodyStorage.TryFetch result cannot be null"); + } + + if (result.HasResult) + { + using (result.Stream) // Not strictly required for MemoryStream but might be different behavior in future .NET versions + { + // Unfortunately we can't use the buffer manager here yet because core doesn't allow to set the length property so usage of GetBuffer is not possible + // furthermore call ToArray would neglect many of the benefits of the recyclable stream + // RavenDB always returns a memory stream in ver. 3.5 so there is no need to pretend we need to do buffered reads since the memory is anyway fully allocated already + // this assumption might change when we stop supporting RavenDB 3.5 but right now this is the most memory efficient way to do things + // https://github.com/microsoft/Microsoft.IO.RecyclableMemoryStream#getbuffer-and-toarray + using (var memoryStream = new MemoryStream()) + { + await result.Stream.CopyToAsync(memoryStream); + + body = memoryStream.ToArray(); + } + } + } + return body; } public async Task StoreEventLogItem(EventLogItem logItem) diff --git a/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs b/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs index 94bb31a7cd..8acde22ac4 100644 --- a/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs +++ b/src/ServiceControl.Persistence.RavenDb5/RavenAttachmentsBodyStorage.cs @@ -1,14 +1,17 @@ namespace ServiceControl.Operations.BodyStorage.RavenAttachments { + using System; using System.IO; + using System.Linq; using System.Threading.Tasks; using Raven.Client.Documents; - using Raven.Client.Documents.Commands.Batches; - using Sparrow.Json.Parsing; + using Raven.Client.Documents.Session; + using ServiceControl.MessageFailures; + using ServiceControl.MessageFailures.Api; class RavenAttachmentsBodyStorage : IBodyStorage { - const string AttachmentName = "body"; + public const string AttachmentName = "body"; readonly IDocumentStore documentStore; public RavenAttachmentsBodyStorage(IDocumentStore documentStore) @@ -16,28 +19,43 @@ public RavenAttachmentsBodyStorage(IDocumentStore documentStore) this.documentStore = documentStore; } - // TODO: This method is only used in tests and not by ServiceControl itself! But in the Raven3.5 persister, it IS used! - // It should probably be removed and tests should use the RavenDbRecoverabilityIngestionUnitOfWork - public async Task Store(string messageId, string contentType, int bodySize, Stream bodyStream) + public Task Store(string uniqueId, string contentType, int bodySize, Stream bodyStream) + => throw new NotImplementedException("Only included for interface compatibility with Raven3.5 persister implementation. Raven5 tests should use IIngestionUnitOfWorkFactory to store failed messages/bodies."); + + public async Task TryFetch(string bodyId) { - var documentId = MessageBodyIdGenerator.MakeDocumentId(messageId); + using var session = documentStore.OpenAsyncSession(); + + // BodyId could be a MessageID or a UniqueID, but if a UniqueID then it will be a DeterministicGuid of MessageID and endpoint name and be Guid-parseable + // This is preferred, then we know we're getting the correct message body that is attached to the FailedMessage document + if (Guid.TryParse(bodyId, out _)) + { + var result = await ResultForUniqueId(session, bodyId); + if (result != null) + { + return result; + } + } - var emptyDoc = new DynamicJsonValue(); - var putOwnerDocumentCmd = new PutCommandData(documentId, null, emptyDoc); + // See if we can look up a FailedMessage by MessageID + var query = session.Query() + .Where(msg => msg.MessageId == bodyId, true) + .OfType() + .Select(msg => msg.UniqueMessageId); - var stream = bodyStream; - var putAttachmentCmd = new PutAttachmentCommandData(documentId, "body", stream, contentType, changeVector: null); + var uniqueId = await query.FirstOrDefaultAsync(); - using var session = documentStore.OpenAsyncSession(); - session.Advanced.Defer(new ICommandData[] { putOwnerDocumentCmd, putAttachmentCmd }); - await session.SaveChangesAsync(); + if (uniqueId != null) + { + return await ResultForUniqueId(session, uniqueId); + } + + return null; } - public async Task TryFetch(string messageId) + async Task ResultForUniqueId(IAsyncDocumentSession session, string uniqueId) { - var documentId = MessageBodyIdGenerator.MakeDocumentId(messageId); - - using var session = documentStore.OpenAsyncSession(); + var documentId = FailedMessageIdGenerator.MakeDocumentId(uniqueId); var result = await session.Advanced.Attachments.GetAsync(documentId, AttachmentName); diff --git a/src/ServiceControl.Persistence.RavenDb5/RavenDBPersisterSettings.cs b/src/ServiceControl.Persistence.RavenDb5/RavenDBPersisterSettings.cs index 5f4e33a76c..b01120e995 100644 --- a/src/ServiceControl.Persistence.RavenDb5/RavenDBPersisterSettings.cs +++ b/src/ServiceControl.Persistence.RavenDb5/RavenDBPersisterSettings.cs @@ -27,4 +27,6 @@ class RavenDBPersisterSettings : PersistenceSettings public const int DatabaseMaintenancePortDefault = 33334; public const int ExpirationProcessTimerInSecondsDefault = 600; public const string LogsModeDefault = "Information"; + + public override bool MessageBodiesAlwaysStoredInFailedMessage => true; } \ No newline at end of file diff --git a/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs b/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs index abf7bffcc9..0e2d4e201f 100644 --- a/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs +++ b/src/ServiceControl.Persistence.RavenDb5/RavenDbPersistence.cs @@ -30,6 +30,7 @@ public void Configure(IServiceCollection serviceCollection) return; } + serviceCollection.AddSingleton(settings); serviceCollection.AddSingleton(settings); serviceCollection.AddSingleton(); diff --git a/src/ServiceControl.Persistence.RavenDb5/UnitOfWork/RavenDbRecoverabilityIngestionUnitOfWork.cs b/src/ServiceControl.Persistence.RavenDb5/UnitOfWork/RavenDbRecoverabilityIngestionUnitOfWork.cs index 63152ec2fd..039021d81b 100644 --- a/src/ServiceControl.Persistence.RavenDb5/UnitOfWork/RavenDbRecoverabilityIngestionUnitOfWork.cs +++ b/src/ServiceControl.Persistence.RavenDb5/UnitOfWork/RavenDbRecoverabilityIngestionUnitOfWork.cs @@ -13,7 +13,6 @@ using ServiceControl.Persistence.Infrastructure; using ServiceControl.Persistence.UnitOfWork; using ServiceControl.Recoverability; - using Sparrow.Json.Parsing; class RavenDbRecoverabilityIngestionUnitOfWork : IRecoverabilityIngestionUnitOfWork { @@ -32,13 +31,12 @@ public Task RecordFailedProcessingAttempt( List groups) { var uniqueMessageId = context.Headers.UniqueId(); - var bodyId = processingAttempt.Headers.MessageId(); var contentType = GetContentType(context.Headers, "text/xml"); var bodySize = context.Body?.Length ?? 0; processingAttempt.MessageMetadata.Add("ContentType", contentType); processingAttempt.MessageMetadata.Add("ContentLength", bodySize); - processingAttempt.MessageMetadata.Add("BodyUrl", $"/messages/{bodyId}/body"); + processingAttempt.MessageMetadata.Add("BodyUrl", $"/messages/{uniqueMessageId}/body"); if (doFullTextIndexing) { @@ -143,22 +141,12 @@ ICommandData CreateFailedMessagesPatchCommand(string uniqueMessageId, FailedMess void AddStoreBodyCommands(MessageContext context, string contentType) { - var messageId = context.Headers.MessageId(); - var documentId = MessageBodyIdGenerator.MakeDocumentId(messageId); - - var emptyDoc = new DynamicJsonValue - { - ["@metadata"] = new DynamicJsonValue - { - ["@collection"] = "MessageBodies" - } - }; - var putOwnerDocumentCmd = new PutCommandData(documentId, null, emptyDoc); + var uniqueId = context.Headers.UniqueId(); + var documentId = FailedMessageIdGenerator.MakeDocumentId(uniqueId); var stream = Memory.Manager.GetStream(context.Body); var putAttachmentCmd = new PutAttachmentCommandData(documentId, "body", stream, contentType, changeVector: null); - parentUnitOfWork.AddCommand(putOwnerDocumentCmd); parentUnitOfWork.AddCommand(putAttachmentCmd); } diff --git a/src/ServiceControl.Persistence.Tests.RavenDb/TestPersistenceImpl.cs b/src/ServiceControl.Persistence.Tests.RavenDb/TestPersistenceImpl.cs index ad71aa9658..18ef6b0983 100644 --- a/src/ServiceControl.Persistence.Tests.RavenDb/TestPersistenceImpl.cs +++ b/src/ServiceControl.Persistence.Tests.RavenDb/TestPersistenceImpl.cs @@ -14,10 +14,9 @@ sealed class TestPersistenceImpl : TestPersistence { - readonly RavenDBPersisterSettings settings = CreateSettings(); IDocumentStore documentStore; - static RavenDBPersisterSettings CreateSettings() + public TestPersistenceImpl() { var retentionPeriod = TimeSpan.FromMinutes(1); @@ -36,12 +35,12 @@ static RavenDBPersisterSettings CreateSettings() settings.ExposeRavenDB = true; } - return settings; + Settings = settings; } public override void Configure(IServiceCollection services) { - var persistence = new RavenDbPersistenceConfiguration().Create(CreateSettings()); + var persistence = new RavenDbPersistenceConfiguration().Create(Settings); PersistenceHostBuilderExtensions.CreatePersisterLifecyle(services, persistence); services.AddHostedService(p => new Wrapper(this, p.GetRequiredService())); } @@ -70,7 +69,8 @@ public override void BlockToInspectDatabase() return; } - var url = $"http://localhost:{settings.DatabaseMaintenancePort}/studio/index.html#databases/documents?&database=%3Csystem%3E"; + var databaseMaintenanceUrl = ((RavenDBPersisterSettings)Settings).DatabaseMaintenanceUrl; + var url = databaseMaintenanceUrl + $"/studio/index.html#databases/documents?&database=%3Csystem%3E"; if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { @@ -99,4 +99,4 @@ public override Task TearDown() return Task.CompletedTask; } } -} \ No newline at end of file +} diff --git a/src/ServiceControl.Persistence.Tests.RavenDb5/BodyStorage/RavenAttachmentsBodyStorageTests.cs b/src/ServiceControl.Persistence.Tests.RavenDb5/BodyStorage/RavenAttachmentsBodyStorageTests.cs index a087ef29ef..0cc85d9a12 100644 --- a/src/ServiceControl.Persistence.Tests.RavenDb5/BodyStorage/RavenAttachmentsBodyStorageTests.cs +++ b/src/ServiceControl.Persistence.Tests.RavenDb5/BodyStorage/RavenAttachmentsBodyStorageTests.cs @@ -1,23 +1,82 @@ namespace ServiceControl.UnitTests.BodyStorage { using System; - using System.IO; + using System.Collections.Generic; + using System.Threading; using System.Threading.Tasks; + using NServiceBus; + using NServiceBus.Transport; using NUnit.Framework; + using ServiceControl.MessageFailures; + using ServiceControl.Operations; + using ServiceControl.Persistence.UnitOfWork; [TestFixture] sealed class RavenAttachmentsBodyStorageTests : PersistenceTestBase { [Test] - public async Task Attachments_with_ids_that_contain_backslash_should_be_readable() + public async Task QueryByUniqueId() { + await RunTest(headers => headers.UniqueId()); + } + + [Test] + public async Task QueryByMessageId() + { + await RunTest(headers => headers.MessageId()); + } + + async Task RunTest(Func, string> getIdToQuery) + { + // Contains a backslash, like an old MSMQ message id, to ensure that message ids like this are usable var messageId = "3f0240a7-9b2e-4e2a-ab39-6114932adad1\\2055783"; var contentType = "NotImportant"; + var endpointName = "EndpointName"; var body = BitConverter.GetBytes(0xDEADBEEF); + var ingestionFactory = GetRequiredService(); + + var headers = new Dictionary + { + [Headers.MessageId] = messageId, + [Headers.ProcessingEndpoint] = endpointName, + [Headers.ContentType] = contentType + }; + + using (var cancellationSource = new CancellationTokenSource()) + using (var uow = await ingestionFactory.StartNew()) + { + var context = new MessageContext(messageId, headers, body, new TransportTransaction(), cancellationSource, new NServiceBus.Extensibility.ContextBag()); + var processingAttempt = new FailedMessage.ProcessingAttempt + { + MessageId = messageId, + MessageMetadata = new Dictionary + { + ["MessageId"] = messageId, + ["TimeSent"] = DateTime.UtcNow, + ["ReceivingEndpoint"] = new EndpointDetails + { + Name = endpointName, + Host = "Host", + HostId = Guid.NewGuid() + } + }, + FailureDetails = new Contracts.Operations.FailureDetails + { + AddressOfFailingEndpoint = endpointName + }, + Headers = headers + }; + var groups = new List(); + + await uow.Recoverability.RecordFailedProcessingAttempt(context, processingAttempt, groups); + await uow.Complete(); + } + + CompleteDatabaseOperation(); - await BodyStorage.Store(messageId, contentType, body.Length, new MemoryStream(body)); + var fetchById = getIdToQuery(headers); - var retrieved = await BodyStorage.TryFetch(messageId); + var retrieved = await BodyStorage.TryFetch(fetchById); Assert.IsNotNull(retrieved); Assert.True(retrieved.HasResult); Assert.AreEqual(contentType, retrieved.ContentType); diff --git a/src/ServiceControl.Persistence.Tests.RavenDb5/TestPersistenceImpl.cs b/src/ServiceControl.Persistence.Tests.RavenDb5/TestPersistenceImpl.cs index 86d9fce004..d7c6113d4f 100644 --- a/src/ServiceControl.Persistence.Tests.RavenDb5/TestPersistenceImpl.cs +++ b/src/ServiceControl.Persistence.Tests.RavenDb5/TestPersistenceImpl.cs @@ -17,7 +17,6 @@ sealed class TestPersistenceImpl : TestPersistence { readonly string databaseName; - readonly RavenDBPersisterSettings settings; IDocumentStore documentStore; public TestPersistenceImpl() @@ -27,7 +26,7 @@ public TestPersistenceImpl() TestContext.Out.WriteLine($"Test Database Name: {databaseName}"); - settings = new RavenDBPersisterSettings + Settings = new RavenDBPersisterSettings { AuditRetentionPeriod = retentionPeriod, ErrorRetentionPeriod = retentionPeriod, @@ -39,7 +38,7 @@ public TestPersistenceImpl() public override void Configure(IServiceCollection services) { - var persistence = new RavenDbPersistenceConfiguration().Create(settings); + var persistence = new RavenDbPersistenceConfiguration().Create(Settings); PersistenceHostBuilderExtensions.CreatePersisterLifecyle(services, persistence); services.AddHostedService(p => new FakeServiceToExtractDocumentStore(this, p.GetRequiredService())); } diff --git a/src/ServiceControl.Persistence.Tests/EnsureSettingsInContainer.cs b/src/ServiceControl.Persistence.Tests/EnsureSettingsInContainer.cs new file mode 100644 index 0000000000..9773b566ea --- /dev/null +++ b/src/ServiceControl.Persistence.Tests/EnsureSettingsInContainer.cs @@ -0,0 +1,22 @@ +namespace ServiceControl.PersistenceTests +{ + using NUnit.Framework; + using ServiceControl.Persistence; + + public sealed class EnsureSettingsInContainer : PersistenceTestBase + { + [Test] + public void CheckForBothTypes() + { + // Persistence implementation must register singleton as base type as some compoennts need to inject that + var baseSettings = GetRequiredService(); + + var actualType = baseSettings.GetType(); + Assert.AreNotEqual(actualType, typeof(PersistenceSettings)); + + // Persistence implementation must also register the same singleton as the persister-specific type + var settingsAsActualType = GetRequiredService(actualType); + Assert.AreSame(baseSettings, settingsAsActualType); + } + } +} \ No newline at end of file diff --git a/src/ServiceControl.Persistence.Tests/PersistenceTestBase.cs b/src/ServiceControl.Persistence.Tests/PersistenceTestBase.cs index cf247993f0..89fc75d67d 100644 --- a/src/ServiceControl.Persistence.Tests/PersistenceTestBase.cs +++ b/src/ServiceControl.Persistence.Tests/PersistenceTestBase.cs @@ -56,7 +56,10 @@ public async Task TearDown() host.Dispose(); } + protected PersistenceSettings PersistenceSettings => testPersistence.Settings; + protected T GetRequiredService() => host.Services.GetRequiredService(); + protected object GetRequiredService(Type serviceType) => host.Services.GetRequiredService(serviceType); protected Action RegisterServices { get; set; } diff --git a/src/ServiceControl.Persistence.Tests/RetryStateTests.cs b/src/ServiceControl.Persistence.Tests/RetryStateTests.cs index 593bb12f37..08e49a6cc5 100644 --- a/src/ServiceControl.Persistence.Tests/RetryStateTests.cs +++ b/src/ServiceControl.Persistence.Tests/RetryStateTests.cs @@ -59,7 +59,7 @@ public async Task When_a_group_is_prepared_with_three_batches_and_SC_is_restarte await CreateAFailedMessageAndMarkAsPartOfRetryBatch(retryManager, "Test-group", true, 2001); var sender = new TestSender(); - var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(BodyStorage, ErrorStore), ErrorStore, domainEvents, "TestEndpoint"), retryManager); + var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(BodyStorage, ErrorStore), ErrorStore, domainEvents, "TestEndpoint"), retryManager, PersistenceSettings); // Needs index RetryBatches_ByStatus_ReduceInitialBatchSize CompleteDatabaseOperation(); @@ -73,7 +73,7 @@ public async Task When_a_group_is_prepared_with_three_batches_and_SC_is_restarte await documentManager.RebuildRetryOperationState(); - processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(BodyStorage, ErrorStore), ErrorStore, domainEvents, "TestEndpoint"), retryManager); + processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(BodyStorage, ErrorStore), ErrorStore, domainEvents, "TestEndpoint"), retryManager, PersistenceSettings); await processor.ProcessBatches(sender); @@ -92,7 +92,7 @@ public async Task When_a_group_is_forwarded_the_status_is_Completed() var sender = new TestSender(); var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(BodyStorage, ErrorStore), ErrorStore, domainEvents, "TestEndpoint"); - var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager); + var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager, PersistenceSettings); await processor.ProcessBatches(sender); // mark ready await processor.ProcessBatches(sender); @@ -122,7 +122,7 @@ public async Task When_there_is_one_poison_message_it_is_removed_from_batch_and_ }; var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(BodyStorage, ErrorStore), ErrorStore, domainEvents, "TestEndpoint"); - var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager); + var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager, PersistenceSettings); bool c; do @@ -159,7 +159,7 @@ public async Task When_a_group_has_one_batch_out_of_two_forwarded_the_status_is_ var sender = new TestSender(); - var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(returnToSender, ErrorStore, domainEvents, "TestEndpoint"), retryManager); + var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(returnToSender, ErrorStore, domainEvents, "TestEndpoint"), retryManager, PersistenceSettings); CompleteDatabaseOperation(); diff --git a/src/ServiceControl.Persistence.Tests/TestPersistence.cs b/src/ServiceControl.Persistence.Tests/TestPersistence.cs index 85208e3799..71f18f291c 100644 --- a/src/ServiceControl.Persistence.Tests/TestPersistence.cs +++ b/src/ServiceControl.Persistence.Tests/TestPersistence.cs @@ -3,9 +3,12 @@ using System.Diagnostics; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; + using ServiceControl.Persistence; abstract class TestPersistence { + public PersistenceSettings Settings { get; protected set; } + public abstract void Configure(IServiceCollection services); public abstract void CompleteDatabaseOperation(); diff --git a/src/ServiceControl.Persistence/PersistenceSettings.cs b/src/ServiceControl.Persistence/PersistenceSettings.cs index 352046ec62..ab621c7a5c 100644 --- a/src/ServiceControl.Persistence/PersistenceSettings.cs +++ b/src/ServiceControl.Persistence/PersistenceSettings.cs @@ -15,5 +15,6 @@ public abstract class PersistenceSettings public TimeSpan? OverrideCustomCheckRepeatTime { get; set; } + public abstract bool MessageBodiesAlwaysStoredInFailedMessage { get; } } } \ No newline at end of file diff --git a/src/ServiceControl/Recoverability/Retrying/RetryProcessor.cs b/src/ServiceControl/Recoverability/Retrying/RetryProcessor.cs index 84a7fbe6df..b15f3bee08 100644 --- a/src/ServiceControl/Recoverability/Retrying/RetryProcessor.cs +++ b/src/ServiceControl/Recoverability/Retrying/RetryProcessor.cs @@ -17,13 +17,14 @@ namespace ServiceControl.Recoverability class RetryProcessor { - public RetryProcessor(IRetryBatchesDataStore store, IDomainEvents domainEvents, ReturnToSenderDequeuer returnToSender, RetryingManager retryingManager) + public RetryProcessor(IRetryBatchesDataStore store, IDomainEvents domainEvents, ReturnToSenderDequeuer returnToSender, RetryingManager retryingManager, PersistenceSettings settings) { this.store = store; this.returnToSender = returnToSender; this.retryingManager = retryingManager; this.domainEvents = domainEvents; corruptedReplyToHeaderStrategy = new CorruptedReplyToHeaderStrategy(RuntimeEnvironment.MachineName); + messageBodiesAlwaysStoredInFailedMessage = settings.MessageBodiesAlwaysStoredInFailedMessage; } Task Enqueue(IDispatchMessages sender, TransportOperations outgoingMessages) @@ -332,7 +333,7 @@ TransportOperation ToTransportOperation(FailedMessage message, string stagingId) headersToRetryWith["ServiceControl.Retry.UniqueMessageId"] = message.UniqueMessageId; headersToRetryWith["ServiceControl.Retry.StagingId"] = stagingId; headersToRetryWith["ServiceControl.Retry.Attempt.MessageId"] = attempt.MessageId; - if (attempt.MessageMetadata.ContainsKey("Body") || attempt.Body != null) + if (messageBodiesAlwaysStoredInFailedMessage || attempt.MessageMetadata.ContainsKey("Body") || attempt.Body != null) { headersToRetryWith["ServiceControl.Retry.BodyOnFailedMessage"] = null; } @@ -343,10 +344,12 @@ TransportOperation ToTransportOperation(FailedMessage message, string stagingId) return new TransportOperation(transportMessage, new UnicastAddressTag(returnToSender.InputAddress)); } - IDomainEvents domainEvents; - IRetryBatchesDataStore store; - ReturnToSenderDequeuer returnToSender; - RetryingManager retryingManager; + readonly IDomainEvents domainEvents; + readonly IRetryBatchesDataStore store; + readonly ReturnToSenderDequeuer returnToSender; + readonly RetryingManager retryingManager; + readonly bool messageBodiesAlwaysStoredInFailedMessage; + MessageRedirectsCollection redirects; bool isRecoveringFromPrematureShutdown = true; CorruptedReplyToHeaderStrategy corruptedReplyToHeaderStrategy;