Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions src/RabbitMQCoreClient/BatchQueueSender/QueueBufferService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ internal sealed class QueueBufferService : IQueueBufferService, IDisposable
/// <inheritdoc />
public IMessageSerializer Serializer { get; }

const string ErrorWhileWritingEvents = "There was an error while writing events. Details: {ErrorMessage}";
const string ErrorOnAfterWriteEvents = "There was an error execution OnAfterWriteEvents method. Details: {ErrorMessage}";
const string ErrorOnWriteErrors = "There was an error execution OnWriteErrors method. Details: {ErrorMessage}";
const string ErrorWhileWritingEvents = "There was an error while writing events.";
const string ErrorOnAfterWriteEvents = "There was an error execution OnAfterWriteEvents method.";
const string ErrorOnWriteErrors = "There was an error execution OnWriteErrors method.";

/// <summary>
/// The implementation constructor of the event storage buffer.
Expand Down Expand Up @@ -149,7 +149,7 @@ async Task ProcessItemsAsync(EventItem[] array, int count)
errorEvents ??= new List<EventItem>();
foreach (var innerEx in task.Exception.InnerExceptions)
{
_log?.LogError(innerEx, ErrorWhileWritingEvents, innerEx.Message);
_log?.LogError(innerEx, ErrorWhileWritingEvents);
// Log exception
errorEvents.AddRange(eventsGroup);
}
Expand All @@ -165,7 +165,8 @@ async Task ProcessItemsAsync(EventItem[] array, int count)
{
ArrayPool<EventItem>.Shared.Return(array);

_log?.LogInformation("Buffer has written '{RecordsCount}' records to the database at '{ElapsedMilliseconds}' ms.",
if (_log?.IsEnabled(LogLevel.Debug) == true)
_log?.LogDebug("RabbitMQ Buffer has sent '{EventsCount}' events to the queue bus at '{ElapsedMilliseconds}' ms.",
count, sw.ElapsedMilliseconds);
}

Expand All @@ -176,7 +177,7 @@ async Task ProcessItemsAsync(EventItem[] array, int count)
}
catch (Exception e)
{
_log?.LogError(e, ErrorOnAfterWriteEvents, e.Message);
_log?.LogError(e, ErrorOnAfterWriteEvents);
}

try
Expand All @@ -186,7 +187,7 @@ async Task ProcessItemsAsync(EventItem[] array, int count)
}
catch (Exception e)
{
_log?.LogError(e, ErrorOnWriteErrors, e.Message);
_log?.LogError(e, ErrorOnWriteErrors);
}
}

Expand Down
15 changes: 9 additions & 6 deletions src/RabbitMQCoreClient/QueueService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,9 @@ await _publishChannel.BasicPublishAsync(exchange: exchange,
mandatory: false, // Just not reacting when no queue is subscribed for key.
basicProperties: props,
body: obj);
_log.LogDebug("Sent raw message to exchange '{Exchange}' with routing key '{RoutingKey}'.",
exchange, routingKey);
if (_log.IsEnabled(LogLevel.Debug))
_log.LogDebug("Sent raw message to exchange '{Exchange}' with routing key '{RoutingKey}'.",
exchange, routingKey);
}

#endregion
Expand Down Expand Up @@ -313,8 +314,9 @@ public async ValueTask SendBatchAsync(
// evenly divisible by batch size.
await MaybeAwaitPublishes(publishTasks, 0);

_log.LogDebug("Sent raw messages batch to exchange '{Exchange}' " +
"with routing key '{RoutingKey}'.", exchange, routingKey);
if (_log.IsEnabled(LogLevel.Debug))
_log.LogDebug("Sent raw messages batch to exchange '{Exchange}' " +
"with routing key '{RoutingKey}'.", exchange, routingKey);
}

