Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ public DatabaseConfiguration(string name,
TimeSpan auditRetentionPeriod,
int maxBodySizeToStore,
int minimumStorageLeftRequiredForIngestion,
ServerConfiguration serverConfiguration)
ServerConfiguration serverConfiguration,
TimeSpan bulkInsertCommitTimeout)
{
Name = name;
ExpirationProcessTimerInSeconds = expirationProcessTimerInSeconds;
EnableFullTextSearch = enableFullTextSearch;
AuditRetentionPeriod = auditRetentionPeriod;
MaxBodySizeToStore = maxBodySizeToStore;
ServerConfiguration = serverConfiguration;
BulkInsertCommitTimeout = bulkInsertCommitTimeout;
MinimumStorageLeftRequiredForIngestion = minimumStorageLeftRequiredForIngestion;
}

Expand All @@ -37,5 +39,7 @@ public DatabaseConfiguration(string name,
public int MaxBodySizeToStore { get; }

public int MinimumStorageLeftRequiredForIngestion { get; internal set; } //Setting for ATT only

public TimeSpan BulkInsertCommitTimeout { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class RavenPersistenceConfiguration : IPersistenceConfiguration
public const string LogPathKey = "LogPath";
public const string RavenDbLogLevelKey = "RavenDBLogLevel";
public const string MinimumStorageLeftRequiredForIngestionKey = "MinimumStorageLeftRequiredForIngestion";
public const string BulkInsertCommitTimeoutInSecondsKey = "BulkInsertCommitTimeoutInSeconds";

public IEnumerable<string> ConfigurationKeys => new[]{
DatabaseNameKey,
Expand All @@ -25,7 +26,8 @@ public class RavenPersistenceConfiguration : IPersistenceConfiguration
ExpirationProcessTimerInSecondsKey,
LogPathKey,
RavenDbLogLevelKey,
MinimumStorageLeftRequiredForIngestionKey
MinimumStorageLeftRequiredForIngestionKey,
BulkInsertCommitTimeoutInSecondsKey
};

public string Name => "RavenDB";
Expand Down Expand Up @@ -101,14 +103,17 @@ internal static DatabaseConfiguration GetDatabaseConfiguration(PersistenceSettin

var expirationProcessTimerInSeconds = GetExpirationProcessTimerInSeconds(settings);

var bulkInsertTimeout = TimeSpan.FromSeconds(GetBulkInsertCommitTimeout(settings));

return new DatabaseConfiguration(
databaseName,
expirationProcessTimerInSeconds,
settings.EnableFullTextSearchOnBodies,
settings.AuditRetentionPeriod,
settings.MaxBodySizeToStore,
minimumStorageLeftRequiredForIngestion,
serverConfiguration);
serverConfiguration,
bulkInsertTimeout);
}

static int GetExpirationProcessTimerInSeconds(PersistenceSettings settings)
Expand All @@ -135,6 +140,30 @@ static int GetExpirationProcessTimerInSeconds(PersistenceSettings settings)
return expirationProcessTimerInSeconds;
}

static int GetBulkInsertCommitTimeout(PersistenceSettings settings)
{
var bulkInsertCommitTimeoutInSeconds = BulkInsertCommitTimeoutInSecondsDefault;

if (settings.PersisterSpecificSettings.TryGetValue(BulkInsertCommitTimeoutInSecondsKey, out var bulkInsertCommitTimeoutString))
{
bulkInsertCommitTimeoutInSeconds = int.Parse(bulkInsertCommitTimeoutString);
}

if (bulkInsertCommitTimeoutInSeconds < 0)
{
Logger.Error($"BulkInsertCommitTimeout cannot be negative. Defaulting to {BulkInsertCommitTimeoutInSecondsDefault}");
return BulkInsertCommitTimeoutInSecondsDefault;
}

if (bulkInsertCommitTimeoutInSeconds > TimeSpan.FromHours(1).TotalSeconds)
{
Logger.Error($"BulkInsertCommitTimeout cannot be larger than {TimeSpan.FromHours(1).TotalSeconds}. Defaulting to {BulkInsertCommitTimeoutInSecondsDefault}");
return BulkInsertCommitTimeoutInSecondsDefault;
}

return bulkInsertCommitTimeoutInSeconds;
}

