-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathQueueProcessingService.cs
More file actions
88 lines (73 loc) · 4.14 KB
/
QueueProcessingService.cs
File metadata and controls
88 lines (73 loc) · 4.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace BackgroundQueue
{
public class QueueProcessingService : BackgroundService
{
private readonly IServiceProvider _services;
private readonly IMessageHandlerFactory _handlerFactory;
private readonly ILogger _logger;
public QueueProcessingService(ILogger<QueueProcessingService> logger, IServiceProvider services, IMessageHandlerFactory handlerFactory) {
_logger = logger;
_services = services;
_handlerFactory = handlerFactory;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var pollDelay = TimeSpan.FromSeconds(1);
var inactivityCounter = 0;
while (!stoppingToken.IsCancellationRequested) {
var referenceTime = DateTime.UtcNow;
var messageCounter = 0;
using (var serviceScope = _services.CreateScope()) {
var queueSource = serviceScope.ServiceProvider.GetRequiredService<IQueueSource>();
using (var batch = (await queueSource.GetMessagesAsync(referenceTime))) {
foreach (var message in batch.Messages) {
messageCounter += 1;
try {
// Lookup the handler based on the type of the message
var handler = _handlerFactory.GetHandlerForMessage(message.MessageType, serviceScope.ServiceProvider);
if (handler == null) {
_logger.LogWarning("Failed to find a handler for {MessageType}", message.MessageType);
}
// handle the message
await handler.ExecuteAsync(message.MessageId, message.Data, stoppingToken);
// and mark it for removal
queueSource.RemoveMessage(message);
} catch (Exception e) {
_logger.LogError(e, "An error occurred while handling a message");
// Something went wrong and we couldn't handle the message so track the error and requeue the message
message.ErrorCount += 1;
message.ErrorMessage = e.Message;
message.LastRunTime = referenceTime;
message.NextRunTime = message.ErrorCount > 10
// If the message has failed too many times then dead-letter it
? DateTime.MaxValue
// Otherwise push it down the queue so it can be retried later
: DateTime.UtcNow.AddMinutes(message.ErrorCount);
// requeue the message
queueSource.UpdateMessage(message);
}
}
}
}
if (messageCounter == 0) {
inactivityCounter += 1;
} else {
inactivityCounter = 0;
}
// The longer we don't have any work to do, the longer we wait until the next check.
// The benefit is that during periods of inactivity we aren't spamming the db too often
// The downside is that when work does appear, the delay will impact the soonest time that it will be noticed.
// We also cap the delay, so that the wait doesn't get too long.
pollDelay = pollDelay.Add(TimeSpan.FromSeconds(Math.Min(Math.Floor(inactivityCounter / 120.0), 360)));
_logger.LogInformation("Delay until next execution is {Delay}", pollDelay);
await Task.Delay(pollDelay, stoppingToken);
}
}
}
}