Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@ class RavenDBPersisterSettings : PersistenceSettings
public TimeSpan? AuditRetentionPeriod { get; set; }
public int ExternalIntegrationsDispatchingBatchSize { get; set; } = 100;

public override bool MessageBodiesAlwaysStoredInFailedMessage => false;

public const int DatabaseMaintenancePortDefault = 33334;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public void Configure(IServiceCollection serviceCollection)
return;
}

serviceCollection.AddSingleton<PersistenceSettings>(settings);
serviceCollection.AddSingleton(settings);
serviceCollection.AddSingleton<IDocumentStore>(documentStore);

Expand Down

This file was deleted.

36 changes: 32 additions & 4 deletions src/ServiceControl.Persistence.RavenDb5/ErrorMessagesDataStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Editing;
Expand All @@ -19,17 +20,20 @@
using ServiceControl.MessageFailures;
using ServiceControl.MessageFailures.Api;
using ServiceControl.Operations;
using ServiceControl.Operations.BodyStorage;
using ServiceControl.Persistence.Infrastructure;
using ServiceControl.Recoverability;

class ErrorMessagesDataStore : IErrorMessageDataStore
{
readonly IDocumentStore documentStore;
readonly IBodyStorage bodyStorage;
readonly TimeSpan eventsRetentionPeriod;

public ErrorMessagesDataStore(IDocumentStore documentStore, RavenDBPersisterSettings settings)
public ErrorMessagesDataStore(IDocumentStore documentStore, IBodyStorage bodyStorage, RavenDBPersisterSettings settings)
{
this.documentStore = documentStore;
this.bodyStorage = bodyStorage;
eventsRetentionPeriod = settings.EventsRetentionPeriod;
}

Expand Down Expand Up @@ -730,13 +734,37 @@ public async Task<string[]> GetRetryPendingMessages(DateTime from, DateTime to,
}
}
}

return ids.ToArray();
}

public Task<byte[]> FetchFromFailedMessage(string uniqueMessageId)
public async Task<byte[]> FetchFromFailedMessage(string uniqueMessageId)
{
throw new NotSupportedException("Body not stored embedded");
byte[] body = null;
var result = await bodyStorage.TryFetch(uniqueMessageId);

if (result == null)
{
throw new InvalidOperationException("IBodyStorage.TryFetch result cannot be null");
}

if (result.HasResult)
{
using (result.Stream) // Not strictly required for MemoryStream but might be different behavior in future .NET versions
{
// Unfortunately we can't use the buffer manager here yet because core doesn't allow to set the length property so usage of GetBuffer is not possible
// furthermore call ToArray would neglect many of the benefits of the recyclable stream
// RavenDB always returns a memory stream in ver. 3.5 so there is no need to pretend we need to do buffered reads since the memory is anyway fully allocated already
// this assumption might change when we stop supporting RavenDB 3.5 but right now this is the most memory efficient way to do things
// https://github.com/microsoft/Microsoft.IO.RecyclableMemoryStream#getbuffer-and-toarray
using (var memoryStream = new MemoryStream())
{
await result.Stream.CopyToAsync(memoryStream);

body = memoryStream.ToArray();
}
}
}
return body;
}

