Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/ServiceControl.Persistence.RavenDb/RetryBatchesManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,11 @@ public async Task<MessageRedirectsCollection> GetOrCreateMessageRedirectsCollect

return new MessageRedirectsCollection();
}

public Task CancelExpiration(FailedMessage failedMessage)
{
// Using expiration query/delete logic for RavenDB 3.5 implementation
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@
using ServiceControl.MessageFailures;
using ServiceControl.Persistence.Recoverability.Editing;
using Raven.Client.Documents.Session;
using RavenDb5;

class EditFailedMessageManager : AbstractSessionManager, IEditFailedMessagesManager
{
readonly IAsyncDocumentSession session;
readonly ExpirationManager expirationManager;
FailedMessage failedMessage;

public EditFailedMessageManager(IAsyncDocumentSession session)
public EditFailedMessageManager(IAsyncDocumentSession session, ExpirationManager expirationManager)
: base(session)
{
this.session = session;
this.expirationManager = expirationManager;
}

public async Task<FailedMessage> GetFailedMessage(string failedMessageId)
Expand Down Expand Up @@ -47,6 +50,9 @@ public Task SetFailedMessageAsResolved()
{
// Instance is tracked by the document session
failedMessage.Status = FailedMessageStatus.Resolved;

expirationManager.EnableExpiration(session, failedMessage);

return Task.CompletedTask;
}
}
Expand Down
21 changes: 14 additions & 7 deletions src/ServiceControl.Persistence.RavenDb5/ErrorMessagesDataStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Raven.Client.Documents.Queries;
using Raven.Client.Documents.Queries.Facets;
using Raven.Client.Documents.Session;
using RavenDb5;
using ServiceControl.CompositeViews.Messages;
using ServiceControl.EventLog;
using ServiceControl.MessageFailures;
Expand All @@ -28,13 +29,13 @@ class ErrorMessagesDataStore : IErrorMessageDataStore
{
readonly IDocumentStore documentStore;
readonly IBodyStorage bodyStorage;
readonly TimeSpan eventsRetentionPeriod;
readonly ExpirationManager expirationManager;

public ErrorMessagesDataStore(IDocumentStore documentStore, IBodyStorage bodyStorage, RavenDBPersisterSettings settings)
public ErrorMessagesDataStore(IDocumentStore documentStore, IBodyStorage bodyStorage, ExpirationManager expirationManager)
{
this.documentStore = documentStore;
this.bodyStorage = bodyStorage;
eventsRetentionPeriod = settings.EventsRetentionPeriod;
this.expirationManager = expirationManager;
}

public async Task<QueryResult<IList<MessagesView>>> GetAllMessages(
Expand Down Expand Up @@ -171,6 +172,8 @@ public async Task FailedMessageMarkAsArchived(string failedMessageId)
if (failedMessage.Status != FailedMessageStatus.Archived)
{
failedMessage.Status = FailedMessageStatus.Archived;

expirationManager.EnableExpiration(session, failedMessage);
}

await session.SaveChangesAsync();
Expand Down Expand Up @@ -200,7 +203,7 @@ public async Task StoreFailedErrorImport(FailedErrorImport failure)
public Task<IEditFailedMessagesManager> CreateEditFailedMessageManager()
{
var session = documentStore.OpenAsyncSession();
var manager = new EditFailedMessageManager(session);
var manager = new EditFailedMessageManager(session, expirationManager);
return Task.FromResult((IEditFailedMessagesManager)manager);
}

Expand Down Expand Up @@ -539,6 +542,8 @@ public async Task<bool> MarkMessageAsResolved(string failedMessageId)

failedMessage.Status = FailedMessageStatus.Resolved;

expirationManager.EnableExpiration(session, failedMessage);

await session.SaveChangesAsync();

return true;
Expand Down Expand Up @@ -586,12 +591,15 @@ class DocumentPatchResult
// TODO: Make sure this new implementation actually works, not going to delete the old implementation (commented below) until then
var patch = new PatchByQueryOperation(new IndexQuery
{
// https://ravendb.net/docs/article-page/5.4/Csharp/client-api/operations/patching/single-document#remove-property

Query = $@"from index '{new FailedMessageViewIndex().IndexName} as msg
where msg.LastModified >= args.From and msg.LastModified <= args.To
where msg.Status == args.Archived
update
{{
msg.Status = args.Unresolved
{ExpirationManager.DeleteExpirationFieldScript}
}}",
QueryParameters =
{
Expand Down Expand Up @@ -668,6 +676,7 @@ class DocumentPatchResult
if (failedMessage.Status == FailedMessageStatus.Archived)
{
failedMessage.Status = FailedMessageStatus.Unresolved;
session.Advanced.GetMetadataFor(failedMessage).Remove(Constants.Documents.Metadata.Expires);
}
}

Expand Down Expand Up @@ -769,13 +778,11 @@ public async Task<byte[]> FetchFromFailedMessage(string uniqueMessageId)

public async Task StoreEventLogItem(EventLogItem logItem)
{
var expiration = DateTime.UtcNow + eventsRetentionPeriod;

using (var session = documentStore.OpenAsyncSession())
{
await session.StoreAsync(logItem);

session.Advanced.GetMetadataFor(logItem)[Constants.Documents.Metadata.Expires] = expiration;
expirationManager.EnableExpiration(session, logItem);

await session.SaveChangesAsync();
}
Expand Down
52 changes: 52 additions & 0 deletions src/ServiceControl.Persistence.RavenDb5/ExpirationManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
namespace ServiceControl.Persistence.RavenDb5
{
using System;
using EventLog;
using Raven.Client;
using Raven.Client.Documents.Operations;
using Raven.Client.Documents.Session;
using FailedMessage = MessageFailures.FailedMessage;

class ExpirationManager
{
public const string DeleteExpirationFieldScript = "; delete msg['@metadata']['@expires']";

readonly TimeSpan errorRetentionPeriod;
readonly TimeSpan eventsRetentionPeriod;

public ExpirationManager(RavenDBPersisterSettings settings)
{
errorRetentionPeriod = settings.ErrorRetentionPeriod;
eventsRetentionPeriod = settings.EventsRetentionPeriod;
}

public void CancelExpiration(IAsyncDocumentSession session, FailedMessage failedMessage)
{
session.Advanced.GetMetadataFor(failedMessage).Remove(Constants.Documents.Metadata.Expires);
}

public void EnableExpiration(IAsyncDocumentSession session, FailedMessage failedMessage)
{
var expiresAt = DateTime.UtcNow + errorRetentionPeriod;

session.Advanced.GetMetadataFor(failedMessage)[Constants.Documents.Metadata.Expires] = expiresAt;
}

public void EnableExpiration(IAsyncDocumentSession session, EventLogItem eventLogItem)
{
var expiresAt = DateTime.UtcNow + eventsRetentionPeriod;

session.Advanced.GetMetadataFor(eventLogItem)[Constants.Documents.Metadata.Expires] = expiresAt;
}

public void EnableExpiration(PatchRequest request)
{
var expiredAt = DateTime.UtcNow + errorRetentionPeriod;

request.Script += "\nthis['@metadata']['@expires'] = args.Expires;";
request.Values.Add("Expires", expiredAt);
}

public void CancelExpiration(PatchRequest request) => request.Script += "delete this['@metadata']['@expires']";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public void Configure(IServiceCollection serviceCollection)
serviceCollection.AddSingleton<IMonitoringDataStore, RavenDbMonitoringDataStore>();
serviceCollection.AddSingleton<ICustomChecksDataStore, RavenDbCustomCheckDataStore>();
serviceCollection.AddUnitOfWorkFactory<RavenDbIngestionUnitOfWorkFactory>();
serviceCollection.AddSingleton<ExpirationManager>();
serviceCollection.AddSingleton<MinimumRequiredStorageState>();
serviceCollection.AddSingleton<IBodyStorage, RavenAttachmentsBodyStorage>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,21 @@
using System.Threading.Tasks;
using MessageFailures;
using NServiceBus.Logging;
using Persistence.RavenDb5;
using Raven.Client.Documents;
using Raven.Client.Documents.Commands.Batches;
using Raven.Client.Documents.Operations;
using Raven.Client.Documents.Session;

class ArchiveDocumentManager
{
readonly ExpirationManager expirationManager;

public ArchiveDocumentManager(ExpirationManager expirationManager)
{
this.expirationManager = expirationManager;
}

public Task<ArchiveOperation> LoadArchiveOperation(IAsyncDocumentSession session, string groupId, ArchiveType archiveType)
{
return session.LoadAsync<ArchiveOperation>(ArchiveOperation.MakeId(groupId, archiveType));
Expand Down Expand Up @@ -95,6 +103,17 @@ public async Task<GroupDetails> GetGroupDetails(IAsyncDocumentSession session, s

public void ArchiveMessageGroupBatch(IAsyncDocumentSession session, ArchiveBatch batch)
{
var patchRequest = new PatchRequest
{
Script = "this.Status = args.Status;",
Values =
{
{ "Status", (int)FailedMessageStatus.Archived }
}
};

expirationManager.EnableExpiration(patchRequest);

var patchCommands = batch?.DocumentIds.Select(documentId => new PatchCommandData(documentId, null, patchRequest));

if (patchCommands != null)
Expand Down Expand Up @@ -144,8 +163,6 @@ public async Task RemoveArchiveOperation(IDocumentStore store, ArchiveOperation
}
}

static PatchRequest patchRequest = new PatchRequest { Script = @$"this.Status = {(int)FailedMessageStatus.Archived}" };

public class GroupDetails
{
public string GroupName { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,26 @@
using System.Threading.Tasks;
using NServiceBus.Logging;
using Raven.Client.Documents;
using RavenDb5;
using ServiceControl.Infrastructure.DomainEvents;
using ServiceControl.Persistence.Recoverability;
using ServiceControl.Recoverability;

class MessageArchiver : IArchiveMessages
{
public MessageArchiver(IDocumentStore store, OperationsManager operationsManager, IDomainEvents domainEvents)
public MessageArchiver(
IDocumentStore store,
OperationsManager operationsManager,
IDomainEvents domainEvents,
ExpirationManager expirationManager
)
{
this.store = store;
this.domainEvents = domainEvents;
this.expirationManager = expirationManager;
this.operationsManager = operationsManager;

archiveDocumentManager = new ArchiveDocumentManager();
archiveDocumentManager = new ArchiveDocumentManager(expirationManager);
archivingManager = new ArchivingManager(domainEvents, operationsManager);

unarchiveDocumentManager = new UnarchiveDocumentManager();
Expand Down Expand Up @@ -163,7 +170,7 @@ public async Task UnarchiveAllInGroup(string groupId)
logger.Info($"Unarchiving {nextBatch.DocumentIds.Count} messages from group {groupId} starting");
}

unarchiveDocumentManager.UnarchiveMessageGroupBatch(batchSession, nextBatch);
unarchiveDocumentManager.UnarchiveMessageGroupBatch(batchSession, nextBatch, expirationManager);

await unarchivingManager.BatchUnarchived(unarchiveOperation.RequestId, unarchiveOperation.ArchiveType, nextBatch?.DocumentIds.Count ?? 0);

Expand Down Expand Up @@ -232,6 +239,7 @@ public IEnumerable<InMemoryArchive> GetArchivalOperations()
readonly IDocumentStore store;
readonly OperationsManager operationsManager;
readonly IDomainEvents domainEvents;
readonly ExpirationManager expirationManager;
readonly ArchiveDocumentManager archiveDocumentManager;
readonly ArchivingManager archivingManager;
readonly UnarchiveDocumentManager unarchiveDocumentManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Linq;
using System.Threading.Tasks;
using MessageFailures;
using Persistence.RavenDb5;
using Raven.Client.Documents;
using Raven.Client.Documents.Commands.Batches;
using Raven.Client.Documents.Operations;
Expand Down Expand Up @@ -92,8 +93,20 @@ public async Task<GroupDetails> GetGroupDetails(IAsyncDocumentSession session, s
};
}

public void UnarchiveMessageGroupBatch(IAsyncDocumentSession session, UnarchiveBatch batch)
public void UnarchiveMessageGroupBatch(IAsyncDocumentSession session, UnarchiveBatch batch, ExpirationManager expirationManager)
{
// https://ravendb.net/docs/article-page/5.4/Csharp/client-api/operations/patching/single-document#remove-property
var patchRequest = new PatchRequest
{
Script = @"this.Status = args.Status;",
Values =
{
{ "Status", (int)FailedMessageStatus.Unresolved }
}
};

expirationManager.CancelExpiration(patchRequest);

var patchCommands = batch?.DocumentIds.Select(documentId => new PatchCommandData(documentId, null, patchRequest));

if (patchCommands != null)
Expand Down Expand Up @@ -139,8 +152,6 @@ public async Task RemoveUnarchiveOperation(IDocumentStore store, UnarchiveOperat
await session.SaveChangesAsync();
}

static PatchRequest patchRequest = new PatchRequest { Script = $@"this.Status = {(int)FailedMessageStatus.Unresolved}" };

public class GroupDetails
{
public string GroupName { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,26 @@
using Raven.Client.Documents.Commands.Batches;
using Raven.Client.Documents.Operations;
using Raven.Client.Exceptions;
using RavenDb5;
using ServiceControl.Recoverability;

class RetryBatchesDataStore : IRetryBatchesDataStore
{
readonly IDocumentStore documentStore;
readonly ExpirationManager expirationManager;

static readonly ILog Log = LogManager.GetLogger(typeof(RetryBatchesDataStore));

public RetryBatchesDataStore(IDocumentStore documentStore)
public RetryBatchesDataStore(IDocumentStore documentStore, ExpirationManager expirationManager)
{
this.documentStore = documentStore;
this.expirationManager = expirationManager;
}

public Task<IRetryBatchesManager> CreateRetryBatchesManager()
{
var session = documentStore.OpenAsyncSession();
var manager = new RetryBatchesManager(session);
var manager = new RetryBatchesManager(session, expirationManager);

return Task.FromResult<IRetryBatchesManager>(manager);
}
Expand Down
12 changes: 11 additions & 1 deletion src/ServiceControl.Persistence.RavenDb5/RetryBatchesManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@
using Persistence.MessageRedirects;
using Raven.Client.Documents;
using Raven.Client.Documents.Session;
using RavenDb5;
using ServiceControl.Recoverability;

class RetryBatchesManager : AbstractSessionManager, IRetryBatchesManager
{
public RetryBatchesManager(IAsyncDocumentSession session) : base(session)
readonly ExpirationManager expirationManager;

public RetryBatchesManager(IAsyncDocumentSession session, ExpirationManager expirationManager) : base(session)
{
this.expirationManager = expirationManager;
}

public void Delete(RetryBatch retryBatch) => Session.Delete(retryBatch);
Expand Down Expand Up @@ -64,5 +68,11 @@ public async Task<MessageRedirectsCollection> GetOrCreateMessageRedirectsCollect

return new MessageRedirectsCollection();
}

public Task CancelExpiration(FailedMessage failedMessage)
{
expirationManager.CancelExpiration(Session, failedMessage);
return Task.CompletedTask;
}
}
}
Loading