diff --git a/README.md b/README.md index 6f3a7d2..80a7771 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# RabbitMQ Client library for .net core applications with Dependency Injection support +# RabbitMQ Client library for .net core applications with Dependency Injection support Library Version: v6 @@ -606,6 +606,7 @@ Set to true to check peer certificate for revocation. "SslCheckCertificateRevocation": false, // Introduced in v5.3.0 "SslCertPassphrase": "pass", // Introduced in v5.3.0 "SslCertPath": "/certs", // Introduced in v5.3.0 + "MaxBodySize": 16777216, // Introduced in v6.1.1 "Queues": [ { "Name": "my_queue1", diff --git a/samples/RabbitMQCoreClient.ConsoleClient/RandomStringGenerator.cs b/samples/RabbitMQCoreClient.ConsoleClient/RandomStringGenerator.cs new file mode 100644 index 0000000..c8a15bd --- /dev/null +++ b/samples/RabbitMQCoreClient.ConsoleClient/RandomStringGenerator.cs @@ -0,0 +1,41 @@ +using System; +using System.Text; + +namespace RabbitMQCoreClient.ConsoleClient; + +public static class RandomStringGenerator +{ + static readonly Random _random = new Random(); + static readonly char[] _validChars = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()_+-=[]{}|;:,.<>?~ " + .ToCharArray(); + + /// + /// Generates a random string of the specified size in UTF-8 (in bytes). + /// + /// The desired string size in bytes. + /// A random string that takes up approximately sizeInBytes bytes in UTF-8. + public static string GenerateRandomString(long sizeInBytes) + { + if (sizeInBytes <= 0) + throw new ArgumentException("The size must be greater than 0.", nameof(sizeInBytes)); + + var sb = new StringBuilder(); + long currentSize = 0; + + while (currentSize < sizeInBytes) + { + char c = _validChars[_random.Next(_validChars.Length)]; + sb.Append(c); + currentSize += Encoding.UTF8.GetByteCount(new[] { c }); + } + + // We cut off the excess if we have gone beyond the limits + while (Encoding.UTF8.GetByteCount(sb.ToString()) > sizeInBytes) + { + sb.Length--; + } + + return sb.ToString(); + } +} diff --git a/src/RabbitMQCoreClient/DependencyInjection/Options/RabbitMQCoreClientOptions.cs b/src/RabbitMQCoreClient/DependencyInjection/Options/RabbitMQCoreClientOptions.cs index 8c2f428..7b25deb 100644 --- a/src/RabbitMQCoreClient/DependencyInjection/Options/RabbitMQCoreClientOptions.cs +++ b/src/RabbitMQCoreClient/DependencyInjection/Options/RabbitMQCoreClientOptions.cs @@ -1,4 +1,4 @@ -using RabbitMQ.Client.Events; +using RabbitMQ.Client.Events; using System; using System.Net.Security; using System.Security.Authentication; @@ -127,5 +127,10 @@ public class RabbitMQCoreClientOptions /// Retrieve or set the path to client certificate. /// public string? SslCertPath { get; set; } + + /// + /// The maximum message body size limit. Default is 16MBi. + /// + public int MaxBodySize { get; set; } = 16 * 1024 * 1024; // 16 МБи } } diff --git a/src/RabbitMQCoreClient/Exceptions/BadMessageException.cs b/src/RabbitMQCoreClient/Exceptions/BadMessageException.cs new file mode 100644 index 0000000..4795ff6 --- /dev/null +++ b/src/RabbitMQCoreClient/Exceptions/BadMessageException.cs @@ -0,0 +1,27 @@ +using System; + +namespace RabbitMQCoreClient.Exceptions; + +public class BadMessageException : Exception +{ + /// Initializes a new instance of the class. + public BadMessageException() + { + + } + + /// Initializes a new instance of the class with a specified error message. + /// The message that describes the error. + public BadMessageException(string? message) : base(message) + { + + } + + /// Initializes a new instance of the class with a specified error message and a reference to the inner exception that is the cause of this exception. + /// The error message that explains the reason for the exception. + /// The exception that is the cause of the current exception, or a null reference ( in Visual Basic) if no inner exception is specified. + public BadMessageException(string? message, Exception? innerException) : base(message, innerException) + { + + } +} diff --git a/src/RabbitMQCoreClient/QueueServiceImpl.cs b/src/RabbitMQCoreClient/QueueServiceImpl.cs index 0ceed76..8db818b 100644 --- a/src/RabbitMQCoreClient/QueueServiceImpl.cs +++ b/src/RabbitMQCoreClient/QueueServiceImpl.cs @@ -1,4 +1,4 @@ -using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using RabbitMQ.Client; using RabbitMQ.Client.Events; @@ -9,6 +9,7 @@ using System; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; +using System.Drawing; using System.Linq; using System.Text; using System.Threading; @@ -320,6 +321,13 @@ public async ValueTask SendAsync( if (string.IsNullOrEmpty(exchange)) throw new ArgumentException($"{nameof(exchange)} is null or empty.", nameof(exchange)); + if (obj.Length > Options.MaxBodySize) + { + string decodedString = DecodeMessageAsString(obj); + throw new BadMessageException($"The message size \"{obj.Length}\" exceeds max body limit of \"{Options.MaxBodySize}\" " + + $"on routing key \"{routingKey}\" (exchange: \"{exchange}\"). Decoded message part: {decodedString}"); + } + CheckSendChannelOpened(); await _semaphoreSlim.WaitAsync(); @@ -333,6 +341,10 @@ public async ValueTask SendAsync( body: obj); _log.LogDebug("Sent raw message to exchange {exchange} with routing key {routingKey}.", exchange, routingKey); } + catch + { + throw; + } finally { _semaphoreSlim.Release(); @@ -381,16 +393,34 @@ public async ValueTask SendBatchAsync( { var batchOperation = CreateBasicJsonBatchProperties(); - foreach (var (Body, Props) in objs) + foreach (var (body, props) in objs) { - AddTtl(Props, decreaseTtl); - AddCorrelationId(Props, correlationId); - - batchOperation?.Add(exchange, routingKey, false, Props, Body); + if (body.Length > Options.MaxBodySize) + { + string decodedString = DecodeMessageAsString(body); + + _log.LogError("Skipped message due to message size \"{messageSize}\" exceeds max body limit of \"{maxBodySize}\" " + + "on routing key \"{routingKey}\" (exchange: \"{exchange}\". Decoded message part: {DecodedString})", + body.Length, + Options.MaxBodySize, + routingKey, + exchange, + decodedString); + continue; + } + + AddTtl(props, decreaseTtl); + AddCorrelationId(props, correlationId); + + batchOperation?.Add(exchange, routingKey, false, props, body); } batchOperation?.Publish(); _log.LogDebug("Sent raw messages batch to exchange {exchange} with routing key {routingKey}.", exchange, routingKey); } + catch + { + throw; + } finally { _semaphoreSlim.Release(); @@ -527,5 +557,41 @@ void CheckSendChannelOpened() if (_connectionBlocked) throw new NotConnectedException("Connection is blocked."); } + + /// + /// Try decode bytes array to string 1024 length or write as hex. + /// + /// + /// + static string DecodeMessageAsString(ReadOnlyMemory obj) + { + int bufferSize = 1024; // We need ~1 KB of text to log. + int decodedStringLength = Math.Min(obj.Length, bufferSize); + ReadOnlySpan slice = obj.Span.Slice(0, decodedStringLength); + + // Find the index of the last complete character + int lastValidIndex = slice.Length - 1; + while (lastValidIndex >= 0 && (slice[lastValidIndex] & 0b11000000) == 0b10000000) + { + // If a byte is a "continuation" of a UTF-8 character (starts with 10xxxxxx), + // it means that it is part of the previous character and needs to be discarded. + lastValidIndex--; + } + + // Truncating to the last valid character + slice = slice.Slice(0, lastValidIndex + 1); + + // Checking the string is UTF8 + var decoder = Encoding.UTF8.GetDecoder(); + char[] buffer = new char[decodedStringLength]; + decoder.Convert(slice, buffer, flush: true, out _, out int charsUsed, out bool completed); + if (completed) + return new string(buffer, 0, charsUsed); + else + { + // Generating bytes as string. + return BitConverter.ToString(obj.Span.ToArray(), 0, decodedStringLength); + } + } } -} \ No newline at end of file +} diff --git a/src/RabbitMQCoreClient/RabbitMQCoreClient.csproj b/src/RabbitMQCoreClient/RabbitMQCoreClient.csproj index eff71ff..9d55267 100644 --- a/src/RabbitMQCoreClient/RabbitMQCoreClient.csproj +++ b/src/RabbitMQCoreClient/RabbitMQCoreClient.csproj @@ -1,7 +1,7 @@ - 6.1.0 + 6.1.1 $(VersionSuffix) $(Version)-$(VersionSuffix) true