diff --git a/src/Persisters.Primary.Includes.props b/src/Persisters.Primary.Includes.props
index 8b2eb0241d..ddc16fb034 100644
--- a/src/Persisters.Primary.Includes.props
+++ b/src/Persisters.Primary.Includes.props
@@ -1,5 +1,6 @@
+
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/Persistence/PersistenceHostBuilderExtensions.cs b/src/ServiceControl.Audit/Persistence/PersistenceHostBuilderExtensions.cs
index ca546ea612..a9e468fcac 100644
--- a/src/ServiceControl.Audit/Persistence/PersistenceHostBuilderExtensions.cs
+++ b/src/ServiceControl.Audit/Persistence/PersistenceHostBuilderExtensions.cs
@@ -15,7 +15,8 @@ public static IHostBuilder SetupPersistence(this IHostBuilder hostBuilder,
{
var lifecycle = persistence.Configure(serviceCollection);
- serviceCollection.AddHostedService(_ => new PersistenceLifecycleHostedService(lifecycle));
+ serviceCollection.AddSingleton(new PersistenceLifecycleHostedService(lifecycle));
+ serviceCollection.AddHostedService(sp => sp.GetRequiredService());
});
return hostBuilder;
diff --git a/src/ServiceControl.Persistence.RavenDb/CustomChecks/CheckRavenDBIndexErrors.cs b/src/ServiceControl.Persistence.RavenDb/CustomChecks/CheckRavenDBIndexErrors.cs
index 27f038687d..c076725f57 100644
--- a/src/ServiceControl.Persistence.RavenDb/CustomChecks/CheckRavenDBIndexErrors.cs
+++ b/src/ServiceControl.Persistence.RavenDb/CustomChecks/CheckRavenDBIndexErrors.cs
@@ -41,8 +41,8 @@ public override Task PerformCheck()
return CheckResult.Failed(message);
}
- static ILog Logger = LogManager.GetLogger();
+ static readonly ILog Logger = LogManager.GetLogger();
- IDocumentStore store;
+ readonly IDocumentStore store;
}
}
diff --git a/src/ServiceControl.Persistence.RavenDb/ErrorMessagesDataStore.cs b/src/ServiceControl.Persistence.RavenDb/ErrorMessagesDataStore.cs
index 2de540abd0..261413ed30 100644
--- a/src/ServiceControl.Persistence.RavenDb/ErrorMessagesDataStore.cs
+++ b/src/ServiceControl.Persistence.RavenDb/ErrorMessagesDataStore.cs
@@ -136,28 +136,6 @@ SortInfo sortInfo
}
}
- public async Task>> GetAllMessagesForEndpoint(
- string searchTerms,
- string receivingEndpointName,
- PagingInfo pagingInfo,
- SortInfo sortInfo
- )
- {
- using (var session = documentStore.OpenAsyncSession())
- {
- var results = await session.Query()
- .Statistics(out var stats)
- .Search(x => x.Query, searchTerms)
- .Where(m => m.ReceivingEndpointName == receivingEndpointName)
- .Sort(sortInfo)
- .Paging(pagingInfo)
- .TransformWith()
- .ToListAsync();
-
- return new QueryResult>(results, stats.ToQueryStatsInfo());
- }
- }
-
public async Task FailedMessageFetch(string failedMessageId)
{
using (var session = documentStore.OpenAsyncSession())
diff --git a/src/ServiceControl.Persistence.RavenDb/EventLogDataStore.cs b/src/ServiceControl.Persistence.RavenDb/EventLogDataStore.cs
index 6fbafb64f6..6299ffda67 100644
--- a/src/ServiceControl.Persistence.RavenDb/EventLogDataStore.cs
+++ b/src/ServiceControl.Persistence.RavenDb/EventLogDataStore.cs
@@ -29,7 +29,9 @@ public async Task Add(EventLogItem logItem)
{
using (var session = documentStore.OpenAsyncSession())
{
- var results = await session.Query().Statistics(out var stats)
+ var results = await session
+ .Query()
+ .Statistics(out var stats)
.OrderByDescending(p => p.RaisedAt)
.Paging(pagingInfo)
.ToListAsync();
diff --git a/src/ServiceControl.Persistence.RavenDb/RavenDbCustomCheckDataStore.cs b/src/ServiceControl.Persistence.RavenDb/RavenDbCustomCheckDataStore.cs
index 483e3b3f01..7cb7c02612 100644
--- a/src/ServiceControl.Persistence.RavenDb/RavenDbCustomCheckDataStore.cs
+++ b/src/ServiceControl.Persistence.RavenDb/RavenDbCustomCheckDataStore.cs
@@ -34,7 +34,7 @@ public async Task UpdateCustomCheckStatus(CustomCheckDetail de
{
customCheck = new CustomCheck
{
- Id = id
+ Id = MakeId(id)
};
}
@@ -54,6 +54,11 @@ public async Task UpdateCustomCheckStatus(CustomCheckDetail de
return status;
}
+ static string MakeId(Guid id)
+ {
+ return $"CustomChecks/{id}";
+ }
+
public async Task>> GetStats(PagingInfo paging, string status = null)
{
using (var session = store.OpenAsyncSession())
@@ -73,7 +78,7 @@ public async Task>> GetStats(PagingInfo paging, s
public async Task DeleteCustomCheck(Guid id)
{
- await store.AsyncDatabaseCommands.DeleteAsync(store.Conventions.DefaultFindFullDocumentKeyFromNonStringIdentifier(id, typeof(CustomCheck), false), null);
+ await store.AsyncDatabaseCommands.DeleteAsync(MakeId(id), null);
}
public async Task GetNumberOfFailedChecks()
diff --git a/src/ServiceControl.Persistence.RavenDb/RavenDbPersistence.cs b/src/ServiceControl.Persistence.RavenDb/RavenDbPersistence.cs
index bab32ac5bd..c80cb622a5 100644
--- a/src/ServiceControl.Persistence.RavenDb/RavenDbPersistence.cs
+++ b/src/ServiceControl.Persistence.RavenDb/RavenDbPersistence.cs
@@ -78,10 +78,7 @@ public void Configure(IServiceCollection serviceCollection)
serviceCollection.AddSingleton();
}
- public IPersistenceLifecycle CreateLifecycle()
- {
- return new RavenDbPersistenceLifecycle(ravenStartup, documentStore);
- }
+ public void ConfigureLifecycle(IServiceCollection serviceCollection) => serviceCollection.AddSingleton(new RavenDbPersistenceLifecycle(ravenStartup, documentStore));
public IPersistenceInstaller CreateInstaller()
{
diff --git a/src/ServiceControl.Persistence.RavenDb5/.editorconfig b/src/ServiceControl.Persistence.RavenDb5/.editorconfig
new file mode 100644
index 0000000000..ff993b49bb
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/.editorconfig
@@ -0,0 +1,4 @@
+[*.cs]
+
+# Justification: ServiceControl app has no synchronization context
+dotnet_diagnostic.CA2007.severity = none
diff --git a/src/ServiceControl.Persistence.RavenDb5/Chunker.cs b/src/ServiceControl.Persistence.RavenDb5/Chunker.cs
new file mode 100644
index 0000000000..bae4cb0da8
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/Chunker.cs
@@ -0,0 +1,40 @@
+namespace ServiceControl.Infrastructure.RavenDB
+{
+ using System;
+ using System.Threading;
+
+ static class Chunker
+ {
+ public static int ExecuteInChunks(int total, Func action, T1 t1, T2 t2, CancellationToken cancellationToken = default)
+ {
+ if (total == 0)
+ {
+ return 0;
+ }
+
+ if (total < CHUNK_SIZE)
+ {
+ return action(t1, t2, 0, total - 1);
+ }
+
+ int start = 0, end = CHUNK_SIZE - 1;
+ var chunkCount = 0;
+ do
+ {
+ chunkCount += action(t1, t2, start, end);
+
+ start = end + 1;
+ end += CHUNK_SIZE;
+ if (end >= total)
+ {
+ end = total - 1;
+ }
+ }
+ while (start < total && !cancellationToken.IsCancellationRequested);
+
+ return chunkCount;
+ }
+
+ const int CHUNK_SIZE = 500;
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.RavenDb5/CustomChecks/CheckFreeDiskSpace.cs b/src/ServiceControl.Persistence.RavenDb5/CustomChecks/CheckFreeDiskSpace.cs
new file mode 100644
index 0000000000..14fb9ae8d9
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/CustomChecks/CheckFreeDiskSpace.cs
@@ -0,0 +1,72 @@
+namespace ServiceControl.Operations
+{
+ using System;
+ using System.IO;
+ using System.Threading.Tasks;
+ using NServiceBus.CustomChecks;
+ using NServiceBus.Logging;
+ using Persistence.RavenDb;
+
+ class CheckFreeDiskSpace : CustomCheck
+ {
+ public CheckFreeDiskSpace(RavenDBPersisterSettings settings) : base("ServiceControl database", "Storage space", TimeSpan.FromMinutes(5))
+ {
+ dataPath = settings.DatabasePath;
+ percentageThreshold = settings.DataSpaceRemainingThreshold;
+
+ Logger.Debug($"Check ServiceControl data drive space remaining custom check starting. Threshold {percentageThreshold:P0}");
+ }
+
+ public override Task PerformCheck()
+ {
+ var dataPathRoot = Path.GetPathRoot(dataPath);
+
+ if (dataPathRoot == null)
+ {
+ throw new Exception($"Unable to find the root of the data path {dataPath}");
+ }
+
+ var dataDriveInfo = new DriveInfo(dataPathRoot);
+ var availableFreeSpace = (decimal)dataDriveInfo.AvailableFreeSpace;
+ var totalSpace = (decimal)dataDriveInfo.TotalSize;
+
+ var percentRemaining = (decimal)dataDriveInfo.AvailableFreeSpace / dataDriveInfo.TotalSize;
+
+ if (Logger.IsDebugEnabled)
+ {
+ Logger.Debug($"Free space: {availableFreeSpace:N0}B | Total: {totalSpace:N0}B | Percent remaining {percentRemaining:P1}");
+ }
+
+ return percentRemaining > percentageThreshold
+ ? CheckResult.Pass
+ : CheckResult.Failed($"{percentRemaining:P0} disk space remaining on data drive '{dataDriveInfo.VolumeLabel} ({dataDriveInfo.RootDirectory})' on '{Environment.MachineName}'.");
+ }
+
+ public static void Validate(RavenDBPersisterSettings settings)
+ {
+ var threshold = settings.DataSpaceRemainingThreshold;
+
+ string message;
+
+ if (threshold < 0)
+ {
+ message = $"{RavenDbPersistenceConfiguration.DataSpaceRemainingThresholdKey} is invalid, minimum value is 0.";
+ Logger.Fatal(message);
+ throw new Exception(message);
+ }
+
+ if (threshold > 100)
+ {
+ message = $"{RavenDbPersistenceConfiguration.DataSpaceRemainingThresholdKey} is invalid, maximum value is 100.";
+ Logger.Fatal(message);
+ throw new Exception(message);
+ }
+ }
+
+ readonly string dataPath;
+ readonly decimal percentageThreshold;
+
+ public const int DataSpaceRemainingThresholdDefault = 20;
+ static readonly ILog Logger = LogManager.GetLogger(typeof(CheckFreeDiskSpace));
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.RavenDb5/CustomChecks/CheckMinimumStorageRequiredForIngestion.cs b/src/ServiceControl.Persistence.RavenDb5/CustomChecks/CheckMinimumStorageRequiredForIngestion.cs
new file mode 100644
index 0000000000..a696a1d45d
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/CustomChecks/CheckMinimumStorageRequiredForIngestion.cs
@@ -0,0 +1,90 @@
+namespace ServiceControl.Operations
+{
+ using System;
+ using System.IO;
+ using System.Threading.Tasks;
+ using NServiceBus.CustomChecks;
+ using NServiceBus.Logging;
+ using Persistence.RavenDb;
+ using ServiceControl.Persistence;
+
+ class CheckMinimumStorageRequiredForIngestion : CustomCheck
+ {
+ public CheckMinimumStorageRequiredForIngestion(
+ MinimumRequiredStorageState stateHolder,
+ RavenDBPersisterSettings settings)
+ : base("Message Ingestion Process", "ServiceControl Health", TimeSpan.FromSeconds(5))
+ {
+ this.stateHolder = stateHolder;
+ this.settings = settings;
+
+ dataPathRoot = Path.GetPathRoot(settings.DatabasePath);
+ }
+
+ public override Task PerformCheck()
+ {
+ percentageThreshold = settings.MinimumStorageLeftRequiredForIngestion / 100m;
+
+ if (dataPathRoot == null)
+ {
+ stateHolder.CanIngestMore = true;
+ return SuccessResult;
+ }
+
+ Logger.Debug($"Check ServiceControl data drive space starting. Threshold {percentageThreshold:P0}");
+
+ var dataDriveInfo = new DriveInfo(dataPathRoot);
+ var availableFreeSpace = (decimal)dataDriveInfo.AvailableFreeSpace;
+ var totalSpace = (decimal)dataDriveInfo.TotalSize;
+
+ var percentRemaining = (decimal)dataDriveInfo.AvailableFreeSpace / dataDriveInfo.TotalSize;
+
+ if (Logger.IsDebugEnabled)
+ {
+ Logger.Debug($"Free space: {availableFreeSpace:N0}B | Total: {totalSpace:N0}B | Percent remaining {percentRemaining:P1}");
+ }
+
+ if (percentRemaining > percentageThreshold)
+ {
+ stateHolder.CanIngestMore = true;
+ return SuccessResult;
+ }
+
+ var message = $"Error message ingestion stopped! {percentRemaining:P0} disk space remaining on data drive '{dataDriveInfo.VolumeLabel} ({dataDriveInfo.RootDirectory})' on '{Environment.MachineName}'. This is less than {percentageThreshold}% - the minimal required space configured. The threshold can be set using the {RavenBootstrapper.MinimumStorageLeftRequiredForIngestionKey} configuration setting.";
+ Logger.Warn(message);
+ stateHolder.CanIngestMore = false;
+ return CheckResult.Failed(message);
+ }
+
+ public static void Validate(RavenDBPersisterSettings settings)
+ {
+ int threshold = settings.MinimumStorageLeftRequiredForIngestion;
+
+ string message;
+ if (threshold < 0)
+ {
+ message = $"{RavenBootstrapper.MinimumStorageLeftRequiredForIngestionKey} is invalid, minimum value is 0.";
+ Logger.Fatal(message);
+ throw new Exception(message);
+ }
+
+ if (threshold > 100)
+ {
+ message = $"{RavenBootstrapper.MinimumStorageLeftRequiredForIngestionKey} is invalid, maximum value is 100.";
+ Logger.Fatal(message);
+ throw new Exception(message);
+ }
+ }
+
+ public const int MinimumStorageLeftRequiredForIngestionDefault = 5;
+
+ readonly MinimumRequiredStorageState stateHolder;
+ readonly RavenDBPersisterSettings settings;
+ readonly string dataPathRoot;
+
+ decimal percentageThreshold;
+
+ static readonly Task SuccessResult = Task.FromResult(CheckResult.Pass);
+ static readonly ILog Logger = LogManager.GetLogger(typeof(CheckMinimumStorageRequiredForIngestion));
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.RavenDb5/CustomChecks/CheckRavenDBIndexErrors.cs b/src/ServiceControl.Persistence.RavenDb5/CustomChecks/CheckRavenDBIndexErrors.cs
new file mode 100644
index 0000000000..83156e61d8
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/CustomChecks/CheckRavenDBIndexErrors.cs
@@ -0,0 +1,50 @@
+namespace ServiceControl
+{
+ using System;
+ using System.Text;
+ using System.Threading.Tasks;
+ using NServiceBus.CustomChecks;
+ using NServiceBus.Logging;
+ using Raven.Client.Documents;
+ using Raven.Client.Documents.Operations.Indexes;
+
+ class CheckRavenDBIndexErrors : CustomCheck
+ {
+ public CheckRavenDBIndexErrors(IDocumentStore store)
+ : base("Error Database Index Errors", "ServiceControl Health", TimeSpan.FromMinutes(5))
+ {
+ this.store = store;
+ }
+
+ public override Task PerformCheck()
+ {
+ var indexErrors = store.Maintenance.Send(new GetIndexErrorsOperation());
+
+ if (indexErrors.Length == 0)
+ {
+ return CheckResult.Pass;
+ }
+
+ var text = new StringBuilder();
+ text.AppendLine("Detected RavenDB index errors, please start maintenance mode and resolve the following issues:");
+
+ foreach (var indexError in indexErrors)
+ {
+ foreach (var indexingError in indexError.Errors)
+ {
+ text.AppendLine($"- Index [{indexError.Name}] error: {indexError.Name} (Action: {indexingError.Action}, Doc: {indexingError.Document}, At: {indexingError.Timestamp})");
+ }
+ }
+
+ text.AppendLine().AppendLine("See: https://docs.particular.net/search?q=servicecontrol+troubleshooting");
+
+ var message = text.ToString();
+ Logger.Error(message);
+ return CheckResult.Failed(message);
+ }
+
+ static readonly ILog Logger = LogManager.GetLogger();
+
+ readonly IDocumentStore store;
+ }
+}
diff --git a/src/ServiceControl.Persistence.RavenDb5/CustomChecks/CheckRavenDBIndexLag.cs b/src/ServiceControl.Persistence.RavenDb5/CustomChecks/CheckRavenDBIndexLag.cs
new file mode 100644
index 0000000000..a6bc6acc4b
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/CustomChecks/CheckRavenDBIndexLag.cs
@@ -0,0 +1,91 @@
+namespace ServiceControl
+{
+ using System;
+ using System.Linq;
+ using System.Text;
+ using System.Threading.Tasks;
+ using NServiceBus.CustomChecks;
+ using NServiceBus.Logging;
+ using Raven.Client.Documents;
+ using Raven.Client.Documents.Operations;
+ using CustomCheck = NServiceBus.CustomChecks.CustomCheck;
+
+ class CheckRavenDBIndexLag : CustomCheck
+ {
+ public CheckRavenDBIndexLag(IDocumentStore store)
+ : base("Error Database Index Lag", "ServiceControl Health", TimeSpan.FromMinutes(5))
+ {
+ this.store = store;
+ }
+
+ public override async Task PerformCheck()
+ {
+ var statistics = await store.Maintenance.SendAsync(new GetStatisticsOperation());
+ var indexes = statistics.Indexes.OrderBy(x => x.Name).ToArray();
+
+ CreateDiagnosticsLogEntry(statistics, indexes);
+
+ var indexCountWithTooMuchLag = CheckAndReportIndexesWithTooMuchIndexLag(indexes);
+
+ if (indexCountWithTooMuchLag > 0)
+ {
+ return CheckResult.Failed($"At least one index significantly stale. Please run maintenance mode if this custom check persists to ensure index(es) can recover. See log file for more details. Visit https://docs.particular.net/search?q=servicecontrol+troubleshooting for more information.");
+ }
+
+ return CheckResult.Pass;
+ }
+
+ static int CheckAndReportIndexesWithTooMuchIndexLag(IndexInformation[] indexes)
+ {
+ int indexCountWithTooMuchLag = 0;
+
+ foreach (var indexStats in indexes)
+ {
+ if (indexStats.LastIndexingTime.HasValue)
+ {
+ var indexLag = DateTime.UtcNow - indexStats.LastIndexingTime.Value; // TODO: Ensure audit ravendb5 persistence uses the same index lag behavior based on time
+
+ if (indexLag > IndexLagThresholdError)
+ {
+ indexCountWithTooMuchLag++;
+ Log.Error($"Index [{indexStats.Name}] IndexingLag {indexLag} is above error threshold ({IndexLagThresholdError}). Launch in maintenance mode to let indexes catch up.");
+ }
+ else if (indexLag > IndexLagThresholdWarning)
+ {
+ indexCountWithTooMuchLag++;
+ Log.Warn($"Index [{indexStats.Name}] IndexingLag {indexLag} is above warning threshold ({IndexLagThresholdWarning}). Launch in maintenance mode to let indexes catch up.");
+ }
+ }
+ }
+
+ return indexCountWithTooMuchLag;
+ }
+
+ static void CreateDiagnosticsLogEntry(DatabaseStatistics statistics, IndexInformation[] indexes)
+ {
+ if (!Log.IsDebugEnabled)
+ {
+ return;
+ }
+
+ var report = new StringBuilder();
+ report.AppendLine("Internal RavenDB index health report:");
+ report.AppendLine($"- DB Size: {statistics.SizeOnDisk.HumaneSize}");
+ report.AppendLine($"- LastIndexingTime {statistics.LastIndexingTime:u}");
+
+ foreach (var indexStats in indexes)
+ {
+ report.AppendLine($"- Index [{indexStats.Name,-44}] State: {indexStats.State}, Stale: {indexStats.IsStale,-5}, Priority: {indexStats.Priority,-6}, LastIndexingTime: {indexStats.LastIndexingTime:u}");
+ }
+ Log.Debug(report.ToString());
+ }
+
+ // TODO: RavenDB 3.5 had IndexLag thresholds that were number of document writes, and I converted to times. Revisit these numbers before shipping
+ // For IndexLag as document writes, 10k was a warning, 100k was an error. These TimeSpans assume same # of writes / 250 writes/sec
+ static readonly TimeSpan IndexLagThresholdWarning = TimeSpan.FromSeconds(40); // Assuming 10_000 writes at 250 writes/sec
+ static readonly TimeSpan IndexLagThresholdError = TimeSpan.FromSeconds(400); // Assuming 100_000 writes at 250 writes/sec
+ static readonly ILog Log = LogManager.GetLogger();
+
+ readonly IDocumentStore store;
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.RavenDb5/DatabaseSetup.cs b/src/ServiceControl.Persistence.RavenDb5/DatabaseSetup.cs
new file mode 100644
index 0000000000..0b53d5782c
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/DatabaseSetup.cs
@@ -0,0 +1,88 @@
+namespace ServiceControl.Persistence.RavenDb5
+{
+ using System.Collections.Generic;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Raven.Client.Documents;
+ using Raven.Client.Documents.Indexes;
+ using Raven.Client.Documents.Operations;
+ using Raven.Client.Documents.Operations.Expiration;
+ using Raven.Client.Exceptions;
+ using Raven.Client.Exceptions.Database;
+ using Raven.Client.ServerWide;
+ using Raven.Client.ServerWide.Operations;
+ using ServiceControl.MessageFailures.Api;
+ using ServiceControl.Operations;
+ using ServiceControl.Persistence;
+ using ServiceControl.Recoverability;
+
+ class DatabaseSetup
+ {
+ public DatabaseSetup(RavenDBPersisterSettings settings)
+ {
+ this.settings = settings;
+ }
+
+ public async Task Execute(IDocumentStore documentStore, CancellationToken cancellationToken)
+ {
+ try
+ {
+ await documentStore.Maintenance.ForDatabase(settings.DatabaseName).SendAsync(new GetStatisticsOperation(), cancellationToken);
+ }
+ catch (DatabaseDoesNotExistException)
+ {
+ try
+ {
+ await documentStore.Maintenance.Server
+ .SendAsync(new CreateDatabaseOperation(new DatabaseRecord(settings.DatabaseName)), cancellationToken);
+ }
+ catch (ConcurrencyException)
+ {
+ // The database was already created before calling CreateDatabaseOperation
+ }
+ }
+
+ var indexList = new List {
+ new ArchivedGroupsViewIndex(),
+ new CustomChecksIndex(),
+ new FailedErrorImportIndex(),
+ new FailedMessageFacetsIndex(),
+ new FailedMessageRetries_ByBatch(),
+ new FailedMessageViewIndex(),
+ new FailureGroupsViewIndex(),
+ new GroupCommentIndex(),
+ new KnownEndpointIndex(),
+ new MessagesViewIndex(),
+ new QueueAddressIndex(),
+ new RetryBatches_ByStatusAndSession(),
+ new RetryBatches_ByStatus_ReduceInitialBatchSize()
+
+ };
+
+ //TODO: Handle full text search
+ //if (settings.EnableFullTextSearch)
+ //{
+ // indexList.Add(new MessagesViewIndexWithFullTextSearch());
+ // await documentStore.Maintenance.SendAsync(new DeleteIndexOperation("MessagesViewIndex"), cancellationToken);
+ //}
+ //else
+ //{
+ // indexList.Add(new MessagesViewIndex());
+ // await documentStore.Maintenance
+ // .SendAsync(new DeleteIndexOperation("MessagesViewIndexWithFullTextSearch"), cancellationToken);
+ //}
+
+ await IndexCreation.CreateIndexesAsync(indexList, documentStore, null, null, cancellationToken);
+
+ var expirationConfig = new ExpirationConfiguration
+ {
+ Disabled = false,
+ DeleteFrequencyInSec = settings.ExpirationProcessTimerInSeconds
+ };
+
+ await documentStore.Maintenance.SendAsync(new ConfigureExpirationOperation(expirationConfig), cancellationToken);
+ }
+
+ readonly RavenDBPersisterSettings settings;
+ }
+}
diff --git a/src/ServiceControl.Persistence.RavenDb5/DocumentIdGenerators/FailedMessageIdGenerator.cs b/src/ServiceControl.Persistence.RavenDb5/DocumentIdGenerators/FailedMessageIdGenerator.cs
new file mode 100644
index 0000000000..4935378ec9
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/DocumentIdGenerators/FailedMessageIdGenerator.cs
@@ -0,0 +1,11 @@
+static class FailedMessageIdGenerator
+{
+ public const string CollectionName = "FailedMessages";
+
+ public static string MakeDocumentId(string messageUniqueId)
+ {
+ return $"{CollectionName}/{messageUniqueId}";
+ }
+
+ public static string GetMessageIdFromDocumentId(string failedMessageDocumentId) => failedMessageDocumentId.Substring(CollectionName.Length + 1);
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.RavenDb5/DocumentIdGenerators/KnownEndpointIdGenerator.cs b/src/ServiceControl.Persistence.RavenDb5/DocumentIdGenerators/KnownEndpointIdGenerator.cs
new file mode 100644
index 0000000000..ea368580af
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/DocumentIdGenerators/KnownEndpointIdGenerator.cs
@@ -0,0 +1,7 @@
+using System;
+
+static class KnownEndpointIdGenerator
+{
+ const string CollectionName = "KnownEndpoint";
+ public static string MakeDocumentId(Guid endpointId) => $"{CollectionName}/{endpointId}";
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.RavenDb5/DocumentIdGenerators/MessageBodyIdGenerator.cs b/src/ServiceControl.Persistence.RavenDb5/DocumentIdGenerators/MessageBodyIdGenerator.cs
new file mode 100644
index 0000000000..9891ffdc14
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/DocumentIdGenerators/MessageBodyIdGenerator.cs
@@ -0,0 +1,9 @@
+static class MessageBodyIdGenerator
+{
+ const string CollectionName = "messagebodies";
+
+ public static string MakeDocumentId(string messageUniqueId)
+ {
+ return $"{CollectionName}/{messageUniqueId}";
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.RavenDb5/Editing/EditFailedMessageManager.cs b/src/ServiceControl.Persistence.RavenDb5/Editing/EditFailedMessageManager.cs
new file mode 100644
index 0000000000..1f87798ea0
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/Editing/EditFailedMessageManager.cs
@@ -0,0 +1,53 @@
+namespace ServiceControl.Persistence.RavenDb
+{
+ using System;
+ using System.Threading.Tasks;
+ using ServiceControl.MessageFailures;
+ using ServiceControl.Persistence.Recoverability.Editing;
+ using Raven.Client.Documents.Session;
+
+ class EditFailedMessageManager : AbstractSessionManager, IEditFailedMessagesManager
+ {
+ readonly IAsyncDocumentSession session;
+ FailedMessage failedMessage;
+
+ public EditFailedMessageManager(IAsyncDocumentSession session)
+ : base(session)
+ {
+ this.session = session;
+ }
+
+ public async Task GetFailedMessage(string failedMessageId)
+ {
+ failedMessage = await session.LoadAsync(FailedMessageIdGenerator.MakeDocumentId(failedMessageId));
+ return failedMessage;
+ }
+
+ public async Task GetCurrentEditingMessageId(string failedMessageId)
+ {
+ var edit = await session.LoadAsync(FailedMessageEdit.MakeDocumentId(failedMessageId));
+ return edit?.EditId;
+ }
+
+ public Task SetCurrentEditingMessageId(string editingMessageId)
+ {
+ if (failedMessage == null)
+ {
+ throw new InvalidOperationException("No failed message loaded");
+ }
+ return session.StoreAsync(new FailedMessageEdit
+ {
+ Id = FailedMessageEdit.MakeDocumentId(failedMessage.UniqueMessageId),
+ FailedMessageId = failedMessage.Id,
+ EditId = editingMessageId
+ });
+ }
+
+ public Task SetFailedMessageAsResolved()
+ {
+ // Instance is tracked by the document session
+ failedMessage.Status = FailedMessageStatus.Resolved;
+ return Task.CompletedTask;
+ }
+ }
+}
diff --git a/src/ServiceControl.Persistence.RavenDb5/Editing/FailedMessageEdit.cs b/src/ServiceControl.Persistence.RavenDb5/Editing/FailedMessageEdit.cs
new file mode 100644
index 0000000000..2bc0e7a1e0
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/Editing/FailedMessageEdit.cs
@@ -0,0 +1,16 @@
+namespace ServiceControl.Persistence.Recoverability.Editing
+{
+ class FailedMessageEdit
+ {
+ public string Id { get; set; }
+ public string FailedMessageId { get; set; }
+ public string EditId { get; set; }
+
+ public static string MakeDocumentId(string failedMessageId)
+ {
+ return $"{CollectionName}/{failedMessageId}";
+ }
+
+ const string CollectionName = "FailedMessageEdit";
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.RavenDb5/Editing/NotificationsManager.cs b/src/ServiceControl.Persistence.RavenDb5/Editing/NotificationsManager.cs
new file mode 100644
index 0000000000..8857860ad9
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/Editing/NotificationsManager.cs
@@ -0,0 +1,38 @@
+namespace ServiceControl.Persistence.RavenDb.Editing
+{
+ using System;
+ using System.Threading.Tasks;
+ using Notifications;
+ using Raven.Client.Documents.Session;
+
+ class NotificationsManager : AbstractSessionManager, INotificationsManager
+ {
+ static readonly TimeSpan CacheTimeoutDefault = TimeSpan.FromMinutes(5); // Raven requires this to be at least 1 second
+
+ public NotificationsManager(IAsyncDocumentSession session) : base(session)
+ {
+ }
+
+ public async Task LoadSettings(TimeSpan? cacheTimeout = null)
+ {
+
+ using (Session.Advanced.DocumentStore.AggressivelyCacheFor(cacheTimeout ?? CacheTimeoutDefault))
+ {
+ var settings = await Session
+ .LoadAsync(NotificationsSettings.SingleDocumentId);
+
+ if (settings == null)
+ {
+ settings = new NotificationsSettings
+ {
+ Id = NotificationsSettings.SingleDocumentId
+ };
+
+ await Session.StoreAsync(settings);
+ }
+
+ return settings;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.RavenDb5/EmbeddedDatabase.cs b/src/ServiceControl.Persistence.RavenDb5/EmbeddedDatabase.cs
new file mode 100644
index 0000000000..536d5247fc
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/EmbeddedDatabase.cs
@@ -0,0 +1,235 @@
+namespace ServiceControl.Persistence.RavenDb5
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.Globalization;
+ using System.IO;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using ByteSizeLib;
+ using NServiceBus.Logging;
+ using Raven.Client.Documents;
+ using Raven.Client.Documents.Conventions;
+ using Raven.Embedded;
+
+ public class EmbeddedDatabase : IDisposable
+ {
+ EmbeddedDatabase(RavenDBPersisterSettings configuration)
+ {
+ this.configuration = configuration;
+ ServerUrl = configuration.ServerUrl;
+ }
+
+ public string ServerUrl { get; private set; }
+
+ static (string LicenseFileName, string ServerDirectory) GetRavenLicenseFileNameAndServerDirectory()
+ {
+ var licenseFileName = "RavenLicense.json";
+ var localRavenLicense = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, licenseFileName);
+ if (File.Exists(localRavenLicense))
+ {
+ return (localRavenLicense, null);
+ }
+
+ //TODO: refactor this to extract the folder name to a constant
+ localRavenLicense = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Persisters", "RavenDB5", licenseFileName);
+ if (!File.Exists(localRavenLicense))
+ {
+ throw new Exception($"RavenDB license not found. Make sure the RavenDB license file, '{licenseFileName}', " +
+ $"is stored in the '{AppDomain.CurrentDomain.BaseDirectory}' folder or in the 'Persisters/RavenDB5' subfolder.");
+ }
+
+ // By default RavenDB 5 searches its binaries in the RavenDBServer right below the BaseDirectory.
+ // If we're loading from Persisters/RavenDB5 we also have to signal RavenDB where are binaries
+ var serverDirectory = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Persisters", "RavenDB5", "RavenDBServer");
+
+ return (localRavenLicense, serverDirectory);
+ }
+
+ internal static EmbeddedDatabase Start(RavenDBPersisterSettings settings)
+ {
+ var licenseFileNameAndServerDirectory = GetRavenLicenseFileNameAndServerDirectory();
+
+ var nugetPackagesPath = Path.Combine(settings.DatabasePath, "Packages", "NuGet");
+
+ logger.InfoFormat("Loading RavenDB license from {0}", licenseFileNameAndServerDirectory.LicenseFileName);
+ var serverOptions = new ServerOptions
+ {
+ CommandLineArgs = new List
+ {
+ $"--License.Path=\"{licenseFileNameAndServerDirectory.LicenseFileName}\"",
+ $"--Logs.Mode={settings.LogsMode}",
+ // HINT: If this is not set, then Raven will pick a default location relative to the server binaries
+ // See https://github.com/ravendb/ravendb/issues/15694
+ $"--Indexing.NuGetPackagesPath=\"{nugetPackagesPath}\""
+ },
+ AcceptEula = true,
+ DataDirectory = settings.DatabasePath,
+ ServerUrl = settings.ServerUrl,
+ LogsPath = settings.LogPath
+ };
+
+ if (!string.IsNullOrWhiteSpace(licenseFileNameAndServerDirectory.ServerDirectory))
+ {
+ serverOptions.ServerDirectory = licenseFileNameAndServerDirectory.ServerDirectory;
+ }
+
+ var embeddedDatabase = new EmbeddedDatabase(settings);
+
+ embeddedDatabase.Start(serverOptions);
+
+ RecordStartup(settings);
+
+ return embeddedDatabase;
+ }
+
+ void Start(ServerOptions serverOptions)
+ {
+ EmbeddedServer.Instance.ServerProcessExited += (sender, args) =>
+ {
+ if (sender is Process process && process.HasExited && process.ExitCode != 0)
+ {
+ logger.Warn($"RavenDB server process exited unexpectedly with exitCode: {process.ExitCode}. Process will be restarted.");
+
+ restartRequired = true;
+ }
+ };
+
+ EmbeddedServer.Instance.StartServer(serverOptions);
+
+ var _ = Task.Run(async () =>
+ {
+ while (!shutdownTokenSource.IsCancellationRequested)
+ {
+ try
+ {
+ await Task.Delay(delayBetweenRestarts, shutdownTokenSource.Token);
+
+ if (restartRequired)
+ {
+ logger.Info("Restarting RavenDB server process");
+
+ await EmbeddedServer.Instance.RestartServerAsync();
+ restartRequired = false;
+
+ logger.Info("RavenDB server process restarted successfully.");
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ //no-op
+ }
+ catch (Exception e)
+ {
+ logger.Fatal($"RavenDB server restart failed. Restart will be retried in {delayBetweenRestarts}.", e);
+ }
+ }
+ });
+ }
+
+ public async Task Connect(CancellationToken cancellationToken)
+ {
+ var dbOptions = new DatabaseOptions(configuration.DatabaseName)
+ {
+ Conventions = new DocumentConventions
+ {
+ SaveEnumsAsIntegers = true
+ }
+ };
+
+ //TODO: copied from Audit. In Audit FindClrType so I guess this is not needed. Confirm and remove
+ //if (configuration.FindClrType != null)
+ //{
+ // dbOptions.Conventions.FindClrType += configuration.FindClrType;
+ //}
+
+ var store = await EmbeddedServer.Instance.GetDocumentStoreAsync(dbOptions, cancellationToken);
+
+ var databaseSetup = new DatabaseSetup(configuration);
+ await databaseSetup.Execute(store, cancellationToken);
+
+ return store;
+ }
+
+ public void Dispose()
+ {
+ shutdownTokenSource.Cancel();
+ EmbeddedServer.Instance?.Dispose();
+ }
+
+ static void RecordStartup(RavenDBPersisterSettings settings)
+ {
+ var dataSize = DataSize(settings);
+ var folderSize = FolderSize(settings);
+
+ var startupMessage = $@"
+-------------------------------------------------------------
+Database Size: {ByteSize.FromBytes(dataSize).ToString("#.##", CultureInfo.InvariantCulture)}
+Database Folder Size: {ByteSize.FromBytes(folderSize).ToString("#.##", CultureInfo.InvariantCulture)}
+-------------------------------------------------------------";
+
+ logger.Info(startupMessage);
+ }
+
+ static long DataSize(RavenDBPersisterSettings settings)
+ {
+ var datafilePath = Path.Combine(settings.DatabasePath, "data");
+
+ try
+ {
+ var info = new FileInfo(datafilePath);
+ if (!info.Exists)
+ {
+ return -1;
+ }
+ return info.Length;
+ }
+ catch
+ {
+ return -1;
+ }
+ }
+
+ static long FolderSize(RavenDBPersisterSettings settings)
+ {
+ try
+ {
+ var dir = new DirectoryInfo(settings.DatabasePath);
+ var dirSize = DirSize(dir);
+ return dirSize;
+ }
+ catch
+ {
+ return -1;
+ }
+ }
+
+ static long DirSize(DirectoryInfo d)
+ {
+ long size = 0;
+ if (d.Exists)
+ {
+ FileInfo[] fis = d.GetFiles();
+ foreach (FileInfo fi in fis)
+ {
+ size += fi.Length;
+ }
+
+ DirectoryInfo[] dis = d.GetDirectories();
+ foreach (DirectoryInfo di in dis)
+ {
+ size += DirSize(di);
+ }
+ }
+
+ return size;
+ }
+ CancellationTokenSource shutdownTokenSource = new CancellationTokenSource();
+ bool restartRequired;
+ readonly RavenDBPersisterSettings configuration;
+
+ static TimeSpan delayBetweenRestarts = TimeSpan.FromSeconds(60);
+ static readonly ILog logger = LogManager.GetLogger();
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.RavenDb5/EndpointDetailsParser.cs b/src/ServiceControl.Persistence.RavenDb5/EndpointDetailsParser.cs
new file mode 100644
index 0000000000..1cda01af30
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/EndpointDetailsParser.cs
@@ -0,0 +1,120 @@
+namespace ServiceControl
+{
+ using System;
+ using System.Collections.Generic;
+ using Infrastructure;
+ using NServiceBus;
+ using ServiceControl.Operations;
+
+ class EndpointDetailsParser
+ {
+ public static EndpointDetails SendingEndpoint(IReadOnlyDictionary headers)
+ {
+ var endpointDetails = new EndpointDetails();
+
+ DictionaryExtensions.CheckIfKeyExists(Headers.OriginatingEndpoint, headers, s => endpointDetails.Name = s);
+ DictionaryExtensions.CheckIfKeyExists("NServiceBus.OriginatingMachine", headers, s => endpointDetails.Host = s);
+ DictionaryExtensions.CheckIfKeyExists(Headers.OriginatingHostId, headers, s => endpointDetails.HostId = Guid.Parse(s));
+
+ if (!string.IsNullOrEmpty(endpointDetails.Name) && !string.IsNullOrEmpty(endpointDetails.Host))
+ {
+ return endpointDetails;
+ }
+
+ string address = null;
+ DictionaryExtensions.CheckIfKeyExists(Headers.OriginatingAddress, headers, s => address = s);
+
+ if (address != null)
+ {
+ var queueAndMachinename = ExtractQueueAndMachineName(address);
+ endpointDetails.Name = queueAndMachinename.Queue;
+ endpointDetails.Host = queueAndMachinename.Machine;
+ return endpointDetails;
+ }
+
+ return null;
+ }
+
+ public static EndpointDetails ReceivingEndpoint(IReadOnlyDictionary headers)
+ {
+ var endpoint = new EndpointDetails();
+
+ if (headers.TryGetValue(Headers.HostId, out var hostIdHeader))
+ {
+ endpoint.HostId = Guid.Parse(hostIdHeader);
+ }
+
+ if (headers.TryGetValue(Headers.HostDisplayName, out var hostDisplayNameHeader))
+ {
+ endpoint.Host = hostDisplayNameHeader;
+ }
+ else
+ {
+ DictionaryExtensions.CheckIfKeyExists("NServiceBus.ProcessingMachine", headers, s => endpoint.Host = s);
+ }
+
+ DictionaryExtensions.CheckIfKeyExists(Headers.ProcessingEndpoint, headers, s => endpoint.Name = s);
+
+ if (!string.IsNullOrEmpty(endpoint.Name) && !string.IsNullOrEmpty(endpoint.Host))
+ {
+ return endpoint;
+ }
+
+ string address = null;
+ //use the failed q to determine the receiving endpoint
+ DictionaryExtensions.CheckIfKeyExists("NServiceBus.FailedQ", headers, s => address = s);
+
+ // If we have a failed queue, then construct an endpoint from the failed queue information
+ if (address != null)
+ {
+ var queueAndMachinename = ExtractQueueAndMachineName(address);
+
+ if (string.IsNullOrEmpty(endpoint.Name))
+ {
+ endpoint.Name = queueAndMachinename.Queue;
+ }
+
+ if (string.IsNullOrEmpty(endpoint.Host))
+ {
+ endpoint.Host = queueAndMachinename.Machine;
+ }
+
+ // If we've been now able to get the endpoint details, return the new info.
+ if (!string.IsNullOrEmpty(endpoint.Name) && !string.IsNullOrEmpty(endpoint.Host))
+ {
+ return endpoint;
+ }
+ }
+
+ return null;
+ }
+
+ static QueueAndMachine ExtractQueueAndMachineName(string address)
+ {
+ var atIndex = address?.IndexOf("@", StringComparison.InvariantCulture);
+
+ if (atIndex.HasValue && atIndex.Value > -1)
+ {
+ var queue = address.Substring(0, atIndex.Value);
+ var machine = address.Substring(atIndex.Value + 1);
+ return new QueueAndMachine
+ {
+ Queue = queue,
+ Machine = machine
+ };
+ }
+
+ return new QueueAndMachine
+ {
+ Queue = address,
+ Machine = null
+ };
+ }
+
+ struct QueueAndMachine
+ {
+ public string Queue;
+ public string Machine;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.RavenDb5/ErrorMessagesDataStore.cs b/src/ServiceControl.Persistence.RavenDb5/ErrorMessagesDataStore.cs
new file mode 100644
index 0000000000..19db4f2f6b
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/ErrorMessagesDataStore.cs
@@ -0,0 +1,748 @@
+namespace ServiceControl.Persistence.RavenDb
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Threading.Tasks;
+ using Editing;
+ using NServiceBus.Logging;
+ using Raven.Client.Documents;
+ using Raven.Client.Documents.Commands;
+ using Raven.Client.Documents.Linq;
+ using Raven.Client.Documents.Operations;
+ using Raven.Client.Documents.Queries;
+ using Raven.Client.Documents.Queries.Facets;
+ using Raven.Client.Documents.Session;
+ using ServiceControl.CompositeViews.Messages;
+ using ServiceControl.EventLog;
+ using ServiceControl.MessageFailures;
+ using ServiceControl.MessageFailures.Api;
+ using ServiceControl.Operations;
+ using ServiceControl.Persistence.Infrastructure;
+ using ServiceControl.Recoverability;
+
+ class ErrorMessagesDataStore : IErrorMessageDataStore
+ {
+ readonly IDocumentStore documentStore;
+
+ public ErrorMessagesDataStore(IDocumentStore documentStore)
+ {
+ this.documentStore = documentStore;
+
+ }
+
+ public async Task>> GetAllMessages(
+ PagingInfo pagingInfo,
+ SortInfo sortInfo,
+ bool includeSystemMessages
+ )
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var results = await session.Query()
+ .IncludeSystemMessagesWhere(includeSystemMessages)
+ .Statistics(out var stats)
+ .Sort(sortInfo)
+ .Paging(pagingInfo)
+ //.TransformWith()
+ .TransformToMessagesView()
+ .ToListAsync();
+
+ return new QueryResult>(results, stats.ToQueryStatsInfo());
+ }
+ }
+
+ public async Task>> GetAllMessagesForEndpoint(
+ string endpointName,
+ PagingInfo pagingInfo,
+ SortInfo sortInfo,
+ bool includeSystemMessages
+ )
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var results = await session.Query()
+ .IncludeSystemMessagesWhere(includeSystemMessages)
+ .Where(m => m.ReceivingEndpointName == endpointName)
+ .Statistics(out var stats)
+ .Sort(sortInfo)
+ .Paging(pagingInfo)
+ //.TransformWith()
+ .TransformToMessagesView()
+ .ToListAsync();
+
+ return new QueryResult>(results, stats.ToQueryStatsInfo());
+ }
+ }
+
+ public async Task>> SearchEndpointMessages(
+ string endpointName,
+ string searchKeyword,
+ PagingInfo pagingInfo,
+ SortInfo sortInfo
+ )
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var results = await session.Query()
+ .Statistics(out var stats)
+ .Search(x => x.Query, searchKeyword)
+ .Where(m => m.ReceivingEndpointName == endpointName)
+ .Sort(sortInfo)
+ .Paging(pagingInfo)
+ //.TransformWith()
+ .TransformToMessagesView()
+ .ToListAsync();
+
+ return new QueryResult>(results, stats.ToQueryStatsInfo());
+ }
+ }
+
+ public async Task>> GetAllMessagesByConversation(
+ string conversationId,
+ PagingInfo pagingInfo,
+ SortInfo sortInfo,
+ bool includeSystemMessages
+ )
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var results = await session.Query()
+ .Statistics(out var stats)
+ .Where(m => m.ConversationId == conversationId)
+ .Sort(sortInfo)
+ .Paging(pagingInfo)
+ //.TransformWith()
+ .TransformToMessagesView()
+ .ToListAsync();
+
+ return new QueryResult>(results, stats.ToQueryStatsInfo());
+ }
+ }
+
+ public async Task>> GetAllMessagesForSearch(
+ string searchTerms,
+ PagingInfo pagingInfo,
+ SortInfo sortInfo
+ )
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var results = await session.Query()
+ .Statistics(out var stats)
+ .Search(x => x.Query, searchTerms)
+ .Sort(sortInfo)
+ .Paging(pagingInfo)
+ //.TransformWith()
+ .TransformToMessagesView()
+ .ToListAsync();
+
+ return new QueryResult>(results, stats.ToQueryStatsInfo());
+ }
+ }
+
+ public async Task FailedMessageFetch(string failedMessageId)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ return await session.LoadAsync(failedMessageId);
+ }
+ }
+
+ public async Task FailedMessageMarkAsArchived(string failedMessageId)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var failedMessage = await session.LoadAsync(failedMessageId);
+
+ if (failedMessage.Status != FailedMessageStatus.Archived)
+ {
+ failedMessage.Status = FailedMessageStatus.Archived;
+ }
+
+ await session.SaveChangesAsync();
+ }
+ }
+
+ public async Task FailedMessagesFetch(Guid[] ids)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var results = await session.LoadAsync(ids.Select(g => g.ToString()));
+ return results.Values.Where(x => x != null).ToArray();
+ }
+ }
+
+ public async Task StoreFailedErrorImport(FailedErrorImport failure)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ await session.StoreAsync(failure);
+
+ await session.SaveChangesAsync();
+ }
+ }
+
+ public Task CreateEditFailedMessageManager()
+ {
+ var session = documentStore.OpenAsyncSession();
+ var manager = new EditFailedMessageManager(session);
+ return Task.FromResult((IEditFailedMessagesManager)manager);
+ }
+
+ public async Task> GetFailureGroupView(string groupId, string status, string modified)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var document = await session.Advanced
+ .AsyncDocumentQuery()
+ .Statistics(out var stats)
+ .WhereEquals(group => group.Id, groupId)
+ .FilterByStatusWhere(status)
+ .FilterByLastModifiedRange(modified)
+ .FirstOrDefaultAsync();
+
+ return new QueryResult(document, stats.ToQueryStatsInfo());
+ }
+ }
+
+ public async Task> GetFailureGroupsByClassifier(string classifier)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var groups = session
+ .Query()
+ .Where(v => v.Type == classifier);
+
+ var results = await groups
+ .OrderByDescending(x => x.Last)
+ .Take(200) // only show 200 groups
+ .ToListAsync();
+
+ return results;
+ }
+ }
+
+ public async Task>> ErrorGet(
+ string status,
+ string modified,
+ string queueAddress,
+ PagingInfo pagingInfo,
+ SortInfo sortInfo
+ )
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var results = await session.Advanced
+ .AsyncDocumentQuery()
+ .Statistics(out var stats)
+ .FilterByStatusWhere(status)
+ .FilterByLastModifiedRange(modified)
+ .FilterByQueueAddress(queueAddress)
+ .Sort(sortInfo)
+ .Paging(pagingInfo)
+ // TODO: Fix SetResultTransformer
+ //.SetResultTransformer(new FailedMessageViewTransformer().TransformerName)
+ .SelectFields()
+ .ToListAsync();
+
+ return new QueryResult>(results, stats.ToQueryStatsInfo());
+ }
+ }
+
+ public async Task ErrorsHead(
+ string status,
+ string modified,
+ string queueAddress
+ )
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var stats = await session.Advanced
+ .AsyncDocumentQuery()
+ .FilterByStatusWhere(status)
+ .FilterByLastModifiedRange(modified)
+ .FilterByQueueAddress(queueAddress)
+ .GetQueryResultAsync();
+
+ return stats.ToQueryStatsInfo();
+ }
+ }
+
+ public async Task>> ErrorsByEndpointName(
+ string status,
+ string endpointName,
+ string modified,
+ PagingInfo pagingInfo,
+ SortInfo sortInfo
+ )
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var results = await session.Advanced
+ .AsyncDocumentQuery()
+ .Statistics(out var stats)
+ .FilterByStatusWhere(status)
+ .AndAlso()
+ .WhereEquals("ReceivingEndpointName", endpointName)
+ .FilterByLastModifiedRange(modified)
+ .Sort(sortInfo)
+ .Paging(pagingInfo)
+ // TODO: Fix SetResultTransformer
+ //.SetResultTransformer(new FailedMessageViewTransformer().TransformerName)
+ .SelectFields()
+ .ToListAsync();
+
+ return new QueryResult>(results, stats.ToQueryStatsInfo());
+ }
+ }
+
+ public async Task> ErrorsSummary()
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var facetResults = await session.Query()
+ .AggregateBy(new List
+ {
+ new Facet
+ {
+ FieldName = "Name",
+ DisplayFieldName = "Endpoints"
+ },
+ new Facet
+ {
+ FieldName = "Host",
+ DisplayFieldName = "Hosts"
+ },
+ new Facet
+ {
+ FieldName = "MessageType",
+ DisplayFieldName = "Message types"
+ }
+ }).ExecuteAsync();
+
+ var results = facetResults
+ .ToDictionary(
+ x => x.Key,
+ x => (object)x.Value
+ );
+
+ return results;
+ }
+ }
+
+ public async Task ErrorBy(Guid failedMessageId)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var message = await session.LoadAsync(failedMessageId.ToString());
+ return message;
+ }
+ }
+
+ public async Task ErrorBy(string failedMessageId)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var message = await session.LoadAsync(FailedMessageIdGenerator.MakeDocumentId(failedMessageId));
+ return message;
+ }
+ }
+
+ public Task CreateNotificationsManager()
+ {
+ var session = documentStore.OpenAsyncSession();
+ var manager = new NotificationsManager(session);
+
+ return Task.FromResult(manager);
+ }
+
+ public async Task ErrorLastBy(Guid failedMessageId)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var message = await session.LoadAsync(failedMessageId.ToString());
+ if (message == null)
+ {
+ return null;
+ }
+ var result = Map(message, session);
+ return result;
+ }
+ }
+
+ static FailedMessageView Map(FailedMessage message, IAsyncDocumentSession session)
+ {
+ var processingAttempt = message.ProcessingAttempts.Last();
+
+ var metadata = processingAttempt.MessageMetadata;
+ var failureDetails = processingAttempt.FailureDetails;
+ var wasEdited = message.ProcessingAttempts.Last().Headers.ContainsKey("ServiceControl.EditOf");
+
+ var failedMsgView = new FailedMessageView
+ {
+ Id = message.UniqueMessageId,
+ MessageType = metadata.GetAsStringOrNull("MessageType"),
+ IsSystemMessage = metadata.GetOrDefault("IsSystemMessage"),
+ TimeSent = metadata.GetAsNullableDateTime("TimeSent"),
+ MessageId = metadata.GetAsStringOrNull("MessageId"),
+ Exception = failureDetails.Exception,
+ QueueAddress = failureDetails.AddressOfFailingEndpoint,
+ NumberOfProcessingAttempts = message.ProcessingAttempts.Count,
+ Status = message.Status,
+ TimeOfFailure = failureDetails.TimeOfFailure,
+ LastModified = session.Advanced.GetLastModifiedFor(message).Value,
+ Edited = wasEdited,
+ EditOf = wasEdited ? message.ProcessingAttempts.Last().Headers["ServiceControl.EditOf"] : ""
+ };
+
+ try
+ {
+ failedMsgView.SendingEndpoint = metadata.GetOrDefault("SendingEndpoint");
+ }
+ catch (Exception ex)
+ {
+ Logger.Warn($"Unable to parse SendingEndpoint from metadata for messageId {message.UniqueMessageId}", ex);
+ failedMsgView.SendingEndpoint = EndpointDetailsParser.SendingEndpoint(processingAttempt.Headers);
+ }
+
+ try
+ {
+ failedMsgView.ReceivingEndpoint = metadata.GetOrDefault("ReceivingEndpoint");
+ }
+ catch (Exception ex)
+ {
+ Logger.Warn($"Unable to parse ReceivingEndpoint from metadata for messageId {message.UniqueMessageId}", ex);
+ failedMsgView.ReceivingEndpoint = EndpointDetailsParser.ReceivingEndpoint(processingAttempt.Headers);
+ }
+
+ return failedMsgView;
+ }
+
+
+ public async Task EditComment(string groupId, string comment)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var groupComment =
+ await session.LoadAsync(GroupComment.MakeId(groupId))
+ ?? new GroupComment { Id = GroupComment.MakeId(groupId) };
+
+ groupComment.Comment = comment;
+
+ await session.StoreAsync(groupComment);
+ await session.SaveChangesAsync();
+ }
+ }
+
+ public async Task DeleteComment(string groupId)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ session.Delete(GroupComment.MakeId(groupId));
+ await session.SaveChangesAsync();
+ }
+ }
+
+ public async Task>> GetGroupErrors(
+ string groupId,
+ string status,
+ string modified,
+ SortInfo sortInfo,
+ PagingInfo pagingInfo
+ )
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var results = await session.Advanced
+ .AsyncDocumentQuery()
+ .Statistics(out var stats)
+ .WhereEquals(view => view.FailureGroupId, groupId)
+ .FilterByStatusWhere(status)
+ .FilterByLastModifiedRange(modified)
+ .Sort(sortInfo)
+ .Paging(pagingInfo)
+ // TODO: Fix SetResultTransformer
+ //.SetResultTransformer(FailedMessageViewTransformer.Name)
+ .SelectFields()
+ .ToListAsync();
+
+ return results.ToQueryResult(stats);
+ }
+ }
+
+ public async Task GetGroupErrorsCount(string groupId, string status, string modified)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var queryResult = await session.Advanced
+ .AsyncDocumentQuery()
+ .WhereEquals(view => view.FailureGroupId, groupId)
+ .FilterByStatusWhere(status)
+ .FilterByLastModifiedRange(modified)
+ .GetQueryResultAsync();
+
+ return queryResult.ToQueryStatsInfo();
+ }
+ }
+
+ public async Task>> GetGroup(string groupId, string status, string modified)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var queryResult = await session.Advanced
+ .AsyncDocumentQuery()
+ .Statistics(out var stats)
+ .WhereEquals(group => group.Id, groupId)
+ .FilterByStatusWhere(status)
+ .FilterByLastModifiedRange(modified)
+ .ToListAsync();
+
+ return queryResult.ToQueryResult(stats);
+ }
+ }
+
+ public async Task MarkMessageAsResolved(string failedMessageId)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ session.Advanced.UseOptimisticConcurrency = true;
+
+ var failedMessage = await session.LoadAsync(failedMessageId);
+
+ if (failedMessage == null)
+ {
+ return false;
+ }
+
+ failedMessage.Status = FailedMessageStatus.Resolved;
+
+ await session.SaveChangesAsync();
+
+ return true;
+ }
+ }
+
+ public async Task ProcessPendingRetries(DateTime periodFrom, DateTime periodTo, string queueAddress, Func processCallback)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var prequery = session.Advanced
+ .AsyncDocumentQuery()
+ .WhereEquals("Status", (int)FailedMessageStatus.RetryIssued)
+ .AndAlso()
+ .WhereBetween("LastModified", periodFrom.Ticks, periodTo.Ticks);
+
+ if (!string.IsNullOrWhiteSpace(queueAddress))
+ {
+ prequery = prequery.AndAlso()
+ .WhereEquals(options => options.QueueAddress, queueAddress);
+ }
+
+ var query = prequery
+ // TODO: Fix SetResultTransformer
+ //.SetResultTransformer(new FailedMessageViewTransformer().TransformerName)
+ .SelectFields();
+
+ await using (var ie = await session.Advanced.StreamAsync(query))
+ {
+ while (await ie.MoveNextAsync())
+ {
+ await processCallback(ie.Current.Document.Id);
+ }
+ }
+ }
+ }
+
+ class DocumentPatchResult
+ {
+ public string Document { get; set; }
+ }
+
+ public async Task<(string[] ids, int count)> UnArchiveMessagesByRange(DateTime from, DateTime to, DateTime cutOff)
+ {
+ // TODO: Make sure this new implementation actually works, not going to delete the old implementation (commented below) until then
+ var patch = new PatchByQueryOperation(new IndexQuery
+ {
+ Query = $@"from index '{new FailedMessageViewIndex().IndexName} as msg
+ where msg.LastModified >= args.From and msg.LastModified <= args.To
+ where msg.Status == args.Archived
+ update
+ {{
+ msg.Status = args.Unresolved
+ }}",
+ QueryParameters =
+ {
+ { "From", from },
+ { "To", to },
+ { "Unresolved", (int)FailedMessageStatus.Unresolved },
+ { "Archived", (int)FailedMessageStatus.Archived },
+ }
+ }, new QueryOperationOptions
+ {
+ AllowStale = true,
+ RetrieveDetails = true
+ });
+
+ var operation = await documentStore.Operations.SendAsync(patch);
+
+ var result = await operation.WaitForCompletionAsync();
+
+ var ids = result.Details.OfType()
+ .Select(d => d.Id)
+ .ToArray();
+
+ // TODO: Are we *really* returning an array AND the length of the same array?
+ return (ids, ids.Length);
+
+ // var options = new BulkOperationOptions
+ // {
+ // AllowStale = true
+ // };
+
+ // var result = await documentStore.AsyncDatabaseCommands.UpdateByIndexAsync(
+ // new FailedMessageViewIndex().IndexName,
+ // new IndexQuery
+ // {
+ // Query = string.Format(CultureInfo.InvariantCulture, "LastModified:[{0} TO {1}] AND Status:{2}", from.Ticks, to.Ticks, (int)FailedMessageStatus.Archived),
+ // Cutoff = cutOff
+ // }, new ScriptedPatchRequest
+ // {
+ // Script = @"
+ //if(this.Status === archivedStatus) {
+ // this.Status = unresolvedStatus;
+ //}
+ //",
+ // Values =
+ // {
+ // {"archivedStatus", (int)FailedMessageStatus.Archived},
+ // {"unresolvedStatus", (int)FailedMessageStatus.Unresolved}
+ // }
+ // }, options);
+
+ // var patchedDocumentIds = (await result.WaitForCompletionAsync())
+ // .JsonDeserialization();
+
+ // return (
+ // patchedDocumentIds.Select(x => FailedMessageIdGenerator.GetMessageIdFromDocumentId(x.Document)).ToArray(),
+ // patchedDocumentIds.Length
+ // );
+ }
+
+ public async Task<(string[] ids, int count)> UnArchiveMessages(IEnumerable failedMessageIds)
+ {
+ Dictionary failedMessages;
+
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ session.Advanced.UseOptimisticConcurrency = true;
+
+ var documentIds = failedMessageIds.Select(FailedMessageIdGenerator.MakeDocumentId);
+
+ failedMessages = await session.LoadAsync(documentIds);
+
+ foreach (var failedMessage in failedMessages.Values)
+ {
+ if (failedMessage.Status == FailedMessageStatus.Archived)
+ {
+ failedMessage.Status = FailedMessageStatus.Unresolved;
+ }
+ }
+
+ await session.SaveChangesAsync();
+ }
+
+ return (
+ failedMessages.Values.Select(x => x.UniqueMessageId).ToArray(), // TODO: (ramon) I don't think we can use Keys here as UniqueMessageId is something different than failedMessageId right?
+ failedMessages.Count
+ );
+ }
+
+ public async Task RevertRetry(string messageUniqueId)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var failedMessage = await session
+ .LoadAsync(FailedMessageIdGenerator.MakeDocumentId(messageUniqueId));
+ if (failedMessage != null)
+ {
+ failedMessage.Status = FailedMessageStatus.Unresolved;
+ }
+
+ var failedMessageRetry = await session
+ .LoadAsync(FailedMessageRetry.MakeDocumentId(messageUniqueId));
+ if (failedMessageRetry != null)
+ {
+ session.Delete(failedMessageRetry);
+ }
+
+ await session.SaveChangesAsync();
+ }
+ }
+
+ public async Task RemoveFailedMessageRetryDocument(string uniqueMessageId)
+ {
+ using var session = documentStore.OpenAsyncSession();
+ await session.Advanced.RequestExecutor.ExecuteAsync(new DeleteDocumentCommand(FailedMessageRetry.MakeDocumentId(uniqueMessageId), null), session.Advanced.Context);
+ }
+
+ // TODO: Once using .NET, consider using IAsyncEnumerable here as this is an unbounded query
+ public async Task GetRetryPendingMessages(DateTime from, DateTime to, string queueAddress)
+ {
+ var ids = new List();
+
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var query = session.Advanced
+ .AsyncDocumentQuery()
+ .WhereEquals("Status", (int)FailedMessageStatus.RetryIssued)
+ .AndAlso()
+ .WhereBetween(options => options.LastModified, from.Ticks, to.Ticks)
+ .AndAlso()
+ .WhereEquals(o => o.QueueAddress, queueAddress)
+ // TODO: Fix SetResultTransformer
+ //.SetResultTransformer(FailedMessageViewTransformer.Name)
+ .SelectFields(new[] { "Id" });
+
+ await using (var ie = await session.Advanced.StreamAsync(query))
+ {
+ while (await ie.MoveNextAsync())
+ {
+ ids.Add(ie.Current.Document.Id);
+ }
+ }
+ }
+
+ return ids.ToArray();
+ }
+
+ public Task FetchFromFailedMessage(string uniqueMessageId)
+ {
+ throw new NotSupportedException("Body not stored embedded");
+ }
+
+ public async Task StoreEventLogItem(EventLogItem logItem)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ await session.StoreAsync(logItem);
+ await session.SaveChangesAsync();
+ }
+ }
+
+ public async Task StoreFailedMessagesForTestsOnly(params FailedMessage[] failedMessages)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ foreach (var message in failedMessages)
+ {
+ await session.StoreAsync(message);
+ }
+
+ await session.SaveChangesAsync();
+ }
+ }
+
+ static readonly ILog Logger = LogManager.GetLogger();
+ }
+}
diff --git a/src/ServiceControl.Persistence.RavenDb5/EventLogDataStore.cs b/src/ServiceControl.Persistence.RavenDb5/EventLogDataStore.cs
new file mode 100644
index 0000000000..597f52958b
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/EventLogDataStore.cs
@@ -0,0 +1,43 @@
+namespace ServiceControl.Persistence.RavenDb
+{
+ using System.Collections.Generic;
+ using System.Threading.Tasks;
+ using EventLog;
+ using Persistence.Infrastructure;
+ using Raven.Client.Documents;
+ using Raven.Client.Documents.Session;
+
+ class EventLogDataStore : IEventLogDataStore
+ {
+ readonly IDocumentStore documentStore;
+
+ public EventLogDataStore(IDocumentStore documentStore)
+ {
+ this.documentStore = documentStore;
+ }
+
+ public async Task Add(EventLogItem logItem)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ await session.StoreAsync(logItem);
+ await session.SaveChangesAsync();
+ }
+ }
+
+ public async Task<(IList, int, string)> GetEventLogItems(PagingInfo pagingInfo)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ var results = await session
+ .Query()
+ .Statistics(out var stats)
+ .OrderByDescending(p => p.RaisedAt, OrderingType.Double)
+ .Paging(pagingInfo)
+ .ToListAsync();
+
+ return (results, stats.TotalResults, stats.ResultEtag.ToString());
+ }
+ }
+ }
+}
diff --git a/src/ServiceControl.Persistence.RavenDb5/Extensions.cs b/src/ServiceControl.Persistence.RavenDb5/Extensions.cs
new file mode 100644
index 0000000000..55f2936ace
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/Extensions.cs
@@ -0,0 +1,32 @@
+namespace ServiceControl.Infrastructure.RavenDB
+{
+ using System;
+ using System.Threading;
+ using Newtonsoft.Json.Linq;
+ using Raven.Client.Documents.Conventions;
+ using Raven.Client.Documents.Queries;
+
+ static class Extensions
+ {
+ //public static void Query(this DocumentDatabase db, string index, IndexQuery query, Action onItem, TState state, CancellationToken cancellationToken = default)
+ //{
+ // var results = db.Queries.Query(index, query, cancellationToken);
+ // foreach (var doc in results.Results)
+ // {
+ // onItem(doc, state);
+ // }
+ //}
+
+ // TODO: This polyfill of RavenDB 3.5 is a guess based loosely on https://github.com/ravendb/ravendb/blob/v3.5/Raven.Client.Lightweight/Document/DocumentConvention.cs#L151
+ public static string DefaultFindFullDocumentKeyFromNonStringIdentifier(this DocumentConventions conventions, T id, Type collectionType, bool allowNull)
+ {
+ if (allowNull && id.Equals(default(T)))
+ {
+ return null;
+ }
+
+ var collectionName = conventions.FindCollectionName(collectionType);
+ return $"{collectionName}{conventions.IdentityPartsSeparator}{id}";
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.RavenDb5/Extensions/DictionaryExtensions.cs b/src/ServiceControl.Persistence.RavenDb5/Extensions/DictionaryExtensions.cs
new file mode 100644
index 0000000000..7aabb6ea83
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/Extensions/DictionaryExtensions.cs
@@ -0,0 +1,16 @@
+namespace ServiceControl.Infrastructure
+{
+ using System;
+ using System.Collections.Generic;
+
+ static class DictionaryExtensions
+ {
+ public static void CheckIfKeyExists(string key, IReadOnlyDictionary headers, Action actionToInvokeWhenKeyIsFound)
+ {
+ if (headers.TryGetValue(key, out var value))
+ {
+ actionToInvokeWhenKeyIsFound(value);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.RavenDb5/Extensions/QueryResultConvert.cs b/src/ServiceControl.Persistence.RavenDb5/Extensions/QueryResultConvert.cs
new file mode 100644
index 0000000000..ba7100e879
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/Extensions/QueryResultConvert.cs
@@ -0,0 +1,15 @@
+namespace ServiceControl.Persistence.RavenDb
+{
+ using System.Collections.Generic;
+ using Persistence.Infrastructure;
+ using Raven.Client.Documents.Session;
+
+ static class QueryResultConvert
+ {
+ public static QueryResult> ToQueryResult(this IList result, QueryStatistics stats)
+ where T : class
+ {
+ return new QueryResult>(result, stats.ToQueryStatsInfo());
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.RavenDb5/ExternalIntegrationRequestsDataStore.cs b/src/ServiceControl.Persistence.RavenDb5/ExternalIntegrationRequestsDataStore.cs
new file mode 100644
index 0000000000..0ea74e7346
--- /dev/null
+++ b/src/ServiceControl.Persistence.RavenDb5/ExternalIntegrationRequestsDataStore.cs
@@ -0,0 +1,217 @@
+namespace ServiceControl.Persistence.RavenDb
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Reactive.Linq;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using ExternalIntegrations;
+ using Microsoft.Extensions.Hosting;
+ using NServiceBus;
+ using NServiceBus.Logging;
+ using ServiceBus.Management.Infrastructure.Extensions;
+ using Raven.Client.Documents;
+ using Raven.Client.Documents.Changes;
+
+ class ExternalIntegrationRequestsDataStore
+ : IExternalIntegrationRequestsDataStore
+ , IHostedService
+ , IAsyncDisposable
+ {
+ public ExternalIntegrationRequestsDataStore(RavenDBPersisterSettings settings, IDocumentStore documentStore, CriticalError criticalError)
+ {
+ this.settings = settings;
+ this.documentStore = documentStore;
+
+ circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+ "EventDispatcher",
+ TimeSpan.FromMinutes(5), // TODO: Shouldn't be magic value but coming from settings
+ ex => criticalError.Raise("Repeated failures when dispatching external integration events.", ex),
+ TimeSpan.FromSeconds(20) // TODO: Shouldn't be magic value but coming from settings
+ );
+ }
+
+ const string KeyPrefix = "ExternalIntegrationDispatchRequests";
+
+ public async Task StoreDispatchRequest(IEnumerable dispatchRequests)
+ {
+ using (var session = documentStore.OpenAsyncSession())
+ {
+ foreach (var dispatchRequest in dispatchRequests)
+ {
+ if (dispatchRequest.Id != null)
+ {
+ throw new ArgumentException("Items cannot have their Id property set");
+ }
+
+ dispatchRequest.Id = KeyPrefix + "/" + Guid.NewGuid(); // TODO: Key is generated to persistence
+ await session.StoreAsync(dispatchRequest);
+ }
+
+ await session.SaveChangesAsync();
+ }
+ }
+
+ public void Subscribe(Func