public async Task StoreEventLogItem(EventLogItem logItem)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,43 +1,61 @@
namespace ServiceControl.Operations.BodyStorage.RavenAttachments
{
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Raven.Client.Documents;
using Raven.Client.Documents.Commands.Batches;
using Sparrow.Json.Parsing;
using Raven.Client.Documents.Session;
using ServiceControl.MessageFailures;
using ServiceControl.MessageFailures.Api;

class RavenAttachmentsBodyStorage : IBodyStorage
{
const string AttachmentName = "body";
public const string AttachmentName = "body";
readonly IDocumentStore documentStore;

public RavenAttachmentsBodyStorage(IDocumentStore documentStore)
{
this.documentStore = documentStore;
}

// TODO: This method is only used in tests and not by ServiceControl itself! But in the Raven3.5 persister, it IS used!
// It should probably be removed and tests should use the RavenDbRecoverabilityIngestionUnitOfWork
public async Task Store(string messageId, string contentType, int bodySize, Stream bodyStream)
public Task Store(string uniqueId, string contentType, int bodySize, Stream bodyStream)
=> throw new NotImplementedException("Only included for interface compatibility with Raven3.5 persister implementation. Raven5 tests should use IIngestionUnitOfWorkFactory to store failed messages/bodies.");

public async Task<MessageBodyStreamResult> TryFetch(string bodyId)
{
var documentId = MessageBodyIdGenerator.MakeDocumentId(messageId);
using var session = documentStore.OpenAsyncSession();

// BodyId could be a MessageID or a UniqueID, but if a UniqueID then it will be a DeterministicGuid of MessageID and endpoint name and be Guid-parseable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can it be a message ID or unique ID?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because in my redesign, we're always storing it using the unique ID, and the message metadata will say "look at messages/{uniqueId}/body for the body" so in theory, it should always be the unique ID. But previous versions stored the body only by message ID and while in theory, Pulse and Insight should pay attention to the message metadata that points to the uniqueID-based URL, it's impossible for me to say that assumption hasn't leaked into one of those apps. So just following what the API looks like, it's entirely reasonable some client somewhere could try to access by message ID.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we test SI + SP and they use the BodyUrl we could simplify this and only use Unique ID?

// This is preferred, then we know we're getting the correct message body that is attached to the FailedMessage document
if (Guid.TryParse(bodyId, out _))
{
var result = await ResultForUniqueId(session, bodyId);
if (result != null)
{
return result;
}
}

var emptyDoc = new DynamicJsonValue();
var putOwnerDocumentCmd = new PutCommandData(documentId, null, emptyDoc);
// See if we can look up a FailedMessage by MessageID
var query = session.Query<FailedMessageViewIndex.SortAndFilterOptions, FailedMessageViewIndex>()
.Where(msg => msg.MessageId == bodyId, true)
.OfType<FailedMessage>()
.Select(msg => msg.UniqueMessageId);

var stream = bodyStream;
var putAttachmentCmd = new PutAttachmentCommandData(documentId, "body", stream, contentType, changeVector: null);
var uniqueId = await query.FirstOrDefaultAsync();

using var session = documentStore.OpenAsyncSession();
session.Advanced.Defer(new ICommandData[] { putOwnerDocumentCmd, putAttachmentCmd });
await session.SaveChangesAsync();
if (uniqueId != null)
{
return await ResultForUniqueId(session, uniqueId);
}

return null;
}

public async Task<MessageBodyStreamResult> TryFetch(string messageId)
async Task<MessageBodyStreamResult> ResultForUniqueId(IAsyncDocumentSession session, string uniqueId)
{
var documentId = MessageBodyIdGenerator.MakeDocumentId(messageId);

using var session = documentStore.OpenAsyncSession();
var documentId = FailedMessageIdGenerator.MakeDocumentId(uniqueId);

var result = await session.Advanced.Attachments.GetAsync(documentId, AttachmentName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ class RavenDBPersisterSettings : PersistenceSettings
public const int DatabaseMaintenancePortDefault = 33334;
public const int ExpirationProcessTimerInSecondsDefault = 600;
public const string LogsModeDefault = "Information";

public override bool MessageBodiesAlwaysStoredInFailedMessage => true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public void Configure(IServiceCollection serviceCollection)
return;
}

serviceCollection.AddSingleton<PersistenceSettings>(settings);
serviceCollection.AddSingleton(settings);

serviceCollection.AddSingleton<IServiceControlSubscriptionStorage, RavenDbSubscriptionStorage>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
using ServiceControl.Persistence.Infrastructure;
using ServiceControl.Persistence.UnitOfWork;
using ServiceControl.Recoverability;
using Sparrow.Json.Parsing;

class RavenDbRecoverabilityIngestionUnitOfWork : IRecoverabilityIngestionUnitOfWork
{
Expand All @@ -32,13 +31,12 @@ public Task RecordFailedProcessingAttempt(
List<FailedMessage.FailureGroup> groups)
{
var uniqueMessageId = context.Headers.UniqueId();
var bodyId = processingAttempt.Headers.MessageId();
var contentType = GetContentType(context.Headers, "text/xml");
var bodySize = context.Body?.Length ?? 0;

processingAttempt.MessageMetadata.Add("ContentType", contentType);
processingAttempt.MessageMetadata.Add("ContentLength", bodySize);
processingAttempt.MessageMetadata.Add("BodyUrl", $"/messages/{bodyId}/body");
processingAttempt.MessageMetadata.Add("BodyUrl", $"/messages/{uniqueMessageId}/body");

if (doFullTextIndexing)
{
Expand Down Expand Up @@ -143,22 +141,12 @@ ICommandData CreateFailedMessagesPatchCommand(string uniqueMessageId, FailedMess

void AddStoreBodyCommands(MessageContext context, string contentType)
{
var messageId = context.Headers.MessageId();
var documentId = MessageBodyIdGenerator.MakeDocumentId(messageId);

var emptyDoc = new DynamicJsonValue
{
["@metadata"] = new DynamicJsonValue
{
["@collection"] = "MessageBodies"
}
};
var putOwnerDocumentCmd = new PutCommandData(documentId, null, emptyDoc);
var uniqueId = context.Headers.UniqueId();
var documentId = FailedMessageIdGenerator.MakeDocumentId(uniqueId);

var stream = Memory.Manager.GetStream(context.Body);
var putAttachmentCmd = new PutAttachmentCommandData(documentId, "body", stream, contentType, changeVector: null);

parentUnitOfWork.AddCommand(putOwnerDocumentCmd);
parentUnitOfWork.AddCommand(putAttachmentCmd);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@

sealed class TestPersistenceImpl : TestPersistence
{
readonly RavenDBPersisterSettings settings = CreateSettings();
IDocumentStore documentStore;

static RavenDBPersisterSettings CreateSettings()
public TestPersistenceImpl()
{
var retentionPeriod = TimeSpan.FromMinutes(1);

Expand All @@ -36,12 +35,12 @@ static RavenDBPersisterSettings CreateSettings()
settings.ExposeRavenDB = true;
}

return settings;
Settings = settings;
}

public override void Configure(IServiceCollection services)
{
var persistence = new RavenDbPersistenceConfiguration().Create(CreateSettings());
var persistence = new RavenDbPersistenceConfiguration().Create(Settings);
PersistenceHostBuilderExtensions.CreatePersisterLifecyle(services, persistence);
services.AddHostedService(p => new Wrapper(this, p.GetRequiredService<IDocumentStore>()));
}
Expand Down Expand Up @@ -70,7 +69,8 @@ public override void BlockToInspectDatabase()
return;
}

var url = $"http://localhost:{settings.DatabaseMaintenancePort}/studio/index.html#databases/documents?&database=%3Csystem%3E";
var databaseMaintenanceUrl = ((RavenDBPersisterSettings)Settings).DatabaseMaintenanceUrl;
var url = databaseMaintenanceUrl + $"/studio/index.html#databases/documents?&database=%3Csystem%3E";

if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
Expand Down Expand Up @@ -99,4 +99,4 @@ public override Task TearDown()
return Task.CompletedTask;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,82 @@
namespace ServiceControl.UnitTests.BodyStorage
{
using System;
using System.IO;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Transport;
using NUnit.Framework;
using ServiceControl.MessageFailures;
using ServiceControl.Operations;
using ServiceControl.Persistence.UnitOfWork;

[TestFixture]
sealed class RavenAttachmentsBodyStorageTests : PersistenceTestBase
{
[Test]
public async Task Attachments_with_ids_that_contain_backslash_should_be_readable()
public async Task QueryByUniqueId()
{
await RunTest(headers => headers.UniqueId());
}

[Test]
public async Task QueryByMessageId()
{
await RunTest(headers => headers.MessageId());
}

async Task RunTest(Func<Dictionary<string, string>, string> getIdToQuery)
{
// Contains a backslash, like an old MSMQ message id, to ensure that message ids like this are usable
var messageId = "3f0240a7-9b2e-4e2a-ab39-6114932adad1\\2055783";
var contentType = "NotImportant";
var endpointName = "EndpointName";
var body = BitConverter.GetBytes(0xDEADBEEF);
var ingestionFactory = GetRequiredService<IIngestionUnitOfWorkFactory>();

var headers = new Dictionary<string, string>
{
[Headers.MessageId] = messageId,
[Headers.ProcessingEndpoint] = endpointName,
[Headers.ContentType] = contentType
};

using (var cancellationSource = new CancellationTokenSource())
using (var uow = await ingestionFactory.StartNew())
{
var context = new MessageContext(messageId, headers, body, new TransportTransaction(), cancellationSource, new NServiceBus.Extensibility.ContextBag());
var processingAttempt = new FailedMessage.ProcessingAttempt
{
MessageId = messageId,
MessageMetadata = new Dictionary<string, object>
{
["MessageId"] = messageId,
["TimeSent"] = DateTime.UtcNow,
["ReceivingEndpoint"] = new EndpointDetails
{
Name = endpointName,
Host = "Host",
HostId = Guid.NewGuid()
}
},
FailureDetails = new Contracts.Operations.FailureDetails
{
AddressOfFailingEndpoint = endpointName
},
Headers = headers
};
var groups = new List<FailedMessage.FailureGroup>();

await uow.Recoverability.RecordFailedProcessingAttempt(context, processingAttempt, groups);
await uow.Complete();
}

CompleteDatabaseOperation();

await BodyStorage.Store(messageId, contentType, body.Length, new MemoryStream(body));
var fetchById = getIdToQuery(headers);

var retrieved = await BodyStorage.TryFetch(messageId);
var retrieved = await BodyStorage.TryFetch(fetchById);
Assert.IsNotNull(retrieved);
Assert.True(retrieved.HasResult);
Assert.AreEqual(contentType, retrieved.ContentType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ sealed class TestPersistenceImpl : TestPersistence
{
readonly string databaseName;

readonly RavenDBPersisterSettings settings;
IDocumentStore documentStore;

public TestPersistenceImpl()
Expand All @@ -27,7 +26,7 @@ public TestPersistenceImpl()

TestContext.Out.WriteLine($"Test Database Name: {databaseName}");

settings = new RavenDBPersisterSettings
Settings = new RavenDBPersisterSettings
{
AuditRetentionPeriod = retentionPeriod,
ErrorRetentionPeriod = retentionPeriod,
Expand All @@ -39,7 +38,7 @@ public TestPersistenceImpl()

public override void Configure(IServiceCollection services)
{
var persistence = new RavenDbPersistenceConfiguration().Create(settings);
var persistence = new RavenDbPersistenceConfiguration().Create(Settings);
PersistenceHostBuilderExtensions.CreatePersisterLifecyle(services, persistence);
services.AddHostedService(p => new FakeServiceToExtractDocumentStore(this, p.GetRequiredService<IDocumentStore>()));
}
Expand Down
Loading