/// <inheritdoc />
Expand Down Expand Up @@ -373,8 +375,9 @@ public async ValueTask SendBatchAsync(
// evenly divisible by batch size.
await MaybeAwaitPublishes(publishTasks, 0);

_log.LogDebug("Sent raw messages batch to exchange '{Exchange}' " +
"with routing key '{RoutingKey}'.", exchange, routingKey);
if (_log.IsEnabled(LogLevel.Debug))
_log.LogDebug("Sent raw messages batch to exchange '{Exchange}' " +
"with routing key '{RoutingKey}'.", exchange, routingKey);
}

async Task MaybeAwaitPublishes(List<ValueTask> publishTasks, int batchSize)
Expand Down
2 changes: 1 addition & 1 deletion src/RabbitMQCoreClient/RabbitMQCoreClient.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<Version>7.0.0</Version>
<Version>7.0.1</Version>
<VersionSuffix>$(VersionSuffix)</VersionSuffix>
<Version Condition=" '$(VersionSuffix)' != '' ">$(Version)-$(VersionSuffix)</Version>
<IsPackable>true</IsPackable>
Expand Down
19 changes: 12 additions & 7 deletions src/RabbitMQCoreClient/RabbitMQCoreClientConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,17 @@ async Task Consumer_Received(object? sender, BasicDeliverEventArgs @event)

var rabbitArgs = new RabbitMessageEventArgs(@event.RoutingKey, @event.ConsumerTag);

_log.LogDebug("New message received with deliveryTag='{DeliveryTag}'.", @event.DeliveryTag);
if (_log.IsEnabled(LogLevel.Debug))
_log.LogDebug("New message received with deliveryTag='{DeliveryTag}'.", @event.DeliveryTag);

// Send a message to the death queue if ttl is over.
if (@event.BasicProperties.Headers?.TryGetValue(AppConstants.RabbitMQHeaders.TtlHeader, out var ttl) == true
&& ttl is int ttlInt
&& ttlInt <= 0)
{
await ConsumeChannel.BasicNackAsync(@event.DeliveryTag, false, false, _serviceLifetimeToken);
_log.LogDebug("Message was rejected due to low ttl.");
if (_log.IsEnabled(LogLevel.Debug))
_log.LogDebug("Message was rejected due to low ttl.");
return;
}

Expand All @@ -159,13 +161,15 @@ async Task Consumer_Received(object? sender, BasicDeliverEventArgs @event)

var handlerContext = new MessageHandlerContext(new(), handlerOptions);

_log.LogDebug("Created scope for handler type '{TypeName}'. Start processing message.",
handler.GetType().Name);
if (_log.IsEnabled(LogLevel.Debug))
_log.LogDebug("Created scope for handler type '{TypeName}'. Start processing message.",
handler.GetType().Name);
try
{
await handler.HandleMessage(@event.Body, rabbitArgs, handlerContext);
await ConsumeChannel.BasicAckAsync(@event.DeliveryTag, false, _serviceLifetimeToken);
_log.LogDebug("Message successfully processed by handler type '{TypeName}' " +
if (_log.IsEnabled(LogLevel.Debug))
_log.LogDebug("Message successfully processed by handler type '{TypeName}' " +
"with deliveryTag='{DeliveryTag}'.", handler?.GetType().Name, @event.DeliveryTag);
}
catch (Exception e)
Expand Down Expand Up @@ -258,8 +262,9 @@ await ConsumeChannel.QueueBindAsync(

async Task RejectDueToNoHandler(BasicDeliverEventArgs ea)
{
_log.LogDebug("Message was rejected due to no handler configured for the routing key '{RoutingKey}'.",
ea.RoutingKey);
if (_log.IsEnabled(LogLevel.Debug))
_log.LogDebug("Message was rejected due to no handler configured for the routing key '{RoutingKey}'.",
ea.RoutingKey);

if (ConsumeChannel != null)
await ConsumeChannel.BasicNackAsync(ea.DeliveryTag, false, false, _serviceLifetimeToken);
Expand Down
Loading