Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public async Task Should_return_one_endpoint_in_grouping_in_throughput_summary_w
{
// Arrange
await DataStore.CreateBuilder()
.AddEndpoint("Endpoint1", sources: [ThroughputSource.Broker])
.AddEndpoint("endpoint1", sources: [ThroughputSource.Broker])
.ConfigureEndpoint(endpoint => endpoint.SanitizedName = "endpoint1")
.WithThroughput(data: [50])
.AddEndpoint("Endpoint1", sources: [ThroughputSource.Audit])
Expand All @@ -43,6 +43,8 @@ await DataStore.CreateBuilder()
// Assert
Assert.That(summary, Is.Not.Null);
Assert.That(summary, Has.Count.EqualTo(1));
//should see 1 endpoint with both throughputs, and return 60 as the maximum one
Assert.That(summary.Sum(s => s.MaxDailyThroughput), Is.EqualTo(60));
}


Expand All @@ -51,7 +53,7 @@ public async Task Should_return_one_endpoint_in_grouping_in_throughput_report_wh
{
// Arrange
await DataStore.CreateBuilder()
.AddEndpoint("Endpoint1", sources: [ThroughputSource.Broker])
.AddEndpoint("endpoint1", sources: [ThroughputSource.Broker])
.ConfigureEndpoint(endpoint => endpoint.SanitizedName = "endpoint1")
.WithThroughput(data: [50])
.AddEndpoint("Endpoint1", sources: [ThroughputSource.Audit])
Expand All @@ -67,14 +69,18 @@ await DataStore.CreateBuilder()
// Assert
Assert.That(report, Is.Not.Null);
Assert.That(report.ReportData.Queues.Count, Is.EqualTo(1));
//should see 1 endpoint with both throughputs, and return 60 as the maximum one
Assert.That(report.ReportData.TotalThroughput, Is.EqualTo(60));
Assert.That(report.ReportData.Queues.FirstOrDefault(f => f.QueueName == "Endpoint1").DailyThroughputFromAudit.Sum(s => s.MessageCount), Is.EqualTo(60));
Assert.That(report.ReportData.Queues.FirstOrDefault(f => f.QueueName == "Endpoint1").DailyThroughputFromBroker.Sum(s => s.MessageCount), Is.EqualTo(50));
}

[Test]
public async Task Should_return_two_endpoints_in_grouping_in_throughput_summary_when_sanitizednames_are_same_but_different_case_when_not_using_cleansing()
{
// Arrange
await DataStore.CreateBuilder()
.AddEndpoint("Endpoint1", sources: [ThroughputSource.Broker])
.AddEndpoint("endpoint1", sources: [ThroughputSource.Broker])
.ConfigureEndpoint(endpoint => endpoint.SanitizedName = "endpoint1")
.WithThroughput(data: [50])
.AddEndpoint("Endpoint1", sources: [ThroughputSource.Audit])
Expand All @@ -90,6 +96,8 @@ await DataStore.CreateBuilder()
// Assert
Assert.That(summary, Is.Not.Null);
Assert.That(summary, Has.Count.EqualTo(2));
//two different endpoints hence total throughput is a sum of both of them
Assert.That(summary.Sum(s => s.MaxDailyThroughput), Is.EqualTo(110));
}


Expand All @@ -98,7 +106,7 @@ public async Task Should_return_two_endpoints_in_grouping_in_throughput_report_w
{
// Arrange
await DataStore.CreateBuilder()
.AddEndpoint("Endpoint1", sources: [ThroughputSource.Broker])
.AddEndpoint("endpoint1", sources: [ThroughputSource.Broker])
.ConfigureEndpoint(endpoint => endpoint.SanitizedName = "endpoint1")
.WithThroughput(data: [50])
.AddEndpoint("Endpoint1", sources: [ThroughputSource.Audit])
Expand All @@ -114,6 +122,8 @@ await DataStore.CreateBuilder()
// Assert
Assert.That(report, Is.Not.Null);
Assert.That(report.ReportData.Queues.Count, Is.EqualTo(2));
//two different endpoints hence total throughput is a sum of both of them
Assert.That(report.ReportData.TotalThroughput, Is.EqualTo(110));
}

