Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# RabbitMQ Client library for .net core applications with Dependency Injection support
# RabbitMQ Client library for .net core applications with Dependency Injection support

Library Version: v6

Expand Down Expand Up @@ -606,6 +606,7 @@ Set to true to check peer certificate for revocation.
"SslCheckCertificateRevocation": false, // Introduced in v5.3.0
"SslCertPassphrase": "pass", // Introduced in v5.3.0
"SslCertPath": "/certs", // Introduced in v5.3.0
"MaxBodySize": 16777216, // Introduced in v6.1.1
"Queues": [
{
"Name": "my_queue1",
Expand Down
41 changes: 41 additions & 0 deletions samples/RabbitMQCoreClient.ConsoleClient/RandomStringGenerator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System;
using System.Text;

namespace RabbitMQCoreClient.ConsoleClient;

public static class RandomStringGenerator
{
static readonly Random _random = new Random();
static readonly char[] _validChars =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()_+-=[]{}|;:,.<>?~ "
.ToCharArray();

/// <summary>
/// Generates a random string of the specified size in UTF-8 (in bytes).
/// </summary>
/// <param name="sizeInBytes">The desired string size in bytes.</param>
/// <returns>A random string that takes up approximately sizeInBytes bytes in UTF-8.</returns>
public static string GenerateRandomString(long sizeInBytes)
{
if (sizeInBytes <= 0)
throw new ArgumentException("The size must be greater than 0.", nameof(sizeInBytes));

var sb = new StringBuilder();
long currentSize = 0;

while (currentSize < sizeInBytes)
{
char c = _validChars[_random.Next(_validChars.Length)];
sb.Append(c);
currentSize += Encoding.UTF8.GetByteCount(new[] { c });
}

// We cut off the excess if we have gone beyond the limits
while (Encoding.UTF8.GetByteCount(sb.ToString()) > sizeInBytes)
{
sb.Length--;
}

return sb.ToString();
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Events;
using System;
using System.Net.Security;
using System.Security.Authentication;
Expand Down Expand Up @@ -127,5 +127,10 @@ public class RabbitMQCoreClientOptions
/// Retrieve or set the path to client certificate.
/// </summary>
public string? SslCertPath { get; set; }

/// <summary>
/// The maximum message body size limit. Default is 16MBi.
/// </summary>
public int MaxBodySize { get; set; } = 16 * 1024 * 1024; // 16 МБи
}
}
27 changes: 27 additions & 0 deletions src/RabbitMQCoreClient/Exceptions/BadMessageException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;

namespace RabbitMQCoreClient.Exceptions;

public class BadMessageException : Exception
{
/// <summary>Initializes a new instance of the <see cref="BadMessageException" /> class.</summary>
public BadMessageException()
{

}

/// <summary>Initializes a new instance of the <see cref="BadMessageException" /> class with a specified error message.</summary>
/// <param name="message">The message that describes the error.</param>
public BadMessageException(string? message) : base(message)
{

}

/// <summary>Initializes a new instance of the <see cref="BadMessageException" /> class with a specified error message and a reference to the inner exception that is the cause of this exception.</summary>
/// <param name="message">The error message that explains the reason for the exception.</param>
/// <param name="innerException">The exception that is the cause of the current exception, or a null reference (<see langword="Nothing" /> in Visual Basic) if no inner exception is specified.</param>
public BadMessageException(string? message, Exception? innerException) : base(message, innerException)
{

}
}
80 changes: 73 additions & 7 deletions src/RabbitMQCoreClient/QueueServiceImpl.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
Expand All @@ -9,6 +9,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading;
Expand Down Expand Up @@ -320,6 +321,13 @@ public async ValueTask SendAsync(
if (string.IsNullOrEmpty(exchange))
throw new ArgumentException($"{nameof(exchange)} is null or empty.", nameof(exchange));

if (obj.Length > Options.MaxBodySize)
{
string decodedString = DecodeMessageAsString(obj);
throw new BadMessageException($"The message size \"{obj.Length}\" exceeds max body limit of \"{Options.MaxBodySize}\" " +
$"on routing key \"{routingKey}\" (exchange: \"{exchange}\"). Decoded message part: {decodedString}");
}

CheckSendChannelOpened();

await _semaphoreSlim.WaitAsync();
Expand All @@ -333,6 +341,10 @@ public async ValueTask SendAsync(
body: obj);
_log.LogDebug("Sent raw message to exchange {exchange} with routing key {routingKey}.", exchange, routingKey);
}
catch
{
throw;
}
finally
{
_semaphoreSlim.Release();
Expand Down Expand Up @@ -381,16 +393,34 @@ public async ValueTask SendBatchAsync(
{
var batchOperation = CreateBasicJsonBatchProperties();

foreach (var (Body, Props) in objs)
foreach (var (body, props) in objs)
{
AddTtl(Props, decreaseTtl);
AddCorrelationId(Props, correlationId);

batchOperation?.Add(exchange, routingKey, false, Props, Body);
if (body.Length > Options.MaxBodySize)
{
string decodedString = DecodeMessageAsString(body);

_log.LogError("Skipped message due to message size \"{messageSize}\" exceeds max body limit of \"{maxBodySize}\" " +
"on routing key \"{routingKey}\" (exchange: \"{exchange}\". Decoded message part: {DecodedString})",
body.Length,
Options.MaxBodySize,
routingKey,
exchange,
decodedString);
continue;
}

AddTtl(props, decreaseTtl);
AddCorrelationId(props, correlationId);

batchOperation?.Add(exchange, routingKey, false, props, body);
}
batchOperation?.Publish();
_log.LogDebug("Sent raw messages batch to exchange {exchange} with routing key {routingKey}.", exchange, routingKey);
}
catch
{
throw;
}
finally
{
_semaphoreSlim.Release();
Expand Down Expand Up @@ -527,5 +557,41 @@ void CheckSendChannelOpened()
if (_connectionBlocked)
throw new NotConnectedException("Connection is blocked.");
}

/// <summary>
/// Try decode bytes array to string 1024 length or write as hex.
/// </summary>
/// <param name="obj"></param>
/// <returns></returns>
static string DecodeMessageAsString(ReadOnlyMemory<byte> obj)
{
int bufferSize = 1024; // We need ~1 KB of text to log.
int decodedStringLength = Math.Min(obj.Length, bufferSize);
ReadOnlySpan<byte> slice = obj.Span.Slice(0, decodedStringLength);

// Find the index of the last complete character
int lastValidIndex = slice.Length - 1;
while (lastValidIndex >= 0 && (slice[lastValidIndex] & 0b11000000) == 0b10000000)
{
// If a byte is a "continuation" of a UTF-8 character (starts with 10xxxxxx),
// it means that it is part of the previous character and needs to be discarded.
lastValidIndex--;
}

// Truncating to the last valid character
slice = slice.Slice(0, lastValidIndex + 1);

// Checking the string is UTF8
var decoder = Encoding.UTF8.GetDecoder();
char[] buffer = new char[decodedStringLength];
decoder.Convert(slice, buffer, flush: true, out _, out int charsUsed, out bool completed);
if (completed)
return new string(buffer, 0, charsUsed);
else
{
// Generating bytes as string.
return BitConverter.ToString(obj.Span.ToArray(), 0, decodedStringLength);
}
}
}
}
}
2 changes: 1 addition & 1 deletion src/RabbitMQCoreClient/RabbitMQCoreClient.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<Version>6.1.0</Version>
<Version>6.1.1</Version>
<VersionSuffix>$(VersionSuffix)</VersionSuffix>
<Version Condition=" '$(VersionSuffix)' != '' ">$(Version)-$(VersionSuffix)</Version>
<IsPackable>true</IsPackable>
Expand Down
Loading