static string GetLogPath(PersistenceSettings settings)
{
if (!settings.PersisterSpecificSettings.TryGetValue(LogPathKey, out var logPath))
Expand All @@ -151,5 +180,6 @@ static string GetLogPath(PersistenceSettings settings)
static readonly ILog Logger = LogManager.GetLogger(typeof(RavenPersistenceConfiguration));

const int ExpirationProcessTimerInSecondsDefault = 600;
const int BulkInsertCommitTimeoutInSecondsDefault = 60;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public RavenAuditIngestionUnitOfWorkFactory(IRavenDocumentStoreProvider document

public IAuditIngestionUnitOfWork StartNew(int batchSize)
{
var timedCancellationSource = new CancellationTokenSource(TimeSpan.FromMinutes(1));
var timedCancellationSource = new CancellationTokenSource(databaseConfiguration.BulkInsertCommitTimeout);
var bulkInsert = documentStoreProvider.GetDocumentStore()
.BulkInsert(new BulkInsertOptions { SkipOverwriteIfUnchanged = true, }, timedCancellationSource.Token);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static async Task<EmbeddedDatabase> GetInstance(CancellationToken cancell
var logsMode = "Operations";
var serverUrl = $"http://localhost:{PortUtility.FindAvailablePort(33334)}";

embeddedDatabase = EmbeddedDatabase.Start(new DatabaseConfiguration("audit", 60, true, TimeSpan.FromMinutes(5), 120000, 5, new ServerConfiguration(dbPath, serverUrl, logPath, logsMode)), new ApplicationLifetime(new NullLogger<ApplicationLifetime>()));
embeddedDatabase = EmbeddedDatabase.Start(new DatabaseConfiguration("audit", 60, true, TimeSpan.FromMinutes(5), 120000, 5, new ServerConfiguration(dbPath, serverUrl, logPath, logsMode), TimeSpan.FromSeconds(60)), new ApplicationLifetime(new NullLogger<ApplicationLifetime>()));

//make sure that the database is up
using var documentStore = await embeddedDatabase.Connect(cancellationToken);
Expand Down
25 changes: 22 additions & 3 deletions src/ServiceControl.DomainEvents/DomainEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus.Logging;

public class DomainEvents : IDomainEvents
{
static readonly ILog Log = LogManager.GetLogger<DomainEvents>();

readonly IServiceProvider serviceProvider;
public DomainEvents(IServiceProvider serviceProvider) => this.serviceProvider = serviceProvider;

Expand All @@ -14,15 +17,31 @@ public async Task Raise<T>(T domainEvent) where T : IDomainEvent
var handlers = serviceProvider.GetServices<IDomainHandler<T>>();
foreach (var handler in handlers)
{
await handler.Handle(domainEvent)
.ConfigureAwait(false);
try
{
await handler.Handle(domainEvent)
.ConfigureAwait(false);
}
catch (Exception e)
{
Log.Error($"Unexpected error publishing domain event {typeof(T)}", e);
throw;
}
}

var ieventHandlers = serviceProvider.GetServices<IDomainHandler<IDomainEvent>>();
foreach (var handler in ieventHandlers)
{
await handler.Handle(domainEvent)
try
{
await handler.Handle(domainEvent)
.ConfigureAwait(false);
}
catch (Exception e)
{
Log.Error($"Unexpected error publishing domain event {typeof(T)}", e);
throw;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
<PackageReference Include="NServiceBus" />
</ItemGroup>

</Project>
5 changes: 5 additions & 0 deletions src/ServiceControl/Monitoring/EndpointInstanceMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ namespace ServiceControl.Monitoring
using Contracts.HeartbeatMonitoring;
using EndpointControl.Contracts;
using Infrastructure.DomainEvents;
using NLog.Fluent;
using NServiceBus.Logging;
using ServiceControl.Operations;
using ServiceControl.Persistence;

Expand Down Expand Up @@ -38,6 +40,7 @@ public async Task UpdateStatus(HeartbeatStatus newStatus, DateTime? latestTimest
if (newStatus != status)
{
await RaiseStateChangeEvents(newStatus, latestTimestamp);
Log.DebugFormat("Endpoint {0} status updated from {1} to {2}", Id.LogicalName, status, newStatus);
}

lastSeen = latestTimestamp;
Expand Down Expand Up @@ -132,6 +135,8 @@ public KnownEndpointsView GetKnownView()
};
}

static readonly ILog Log = LogManager.GetLogger<EndpointInstanceMonitor>();

IDomainEvents domainEvents;
DateTime? lastSeen;
HeartbeatStatus status;
Expand Down