class BrokerThroughputQuery_WithLowerCaseSanitizedNameCleanse : IBrokerThroughputQuery
Expand Down
119 changes: 74 additions & 45 deletions src/Particular.LicensingComponent/ThroughputCollector.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
namespace Particular.LicensingComponent;

using System;
using System.Runtime.CompilerServices;
using System.Threading;
using AuditThroughput;
using Contracts;
using MonitoringThroughput;
Expand Down Expand Up @@ -59,28 +62,16 @@ public async Task UpdateUserIndicatorsOnEndpoints(List<UpdateUserIndicator> user

public async Task<List<EndpointThroughputSummary>> GetThroughputSummary(CancellationToken cancellationToken)
{
var endpoints = (await dataStore.GetAllEndpoints(false, cancellationToken)).ToList();
var queueNames = endpoints.Select(endpoint => endpoint.SanitizedName).Distinct().ToList();
var endpointThroughputPerQueue = await dataStore.GetEndpointThroughputByQueueName(queueNames, cancellationToken);
var endpointSummaries = new List<EndpointThroughputSummary>();

//group endpoints by sanitized name - so to group throughput recorded from broker, audit and monitoring
foreach (var endpointGroupPerQueue in endpoints.GroupBy(g => CleanseSanitizedName(g.SanitizedName)))
await foreach (var endpointData in GetDistinctEndpointData(cancellationToken))
{
var data = new List<ThroughputData>();
if (endpointThroughputPerQueue.TryGetValue(endpointGroupPerQueue.Key, out var tempData))
{
data.AddRange(tempData);
}

var isKnownEndpoint = IsKnownEndpoint(endpointGroupPerQueue);
var endpointSummary = new EndpointThroughputSummary
{
//want to display the endpoint name to the user if it's different to the sanitized endpoint name
Name = endpointGroupPerQueue.FirstOrDefault(endpoint => endpoint.Id.Name != endpoint.SanitizedName)?.Id.Name ?? endpointGroupPerQueue.Key,
UserIndicator = UserIndicator(endpointGroupPerQueue) ?? (isKnownEndpoint ? Contracts.UserIndicator.NServiceBusEndpoint.ToString() : string.Empty),
IsKnownEndpoint = isKnownEndpoint,
MaxDailyThroughput = data.Max()
Name = endpointData.Name,
UserIndicator = endpointData.UserIndicator ?? (endpointData.IsKnownEndpoint ? Contracts.UserIndicator.NServiceBusEndpoint.ToString() : string.Empty),
IsKnownEndpoint = endpointData.IsKnownEndpoint,
MaxDailyThroughput = endpointData.ThroughputData.Max()
};

endpointSummaries.Add(endpointSummary);
Expand Down Expand Up @@ -124,40 +115,25 @@ public async Task<SignedReport> GenerateThroughputReport(string spVersion, DateT
var reportMasks = await dataStore.GetReportMasks(cancellationToken);
CreateMasks(reportMasks.ToArray());

var endpoints = (await dataStore.GetAllEndpoints(false, cancellationToken)).ToArray();
var queueNames = endpoints.Select(endpoint => endpoint.SanitizedName).Distinct().ToList();
var endpointThroughputPerQueue = await dataStore.GetEndpointThroughputByQueueName(queueNames, cancellationToken);
var queueThroughputs = new List<QueueThroughput>();
List<string> ignoredQueueNames = [];

//group endpoints by sanitized name - so to group throughput recorded from broker, audit and monitoring
foreach (var endpointGroupPerQueue in endpoints.GroupBy(g => CleanseSanitizedName(g.SanitizedName)))
await foreach (var endpointData in GetDistinctEndpointData(cancellationToken))
{
//want to display the endpoint name if it's different to the sanitized endpoint name
var endpointName = endpointGroupPerQueue.FirstOrDefault(endpoint => endpoint.Id.Name != endpoint.SanitizedName)?.Id.Name ?? endpointGroupPerQueue.Key;

if (!endpointThroughputPerQueue.TryGetValue(endpointGroupPerQueue.Key, out var data))
{
data = [];
}

var throughputData = data.ToList();

var userIndicator = UserIndicator(endpointGroupPerQueue) ?? null;
var notAnNsbEndpoint = userIndicator?.Equals(Contracts.UserIndicator.NotNServiceBusEndpoint.ToString(), StringComparison.OrdinalIgnoreCase) ?? false;
var notAnNsbEndpoint = endpointData.UserIndicator?.Equals(Contracts.UserIndicator.NotNServiceBusEndpoint.ToString(), StringComparison.OrdinalIgnoreCase) ?? false;

//get all data that we have, including daily values
var queueThroughput = new QueueThroughput
{
QueueName = Mask(endpointName),
UserIndicator = userIndicator,
EndpointIndicators = EndpointIndicators(endpointGroupPerQueue) ?? [],
NoDataOrSendOnly = throughputData.Sum() == 0,
Scope = EndpointScope(endpointGroupPerQueue) ?? "",
Throughput = throughputData.Max(),
DailyThroughputFromAudit = throughputData.FromSource(ThroughputSource.Audit).Select(s => new DailyThroughput { DateUTC = s.DateUTC, MessageCount = s.MessageCount }).ToArray(),
DailyThroughputFromMonitoring = throughputData.FromSource(ThroughputSource.Monitoring).Select(s => new DailyThroughput { DateUTC = s.DateUTC, MessageCount = s.MessageCount }).ToArray(),
DailyThroughputFromBroker = notAnNsbEndpoint ? [] : throughputData.FromSource(ThroughputSource.Broker).Select(s => new DailyThroughput { DateUTC = s.DateUTC, MessageCount = s.MessageCount }).ToArray()
QueueName = Mask(endpointData.Name),
UserIndicator = endpointData.UserIndicator,
EndpointIndicators = endpointData.EndpointIndicators ?? [],
NoDataOrSendOnly = endpointData.ThroughputData.Sum() == 0,
Scope = endpointData.Scope ?? "",
Throughput = endpointData.ThroughputData.Max(),
DailyThroughputFromAudit = endpointData.ThroughputData.FromSource(ThroughputSource.Audit).Select(s => new DailyThroughput { DateUTC = s.DateUTC, MessageCount = s.MessageCount }).ToArray(),
DailyThroughputFromMonitoring = endpointData.ThroughputData.FromSource(ThroughputSource.Monitoring).Select(s => new DailyThroughput { DateUTC = s.DateUTC, MessageCount = s.MessageCount }).ToArray(),
DailyThroughputFromBroker = notAnNsbEndpoint ? [] : endpointData.ThroughputData.FromSource(ThroughputSource.Broker).Select(s => new DailyThroughput { DateUTC = s.DateUTC, MessageCount = s.MessageCount }).ToArray()
};

queueThroughputs.Add(queueThroughput);
Expand Down Expand Up @@ -199,8 +175,8 @@ public async Task<SignedReport> GenerateThroughputReport(string spVersion, DateT

report.EnvironmentInformation.EnvironmentData[EnvironmentDataType.ServiceControlVersion.ToString()] = throughputSettings.ServiceControlVersion;
report.EnvironmentInformation.EnvironmentData[EnvironmentDataType.ServicePulseVersion.ToString()] = spVersion;
report.EnvironmentInformation.EnvironmentData[EnvironmentDataType.AuditEnabled.ToString()] = endpointThroughputPerQueue.HasDataFromSource(ThroughputSource.Audit).ToString();
report.EnvironmentInformation.EnvironmentData[EnvironmentDataType.MonitoringEnabled.ToString()] = endpointThroughputPerQueue.HasDataFromSource(ThroughputSource.Monitoring).ToString();
report.EnvironmentInformation.EnvironmentData[EnvironmentDataType.AuditEnabled.ToString()] = systemHasAuditEnabled.ToString();
report.EnvironmentInformation.EnvironmentData[EnvironmentDataType.MonitoringEnabled.ToString()] = systemHasMonitoringEnabled.ToString();

var throughputReport = new SignedReport { ReportData = report, Signature = Signature.SignReport(report) };
return throughputReport;
Expand Down Expand Up @@ -228,6 +204,57 @@ string Mask(string stringToMask)
}
}

async IAsyncEnumerable<EndpointData> GetDistinctEndpointData([EnumeratorCancellation] CancellationToken cancellationToken)
{
var endpoints = (await dataStore.GetAllEndpoints(false, cancellationToken)).ToArray();
var queueNames = endpoints.Select(endpoint => endpoint.SanitizedName).Distinct().ToList();
var endpointThroughputPerQueue = await dataStore.GetEndpointThroughputByQueueName(queueNames, cancellationToken);

systemHasAuditEnabled = endpointThroughputPerQueue.HasDataFromSource(ThroughputSource.Audit);
systemHasMonitoringEnabled = endpointThroughputPerQueue.HasDataFromSource(ThroughputSource.Monitoring);

//group endpoints by sanitized name - so to group throughput recorded from broker, audit and monitoring
//some brokers use lowercase only so we want to ensure that we are matching on what the user has setup via NSB in terms of nn endpoint name, and what is stored on the broker
foreach (var endpointGroupPerQueue in endpoints.GroupBy(g => CleanseSanitizedName(g.SanitizedName)))
{
//want to display the endpoint name if it's different to the sanitized endpoint name
var endpointName = endpointGroupPerQueue.FirstOrDefault(endpoint => !string.Equals(endpoint.Id.Name, endpointGroupPerQueue.Key, StringComparison.Ordinal))?.Id.Name ?? endpointGroupPerQueue.Key;

var throughputData = new List<ThroughputData>();
foreach (var endpointQueueName in endpointThroughputPerQueue.Keys.Where(k => CleanseSanitizedName(k) == endpointGroupPerQueue.Key))
{
if (endpointThroughputPerQueue.TryGetValue(endpointQueueName, out var tempData))
{
throughputData.AddRange(tempData);
}
}

var userIndicator = UserIndicator(endpointGroupPerQueue) ?? null;

yield return new EndpointData(endpointName, throughputData, userIndicator, EndpointScope(endpointGroupPerQueue), EndpointIndicators(endpointGroupPerQueue), IsKnownEndpoint(endpointGroupPerQueue));
}
}

class EndpointData
{
internal EndpointData(string name, List<ThroughputData> throughputData, string? userIndicator, string? scope, string[]? endpointIndicators, bool isKnownEndpoint)
{
Name = name;
ThroughputData = throughputData;
UserIndicator = userIndicator;
Scope = scope;
EndpointIndicators = endpointIndicators;
IsKnownEndpoint = isKnownEndpoint;
}

internal string Name { get; }
internal List<ThroughputData> ThroughputData { get; }
internal string? UserIndicator { get; }
internal string? Scope { get; }
internal string[]? EndpointIndicators { get; }
internal bool IsKnownEndpoint { get; }
}

string CleanseSanitizedName(string endpointName)
{
return throughputQuery == null ? endpointName : throughputQuery.SanitizedEndpointNameCleanser(endpointName);
Expand All @@ -241,4 +268,6 @@ string CleanseSanitizedName(string endpointName)
string[]? EndpointIndicators(IGrouping<string, Endpoint> endpoint) => endpoint.Where(w => w.EndpointIndicators?.Any() == true)?.SelectMany(s => s.EndpointIndicators)?.Distinct()?.ToArray();

readonly string transport = throughputQuery?.MessageTransport ?? throughputSettings.TransportType;
internal bool systemHasAuditEnabled;
internal bool systemHasMonitoringEnabled;
}
2 changes: 1 addition & 1 deletion src/ServiceControl.Transports.ASBS/AzureQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
DateOnly startDate,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
logger.LogInformation($"Gathering metrics for \"{brokerQueue}\" queue");
logger.LogInformation($"Gathering metrics for \"{brokerQueue.QueueName}\" queue");

var endDate = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime).AddDays(-1);
if (endDate < startDate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,5 +115,7 @@ public abstract IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBrokerQue

public virtual string SanitizeEndpointName(string endpointName) => endpointName;

//NOTE This was added after initial release to help with matching on sanitized name where the broker (azure) would auto lowercase all the names.
//If the logic was added to the SanitizeEndpointName function it would only apply to new records, and not historical data, so the report and endpoint groupings would be incorrect.
public virtual string SanitizedEndpointNameCleanser(string endpointName) => endpointName;
}