From 98f72556455831efafd068065a4d3ae0fef9c333 Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Thu, 24 Jul 2025 10:51:15 +1000 Subject: [PATCH 1/2] Fix array pool usage across API buffer and forwarding channel entries --- .../Forwarder/Channel/ForwardingChannel.cs | 9 ++- .../Channel/ForwardingChannelEntry.cs | 1 + .../Forwarder/Web/Api/IngestionEndpoints.cs | 65 +++++++++---------- 3 files changed, 37 insertions(+), 38 deletions(-) diff --git a/src/SeqCli/Forwarder/Channel/ForwardingChannel.cs b/src/SeqCli/Forwarder/Channel/ForwardingChannel.cs index 3c0618e7..4897be06 100644 --- a/src/SeqCli/Forwarder/Channel/ForwardingChannel.cs +++ b/src/SeqCli/Forwarder/Channel/ForwardingChannel.cs @@ -13,6 +13,7 @@ // limitations under the License. using System; +using System.Buffers; using System.IO; using System.Threading; using System.Threading.Channels; @@ -73,6 +74,8 @@ public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark { entry.CompletionSource.TrySetException(e); } + + ArrayPool.Shared.Return(entry.Data.Array!); } } catch (Exception ex) @@ -124,12 +127,14 @@ public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark }, cancellationToken: hardCancel); } - public async Task WriteAsync(byte[] storage, Range range, CancellationToken cancellationToken) + public async Task WriteAsync(ArraySegment data, CancellationToken cancellationToken) { var tcs = new TaskCompletionSource(); using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _hardCancel); - await _writer.WriteAsync(new ForwardingChannelEntry(storage[range], tcs), cts.Token); + var copyBuffer = ArrayPool.Shared.Rent(data.Count); + data.AsSpan().CopyTo(copyBuffer.AsSpan()); + await _writer.WriteAsync(new ForwardingChannelEntry(new ArraySegment(copyBuffer, 0, data.Count), tcs), cts.Token); await tcs.Task; } diff --git a/src/SeqCli/Forwarder/Channel/ForwardingChannelEntry.cs b/src/SeqCli/Forwarder/Channel/ForwardingChannelEntry.cs index bf693909..2adafdf3 100644 --- a/src/SeqCli/Forwarder/Channel/ForwardingChannelEntry.cs +++ b/src/SeqCli/Forwarder/Channel/ForwardingChannelEntry.cs @@ -17,4 +17,5 @@ namespace SeqCli.Forwarder.Channel; +// Note, Data is backed by a rented array that the receiver should return. readonly record struct ForwardingChannelEntry(ArraySegment Data, TaskCompletionSource CompletionSource); diff --git a/src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs b/src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs index 1314a7e7..5b7e0cda 100644 --- a/src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs +++ b/src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs @@ -70,21 +70,23 @@ async Task IngestAsync(HttpContext context) async Task IngestCompactFormatAsync(HttpContext context) { + byte[]? rented = null; + try { - var cts = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted); + using var cts = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted); cts.CancelAfter(TimeSpan.FromSeconds(5)); - + var requestApiKey = GetApiKey(context.Request); - var log = _forwardingChannels.GetForwardingChannel(requestApiKey); - + var log = _forwardingChannels.GetForwardingChannel(requestApiKey); + // Add one for the extra newline that we have to insert at the end of batches. var bufferSize = _config.Connection.BatchSizeLimitBytes + 1; - var rented = ArrayPool.Shared.Rent(bufferSize); - var buffer = rented[..bufferSize]; + rented = ArrayPool.Shared.Rent(bufferSize); + var buffer = new ArraySegment(rented, 0, bufferSize); var writeHead = 0; var readHead = 0; - + var done = false; while (!done) { @@ -92,30 +94,30 @@ async Task IngestCompactFormatAsync(HttpContext context) // size of write batches. while (!done) { - var remaining = buffer.Length - 1 - writeHead; + var remaining = buffer.Count - 1 - writeHead; if (remaining == 0) { IngestionLog.ForClient(context.Connection.RemoteIpAddress) .Error("An incoming request exceeded the configured batch size limit"); return Error(HttpStatusCode.RequestEntityTooLarge, "the request is too large to process"); } - + var read = await context.Request.Body.ReadAsync(buffer.AsMemory(writeHead, remaining), cts.Token); if (read == 0) { done = true; } - + writeHead += read; - + // Ingested batches must be terminated with `\n`, but this isn't an API requirement. - if (done && writeHead > 0 && writeHead < buffer.Length && buffer[writeHead - 1] != (byte)'\n') + if (done && writeHead > 0 && writeHead < buffer.Count && buffer[writeHead - 1] != (byte)'\n') { buffer[writeHead] = (byte)'\n'; writeHead += 1; } } - + // Validate what we read, marking out a batch of one or more complete newline-delimited events. var batchStart = readHead; var batchEnd = readHead; @@ -123,14 +125,14 @@ async Task IngestCompactFormatAsync(HttpContext context) { var eventStart = batchEnd; var nlIndex = buffer.AsSpan()[eventStart..].IndexOf((byte)'\n'); - + if (nlIndex == -1) { break; } - + var eventEnd = eventStart + nlIndex + 1; - + batchEnd = eventEnd; readHead = batchEnd; @@ -142,12 +144,12 @@ async Task IngestCompactFormatAsync(HttpContext context) return Error(HttpStatusCode.BadRequest, $"Payload validation failed: {error}."); } } - + if (batchStart != batchEnd) { - await Write(log, ArrayPool.Shared, buffer, batchStart..batchEnd, cts.Token); + await log.WriteAsync(buffer[batchStart..batchEnd], cts.Token); } - + // Copy any unprocessed data into our buffer and continue if (!done && readHead != 0) { @@ -157,10 +159,7 @@ async Task IngestCompactFormatAsync(HttpContext context) writeHead = retain; } } - - // Exception cases are handled by `Write` - ArrayPool.Shared.Return(rented); - + return SuccessfulIngestion(); } catch (Exception ex) @@ -169,6 +168,13 @@ async Task IngestCompactFormatAsync(HttpContext context) .Error(ex, "Ingestion failed"); return Error(HttpStatusCode.InternalServerError, "Ingestion failed."); } + finally + { + if (rented != null) + { + ArrayPool.Shared.Return(rented); + } + } } static bool DefaultedBoolQuery(HttpRequest request, string queryParameterName) @@ -263,19 +269,6 @@ bool ValidateClef(Span evt, [NotNullWhen(false)] out string? errorFragment errorFragment = null; return true; } - - static async Task Write(ForwardingChannel forwardingChannel, ArrayPool pool, byte[] storage, Range range, CancellationToken cancellationToken) - { - try - { - await forwardingChannel.WriteAsync(storage, range, cancellationToken); - } - catch - { - pool.Return(storage); - throw; - } - } static IResult Error(HttpStatusCode statusCode, string message) { From f84d3f17aa4ad440be85569a3484f2e789b043d2 Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Thu, 24 Jul 2025 11:03:50 +1000 Subject: [PATCH 2/2] Give process invocations a little longer to complete on slow CI runners --- test/SeqCli.EndToEnd/Support/CaptiveProcess.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/SeqCli.EndToEnd/Support/CaptiveProcess.cs b/test/SeqCli.EndToEnd/Support/CaptiveProcess.cs index e805cb49..fb11e841 100644 --- a/test/SeqCli.EndToEnd/Support/CaptiveProcess.cs +++ b/test/SeqCli.EndToEnd/Support/CaptiveProcess.cs @@ -111,11 +111,11 @@ public int WaitForExit(TimeSpan? timeout = null) if (_captureOutput) { - if (!_outputComplete.WaitOne(TimeSpan.FromSeconds(1))) - throw new IOException("STDOUT did not complete in the fixed 1 second window."); + if (!_outputComplete.WaitOne(TimeSpan.FromSeconds(5))) + throw new IOException("STDOUT did not complete in the fixed 5-second window."); - if (!_errorComplete.WaitOne(TimeSpan.FromSeconds(1))) - throw new IOException("STDERR did not complete in the fixed 1 second window."); + if (!_errorComplete.WaitOne(TimeSpan.FromSeconds(5))) + throw new IOException("STDERR did not complete in the fixed 5-second window."); } return _process.ExitCode;