diff --git a/src/SeqCli/Cli/Commands/Forwarder/TruncateCommand.cs b/src/SeqCli/Cli/Commands/Forwarder/TruncateCommand.cs index c73b37d6..844eb9ee 100644 --- a/src/SeqCli/Cli/Commands/Forwarder/TruncateCommand.cs +++ b/src/SeqCli/Cli/Commands/Forwarder/TruncateCommand.cs @@ -15,7 +15,6 @@ using System; using System.Threading.Tasks; using SeqCli.Cli.Features; -using SeqCli.Forwarder.Multiplexing; using Serilog; namespace SeqCli.Cli.Commands.Forwarder; @@ -39,7 +38,6 @@ protected override async Task Run(string[] args) if (!_confirm.TryConfirm("All data in the forwarder's log buffer will be deleted. This cannot be undone.")) return 1; - ActiveLogBufferMap.Truncate(_storagePath.BufferPath); return 0; } catch (Exception ex) diff --git a/src/SeqCli/Forwarder/ForwarderModule.cs b/src/SeqCli/Forwarder/ForwarderModule.cs index 1bccec25..9f75f2dc 100644 --- a/src/SeqCli/Forwarder/ForwarderModule.cs +++ b/src/SeqCli/Forwarder/ForwarderModule.cs @@ -17,7 +17,7 @@ using System.Threading; using Autofac; using SeqCli.Config; -using SeqCli.Forwarder.Multiplexing; +using SeqCli.Forwarder.Storage; using SeqCli.Forwarder.Web.Api; using SeqCli.Forwarder.Web.Host; using Serilog.Formatting.Display; @@ -38,15 +38,10 @@ public ForwarderModule(string bufferPath, SeqCliConfig config) protected override void Load(ContainerBuilder builder) { builder.RegisterType().SingleInstance(); - builder.RegisterType() - .WithParameter("bufferPath", _bufferPath) - .SingleInstance(); + builder.RegisterType().SingleInstance(); - builder.RegisterType().As(); - builder.RegisterType().SingleInstance(); builder.RegisterType().As(); builder.RegisterType().As(); - builder.Register(c => _config.Connection); builder.RegisterInstance(new MessageTemplateTextFormatter( "[{Timestamp:o} {Level:u3}] {Message}{NewLine}" + (_config.Forwarder.Diagnostics.IngestionLogShowDetail ? "" @@ -79,7 +74,6 @@ protected override void Load(ContainerBuilder builder) } return new HttpClient { BaseAddress = new Uri(baseUri) }; - }).SingleInstance(); builder.RegisterInstance(_config); diff --git a/src/SeqCli/Forwarder/Multiplexing/ActiveLogBuffer.cs b/src/SeqCli/Forwarder/Multiplexing/ActiveLogBuffer.cs deleted file mode 100644 index 52fd743b..00000000 --- a/src/SeqCli/Forwarder/Multiplexing/ActiveLogBuffer.cs +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright © Datalust Pty Ltd and Contributors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using System; -using SeqCli.Forwarder.Shipper; -using SeqCli.Forwarder.Storage; - -namespace SeqCli.Forwarder.Multiplexing; - -sealed class ActiveLogBuffer : IDisposable -{ - public LogShipper Shipper { get; } - public LogBuffer Buffer { get; } - - public ActiveLogBuffer(LogBuffer logBuffer, LogShipper logShipper) - { - Buffer = logBuffer ?? throw new ArgumentNullException(nameof(logBuffer)); - Shipper = logShipper ?? throw new ArgumentNullException(nameof(logShipper)); - } - - public void Dispose() - { - Shipper.Dispose(); - Buffer.Dispose(); - } -} \ No newline at end of file diff --git a/src/SeqCli/Forwarder/Multiplexing/ActiveLogBufferMap.cs b/src/SeqCli/Forwarder/Multiplexing/ActiveLogBufferMap.cs deleted file mode 100644 index 92f96f2b..00000000 --- a/src/SeqCli/Forwarder/Multiplexing/ActiveLogBufferMap.cs +++ /dev/null @@ -1,223 +0,0 @@ -// Copyright © Datalust Pty Ltd and Contributors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using System; -using System.Collections.Generic; -using System.IO; -using System.Net; -using System.Text; -using SeqCli.Config; -using SeqCli.Encryptor; -using SeqCli.Forwarder.Storage; -using SeqCli.Forwarder.Web; -using Serilog; - -namespace SeqCli.Forwarder.Multiplexing; - -class ActiveLogBufferMap : IDisposable -{ - const string DataFileName = "data.mdb", LockFileName = "lock.mdb", ApiKeyFileName = ".apikey"; - - static readonly Encoding ApiKeyEncoding = new UTF8Encoding(false); - - readonly ulong _bufferSizeBytes; - readonly ConnectionConfig _connectionConfig; - readonly ILogShipperFactory _shipperFactory; - readonly IDataProtector _dataProtector; - readonly string _bufferPath; - readonly ILogger _log = Log.ForContext(); - - readonly object _sync = new(); - bool _loaded; - ActiveLogBuffer? _noApiKeyLogBuffer; - readonly Dictionary _buffersByApiKey = new(); - - public ActiveLogBufferMap( - string bufferPath, - SeqCliConfig config, - ILogShipperFactory logShipperFactory) - { - ArgumentNullException.ThrowIfNull(config, nameof(config)); - _bufferSizeBytes = config.Forwarder.Storage.BufferSizeBytes; - _connectionConfig = config.Connection; - _shipperFactory = logShipperFactory ?? throw new ArgumentNullException(nameof(logShipperFactory)); - _dataProtector = config.Encryption.DataProtector(); - _bufferPath = bufferPath ?? throw new ArgumentNullException(nameof(bufferPath)); - } - - // The odd three-stage initialization improves our chances of correctly tearing down the `LightningEnvironment`s within - // `LogBuffer`s in the event of a failure during start-up. See: https://github.com/CoreyKaylor/Lightning.NET/blob/master/src/LightningDB/LightningEnvironment.cs#L252 - public void Load() - { - // At startup, we look for buffers and either delete them if they're empty, or load them - // up if they're not. This garbage collection at start-up is a simplification, - // we might try cleaning up in the background if the gains are worthwhile, although more synchronization - // would be required. - - lock (_sync) - { - if (_loaded) throw new InvalidOperationException("The log buffer map is already loaded."); - - Directory.CreateDirectory(_bufferPath); - - var defaultDataFilePath = Path.Combine(_bufferPath, DataFileName); - if (File.Exists(defaultDataFilePath)) - { - _log.Information("Loading the default log buffer in {Path}", _bufferPath); - var buffer = new LogBuffer(_bufferPath, _bufferSizeBytes); - if (buffer.Peek(0).Length == 0) - { - _log.Information("The default buffer is empty and will be removed until more data is received"); - buffer.Dispose(); - File.Delete(defaultDataFilePath); - var lockFilePath = Path.Combine(_bufferPath, LockFileName); - if (File.Exists(lockFilePath)) - File.Delete(lockFilePath); - } - else - { - _noApiKeyLogBuffer = new ActiveLogBuffer(buffer, _shipperFactory.Create(buffer, _connectionConfig.DecodeApiKey(_dataProtector))); - } - } - - foreach (var subfolder in Directory.GetDirectories(_bufferPath)) - { - var encodedApiKeyFilePath = Path.Combine(subfolder, ApiKeyFileName); - if (!File.Exists(encodedApiKeyFilePath)) - { - _log.Information("Folder {Path} does not appear to be a log buffer; skipping", subfolder); - continue; - } - - _log.Information("Loading an API-key specific buffer in {Path}", subfolder); - var apiKey = ApiKeyEncoding.GetString(_dataProtector.Decrypt(File.ReadAllBytes(encodedApiKeyFilePath))); - - var buffer = new LogBuffer(subfolder, _bufferSizeBytes); - if (buffer.Peek(0).Length == 0) - { - _log.Information("API key-specific buffer in {Path} is empty and will be removed until more data is received", subfolder); - buffer.Dispose(); - Directory.Delete(subfolder, true); - } - else - { - var activeBuffer = new ActiveLogBuffer(buffer, _shipperFactory.Create(buffer, apiKey)); - _buffersByApiKey.Add(apiKey, activeBuffer); - } - } - - _loaded = true; - } - } - - public void Start() - { - lock (_sync) - { - if (!_loaded) throw new InvalidOperationException("The log buffer map is not loaded."); - - foreach (var buffer in OpenBuffers) - { - buffer.Shipper.Start(); - } - } - } - - public void Stop() - { - lock (_sync) - { - // Hard to ensure _loaded is set in all cases, better here to be forgiving and - // permit a clean shut-down. - - foreach (var buffer in OpenBuffers) - { - buffer.Shipper.Stop(); - } - } - } - - public LogBuffer GetLogBuffer(string? apiKey) - { - lock (_sync) - { - if (!_loaded) throw new RequestProcessingException("The forwarder service is starting up.", HttpStatusCode.ServiceUnavailable); - - if (apiKey == null) - { - if (_noApiKeyLogBuffer == null) - { - _log.Information("Creating a new default log buffer in {Path}", _bufferPath); - var buffer = new LogBuffer(_bufferPath, _bufferSizeBytes); - _noApiKeyLogBuffer = new ActiveLogBuffer(buffer, _shipperFactory.Create(buffer, _connectionConfig.DecodeApiKey(_dataProtector))); - _noApiKeyLogBuffer.Shipper.Start(); - } - return _noApiKeyLogBuffer.Buffer; - } - - if (_buffersByApiKey.TryGetValue(apiKey, out var existing)) - return existing.Buffer; - - var subfolder = Path.Combine(_bufferPath, Guid.NewGuid().ToString("n")); - _log.Information("Creating a new API key-specific log buffer in {Path}", subfolder); - Directory.CreateDirectory(subfolder); - File.WriteAllBytes(Path.Combine(subfolder, ".apikey"), _dataProtector.Encrypt(ApiKeyEncoding.GetBytes(apiKey))); - var newBuffer = new LogBuffer(subfolder, _bufferSizeBytes); - var newActiveBuffer = new ActiveLogBuffer(newBuffer, _shipperFactory.Create(newBuffer, apiKey)); - _buffersByApiKey.Add(apiKey, newActiveBuffer); - newActiveBuffer.Shipper.Start(); - return newBuffer; - } - } - - public void Dispose() - { - lock (_sync) - { - foreach (var buffer in OpenBuffers) - { - buffer.Dispose(); - } - } - } - - public static void Truncate(string bufferPath) - { - DeleteIfExists(Path.Combine(bufferPath, DataFileName)); - DeleteIfExists(Path.Combine(bufferPath, LockFileName)); - foreach (var subdirectory in Directory.GetDirectories(bufferPath)) - { - if (File.Exists(Path.Combine(subdirectory, ApiKeyFileName))) - Directory.Delete(subdirectory, true); - } - } - - static void DeleteIfExists(string filePath) - { - if (File.Exists(filePath)) - File.Delete(filePath); - } - - IEnumerable OpenBuffers - { - get - { - if (_noApiKeyLogBuffer != null) - yield return _noApiKeyLogBuffer; - - foreach (var buffer in _buffersByApiKey.Values) - yield return buffer; - } - } -} \ No newline at end of file diff --git a/src/SeqCli/Forwarder/Multiplexing/HttpLogShipperFactory.cs b/src/SeqCli/Forwarder/Multiplexing/HttpLogShipperFactory.cs deleted file mode 100644 index 3101421a..00000000 --- a/src/SeqCli/Forwarder/Multiplexing/HttpLogShipperFactory.cs +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright © Datalust Pty Ltd and Contributors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using System; -using System.Net.Http; -using SeqCli.Config; -using SeqCli.Forwarder.Shipper; -using SeqCli.Forwarder.Storage; - -namespace SeqCli.Forwarder.Multiplexing; - -class HttpLogShipperFactory : ILogShipperFactory -{ - readonly HttpClient _outputHttpClient; - readonly ServerResponseProxy _serverResponseProxy; - readonly ConnectionConfig _outputConfig; - - public HttpLogShipperFactory(SeqCliConfig config, ServerResponseProxy serverResponseProxy, HttpClient outputHttpClient) - { - ArgumentNullException.ThrowIfNull(config, nameof(config)); - - _outputHttpClient = outputHttpClient; - _serverResponseProxy = serverResponseProxy ?? throw new ArgumentNullException(nameof(serverResponseProxy)); - _outputConfig = config.Connection; - } - - public LogShipper Create(LogBuffer logBuffer, string? apiKey) - { - return new HttpLogShipper(logBuffer, apiKey, _outputConfig, _serverResponseProxy, _outputHttpClient); - } -} \ No newline at end of file diff --git a/src/SeqCli/Forwarder/Multiplexing/ILogShipperFactory.cs b/src/SeqCli/Forwarder/Multiplexing/ILogShipperFactory.cs deleted file mode 100644 index 773f455f..00000000 --- a/src/SeqCli/Forwarder/Multiplexing/ILogShipperFactory.cs +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright © Datalust Pty Ltd and Contributors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using SeqCli.Forwarder.Shipper; -using SeqCli.Forwarder.Storage; - -namespace SeqCli.Forwarder.Multiplexing; - -public interface ILogShipperFactory -{ - LogShipper Create(LogBuffer logBuffer, string? apiKey); -} \ No newline at end of file diff --git a/src/SeqCli/Forwarder/Multiplexing/InertLogShipperFactory.cs b/src/SeqCli/Forwarder/Multiplexing/InertLogShipperFactory.cs deleted file mode 100644 index b6fff878..00000000 --- a/src/SeqCli/Forwarder/Multiplexing/InertLogShipperFactory.cs +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright © Datalust Pty Ltd and Contributors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using SeqCli.Forwarder.Shipper; -using SeqCli.Forwarder.Storage; - -namespace SeqCli.Forwarder.Multiplexing; - -class InertLogShipperFactory : ILogShipperFactory -{ - public LogShipper Create(LogBuffer logBuffer, string? apiKey) - { - return new InertLogShipper(); - } -} \ No newline at end of file diff --git a/src/SeqCli/Forwarder/Multiplexing/ServerResponseProxy.cs b/src/SeqCli/Forwarder/Multiplexing/ServerResponseProxy.cs deleted file mode 100644 index b52dc988..00000000 --- a/src/SeqCli/Forwarder/Multiplexing/ServerResponseProxy.cs +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright © Datalust Pty Ltd and Contributors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using System.Collections.Generic; - -namespace SeqCli.Forwarder.Multiplexing; - -public class ServerResponseProxy -{ - const string EmptyResponse = "{}"; - - readonly object _syncRoot = new(); - readonly Dictionary _lastResponseByApiKey = new(); - string _lastNoApiKeyResponse = EmptyResponse; - - public void SuccessResponseReturned(string? apiKey, string response) - { - lock (_syncRoot) - { - if (apiKey == null) - _lastNoApiKeyResponse = response; - else - _lastResponseByApiKey[apiKey] = response; - } - } - - public string GetResponseText(string? apiKey) - { - lock (_syncRoot) - { - if (apiKey == null) - return _lastNoApiKeyResponse; - - if (_lastResponseByApiKey.TryGetValue(apiKey, out var response)) - return response; - - return EmptyResponse; - } - } -} \ No newline at end of file diff --git a/src/SeqCli/Forwarder/Schema/EventSchema.cs b/src/SeqCli/Forwarder/Schema/EventSchema.cs deleted file mode 100644 index c13f0325..00000000 --- a/src/SeqCli/Forwarder/Schema/EventSchema.cs +++ /dev/null @@ -1,183 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; -using System.Globalization; -using System.Linq; -using Newtonsoft.Json.Linq; -using SeqCli.Forwarder.Util; -using Serilog.Parsing; - -namespace SeqCli.Forwarder.Schema; - -static class EventSchema -{ - static readonly MessageTemplateParser MessageTemplateParser = new(); - - static readonly HashSet ClefReifiedProperties = ["@t", "@m", "@mt", "@l", "@x", "@i", "@r"]; - - public static bool FromClefFormat(in int lineNumber, JObject compactFormat, [MaybeNullWhen(false)] out JObject rawFormat, [MaybeNullWhen(true)] out string error) - { - var result = new JObject(); - - var rawTimestamp = compactFormat["@t"]; - if (rawTimestamp == null) - { - error = $"The event on line {lineNumber} does not carry an `@t` timestamp property."; - rawFormat = default; - return false; - } - - if (rawTimestamp.Type != JTokenType.String) - { - error = $"The event on line {lineNumber} has an invalid `@t` timestamp property; the value must be a JSON string."; - rawFormat = default; - return false; - } - - if (!DateTimeOffset.TryParse(rawTimestamp.Value(), out _)) - { - error = $"The timestamp value `{rawTimestamp}` on line {lineNumber} could not be parsed."; - rawFormat = default; - return false; - } - - result.Add("Timestamp", rawTimestamp); - - var properties = new JObject(); - foreach (var property in compactFormat.Properties()) - { - if (property.Name.StartsWith("@@")) - properties.Add(property.Name.Substring(1), property.Value); - else if (!ClefReifiedProperties.Contains(property.Name)) - properties.Add(property.Name, property.Value); - } - - var x = compactFormat["@x"]; - if (x != null) - { - if (x.Type != JTokenType.String) - { - error = $"The event on line {lineNumber} has a non-string `@x` exception property."; - rawFormat = default; - return false; - } - - result.Add("Exception", x); - } - - var l = compactFormat["@l"]; - if (l != null) - { - if (l.Type != JTokenType.String) - { - error = $"The event on line {lineNumber} has a non-string `@l` level property."; - rawFormat = default; - return false; - } - - result.Add("Level", l); - } - - string? message = null; - var m = compactFormat["@m"]; - if (m != null) - { - if (m.Type != JTokenType.String) - { - error = $"The event on line {lineNumber} has a non-string `@m` message property."; - rawFormat = default; - return false; - } - - message = m.Value(); - } - - string? messageTemplate = null; - var mt = compactFormat["@mt"]; - if (mt != null) - { - if (mt.Type != JTokenType.String) - { - error = $"The event on line {lineNumber} has a non-string `@mt` message template property."; - rawFormat = default; - return false; - } - - messageTemplate = mt.Value(); - } - - if (message != null) - { - result.Add("RenderedMessage", message); - } - else if (messageTemplate != null && compactFormat["@r"] is JArray renderingsArray) - { - var template = MessageTemplateParser.Parse(messageTemplate); - var withFormat = template.Tokens.OfType().Where(pt => pt.Format != null); - - // ReSharper disable once PossibleMultipleEnumeration - if (withFormat.Count() == renderingsArray.Count) - { - // ReSharper disable once PossibleMultipleEnumeration - var renderingsByProperty = withFormat - .Zip(renderingsArray, (p, j) => new { p.PropertyName, Format = p.Format!, Rendering = j.Value() }) - .GroupBy(p => p.PropertyName) - .ToDictionary(g => g.Key, g => g.ToDictionaryDistinct(p => p.Format, p => p.Rendering)); - - var renderings = new JObject(); - result.Add("Renderings", renderings); - - foreach (var (property, propertyRenderings) in renderingsByProperty) - { - var byFormat = new JArray(); - renderings.Add(property, byFormat); - - foreach (var (format, rendering) in propertyRenderings) - { - var element = new JObject {{"Format", format}, {"Rendering", rendering}}; - byFormat.Add(element); - } - } - } - } - - messageTemplate ??= message ?? "No template provided"; - result.Add("MessageTemplate", messageTemplate); - - var eventTypeToken = compactFormat["@i"]; - if (eventTypeToken != null) - { - if (eventTypeToken.Type == JTokenType.Integer) - { - result.Add("EventType", uint.Parse(eventTypeToken.Value()!)); - } - else if (eventTypeToken.Type == JTokenType.String) - { - if (uint.TryParse(eventTypeToken.Value(), NumberStyles.HexNumber, - CultureInfo.InvariantCulture, out var eventType)) - { - result.Add("EventType", eventType); - } - else - { - // Seq would calculate a hash value from the string, here. Forwarder will ignore that - // case and preserve the value in an `@i` property for now. - result.Add("@i", eventTypeToken); - } - } - else - { - error = $"The `@i` event type value on line {lineNumber} is not in a string or numeric format."; - rawFormat = default; - return false; - } - } - - if (properties.Count != 0) - result.Add("Properties", properties); - - rawFormat = result; - error = null; - return true; - } -} \ No newline at end of file diff --git a/src/SeqCli/Forwarder/Shipper/ExponentialBackoffConnectionSchedule.cs b/src/SeqCli/Forwarder/Shipper/ExponentialBackoffConnectionSchedule.cs deleted file mode 100644 index 9439f4ba..00000000 --- a/src/SeqCli/Forwarder/Shipper/ExponentialBackoffConnectionSchedule.cs +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright © Datalust Pty Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using System; - -namespace SeqCli.Forwarder.Shipper; - -class ExponentialBackoffConnectionSchedule -{ - static readonly TimeSpan MinimumBackoffPeriod = TimeSpan.FromSeconds(5); - static readonly TimeSpan MaximumBackoffInterval = TimeSpan.FromMinutes(10); - - readonly TimeSpan _period; - - int _failuresSinceSuccessfulConnection; - - public ExponentialBackoffConnectionSchedule(TimeSpan period) - { - if (period < TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(period), "The connection retry period must be a positive timespan."); - - _period = period; - } - - public void MarkSuccess() - { - _failuresSinceSuccessfulConnection = 0; - } - - public void MarkFailure() - { - ++_failuresSinceSuccessfulConnection; - } - - public bool LastConnectionFailed => _failuresSinceSuccessfulConnection != 0; - - public TimeSpan NextInterval - { - get - { - // Available, and first failure, just try the batch interval - if (_failuresSinceSuccessfulConnection <= 1) return _period; - - // Second failure, start ramping up the interval - first 2x, then 4x, ... - var backoffFactor = Math.Pow(2, (_failuresSinceSuccessfulConnection - 1)); - - // If the period is ridiculously short, give it a boost so we get some - // visible backoff. - var backoffPeriod = Math.Max(_period.Ticks, MinimumBackoffPeriod.Ticks); - - // The "ideal" interval - var backedOff = (long)(backoffPeriod * backoffFactor); - - // Capped to the maximum interval - var cappedBackoff = Math.Min(MaximumBackoffInterval.Ticks, backedOff); - - // Unless that's shorter than the base interval, in which case we'll just apply the period - var actual = Math.Max(_period.Ticks, cappedBackoff); - - return TimeSpan.FromTicks(actual); - } - } -} \ No newline at end of file diff --git a/src/SeqCli/Forwarder/Shipper/HttpLogShipper.cs b/src/SeqCli/Forwarder/Shipper/HttpLogShipper.cs deleted file mode 100644 index f39159b1..00000000 --- a/src/SeqCli/Forwarder/Shipper/HttpLogShipper.cs +++ /dev/null @@ -1,250 +0,0 @@ -// Copyright © Datalust Pty Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using System; -using System.IO; -using System.Net; -using System.Net.Http; -using System.Net.Http.Headers; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using SeqCli.Config; -using SeqCli.Forwarder.Multiplexing; -using SeqCli.Forwarder.Storage; -using SeqCli.Forwarder.Util; -using Serilog; - -namespace SeqCli.Forwarder.Shipper; - -sealed class HttpLogShipper : LogShipper -{ - const string BulkUploadResource = "api/events/raw"; - - readonly string? _apiKey; - readonly LogBuffer _logBuffer; - readonly ConnectionConfig _outputConfig; - readonly HttpClient _httpClient; - readonly ExponentialBackoffConnectionSchedule _connectionSchedule; - readonly ServerResponseProxy _serverResponseProxy; - DateTime _nextRequiredLevelCheck; - - readonly object _stateLock = new(); - readonly Timer _timer; - bool _started; - - volatile bool _unloading; - - static readonly TimeSpan QuietWaitPeriod = TimeSpan.FromSeconds(2), MaximumConnectionInterval = TimeSpan.FromMinutes(2); - - public HttpLogShipper(LogBuffer logBuffer, string? apiKey, ConnectionConfig outputConfig, ServerResponseProxy serverResponseProxy, HttpClient outputHttpClient) - { - _apiKey = apiKey; - _httpClient = outputHttpClient ?? throw new ArgumentNullException(nameof(outputHttpClient)); - _logBuffer = logBuffer ?? throw new ArgumentNullException(nameof(logBuffer)); - _outputConfig = outputConfig ?? throw new ArgumentNullException(nameof(outputConfig)); - _serverResponseProxy = serverResponseProxy ?? throw new ArgumentNullException(nameof(serverResponseProxy)); - _connectionSchedule = new ExponentialBackoffConnectionSchedule(QuietWaitPeriod); - _timer = new Timer(_ => OnTick()); - } - - public override void Start() - { - lock (_stateLock) - { - if (_started) - throw new InvalidOperationException("The shipper has already started."); - - if (_unloading) - throw new InvalidOperationException("The shipper is unloading."); - - Log.Information("Log shipper started, events will be dispatched to {ServerUrl}", _outputConfig.ServerUrl); - - _nextRequiredLevelCheck = DateTime.UtcNow.Add(MaximumConnectionInterval); - _started = true; - SetTimer(); - } - } - - public override void Stop() - { - lock (_stateLock) - { - if (_unloading) - return; - - _unloading = true; - - if (!_started) - return; - } - - var wh = new ManualResetEvent(false); - if (_timer.Dispose(wh)) - wh.WaitOne(); - } - - public override void Dispose() - { - Stop(); - } - - void SetTimer() - { - _timer.Change(_connectionSchedule.NextInterval, Timeout.InfiniteTimeSpan); - } - - void OnTick() - { - OnTickAsync().Wait(); - } - - async Task OnTickAsync() - { - try - { - var sendingSingles = 0; - do - { - var available = _logBuffer.Peek((int)_outputConfig.PayloadLimitBytes); - if (available.Length == 0) - { - if (DateTime.UtcNow < _nextRequiredLevelCheck || _connectionSchedule.LastConnectionFailed) - { - // For whatever reason, there's nothing waiting to send. This means we should try connecting again at the - // regular interval, so mark the attempt as successful. - _connectionSchedule.MarkSuccess(); - break; - } - } - - MakePayload(available, sendingSingles > 0, out Stream payload, out ulong lastIncluded); - - var content = new StreamContent(new UnclosableStreamWrapper(payload)); - content.Headers.ContentType = new MediaTypeHeaderValue("application/json") - { - CharSet = Encoding.UTF8.WebName - }; - - if (_apiKey != null) - { - content.Headers.Add(SeqApi.ApiKeyHeaderName, _apiKey); - } - - var result = await _httpClient.PostAsync(BulkUploadResource, content); - if (result.IsSuccessStatusCode) - { - _connectionSchedule.MarkSuccess(); - _logBuffer.Dequeue(lastIncluded); - if (sendingSingles > 0) - sendingSingles--; - - _serverResponseProxy.SuccessResponseReturned(_apiKey, await result.Content.ReadAsStringAsync()); - _nextRequiredLevelCheck = DateTime.UtcNow.Add(MaximumConnectionInterval); - } - else if (result.StatusCode == HttpStatusCode.BadRequest || - result.StatusCode == HttpStatusCode.RequestEntityTooLarge) - { - // The connection attempt was successful - the payload we sent was the problem. - _connectionSchedule.MarkSuccess(); - - if (sendingSingles != 0) - { - payload.Position = 0; - var payloadText = await new StreamReader(payload, Encoding.UTF8).ReadToEndAsync(); - Log.Error("HTTP shipping failed with {StatusCode}: {Result}; payload was {InvalidPayload}", result.StatusCode, await result.Content.ReadAsStringAsync(), payloadText); - _logBuffer.Dequeue(lastIncluded); - sendingSingles = 0; - } - else - { - // Unscientific (should "binary search" in batches) but sending the next - // hundred events singly should flush out the problematic one. - sendingSingles = 100; - } - } - else - { - _connectionSchedule.MarkFailure(); - Log.Error("Received failed HTTP shipping result {StatusCode}: {Result}", result.StatusCode, await result.Content.ReadAsStringAsync()); - break; - } - } - while (true); - } - catch (HttpRequestException hex) - { - Log.Warning(hex, "HTTP request failed when sending a batch from the log shipper"); - _connectionSchedule.MarkFailure(); - } - catch (Exception ex) - { - Log.Error(ex, "Exception while sending a batch from the log shipper"); - _connectionSchedule.MarkFailure(); - } - finally - { - lock (_stateLock) - { - if (!_unloading) - SetTimer(); - } - } - } - - void MakePayload(LogBufferEntry[] entries, bool oneOnly, out Stream utf8Payload, out ulong lastIncluded) - { - if (entries == null) throw new ArgumentNullException(nameof(entries)); - lastIncluded = 0; - - var raw = new MemoryStream(); - var content = new StreamWriter(raw, Encoding.UTF8); - content.Write("{\"Events\":["); - content.Flush(); - var contentRemainingBytes = (int) _outputConfig.PayloadLimitBytes - 13; // Includes closing delimiters - - var delimStart = ""; - foreach (var logBufferEntry in entries) - { - if ((ulong)logBufferEntry.Value.Length > _outputConfig.EventBodyLimitBytes) - { - Log.Information("Oversize event will be skipped, {Payload}", Encoding.UTF8.GetString(logBufferEntry.Value)); - lastIncluded = logBufferEntry.Key; - continue; - } - - // lastIncluded indicates we've added at least one event - if (lastIncluded != 0 && contentRemainingBytes - (delimStart.Length + logBufferEntry.Value.Length) < 0) - break; - - content.Write(delimStart); - content.Flush(); - contentRemainingBytes -= delimStart.Length; - - raw.Write(logBufferEntry.Value, 0, logBufferEntry.Value.Length); - contentRemainingBytes -= logBufferEntry.Value.Length; - - lastIncluded = logBufferEntry.Key; - - delimStart = ","; - if (oneOnly) - break; - } - - content.Write("]}"); - content.Flush(); - raw.Position = 0; - utf8Payload = raw; - } -} \ No newline at end of file diff --git a/src/SeqCli/Forwarder/Shipper/InertLogShipper.cs b/src/SeqCli/Forwarder/Shipper/InertLogShipper.cs deleted file mode 100644 index 1ae106e3..00000000 --- a/src/SeqCli/Forwarder/Shipper/InertLogShipper.cs +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright © Datalust Pty Ltd and Contributors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -namespace SeqCli.Forwarder.Shipper; - -class InertLogShipper : LogShipper -{ - public override void Start() - { - } - - public override void Stop() - { - } - - public override void Dispose() - { - } -} \ No newline at end of file diff --git a/src/SeqCli/Forwarder/Shipper/LogShipper.cs b/src/SeqCli/Forwarder/Shipper/LogShipper.cs deleted file mode 100644 index 83e8beb3..00000000 --- a/src/SeqCli/Forwarder/Shipper/LogShipper.cs +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright © Datalust Pty Ltd and Contributors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using System; - -namespace SeqCli.Forwarder.Shipper; - -public abstract class LogShipper : IDisposable -{ - public abstract void Start(); - public abstract void Stop(); - public abstract void Dispose(); -} \ No newline at end of file diff --git a/src/SeqCli/Forwarder/Shipper/SeqApi.cs b/src/SeqCli/Forwarder/Shipper/SeqApi.cs deleted file mode 100644 index 5e7c45e2..00000000 --- a/src/SeqCli/Forwarder/Shipper/SeqApi.cs +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright © Datalust Pty Ltd and Contributors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -namespace SeqCli.Forwarder.Shipper; - -static class SeqApi -{ - public const string ApiKeyHeaderName = "X-Seq-ApiKey"; -} \ No newline at end of file diff --git a/src/SeqCli/Forwarder/Storage/LogBuffer.cs b/src/SeqCli/Forwarder/Storage/LogBuffer.cs index f58fe8db..b53236cd 100644 --- a/src/SeqCli/Forwarder/Storage/LogBuffer.cs +++ b/src/SeqCli/Forwarder/Storage/LogBuffer.cs @@ -1,279 +1,57 @@ -// Copyright © Datalust Pty Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - using System; -using System.Collections.Generic; -using Serilog; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; namespace SeqCli.Forwarder.Storage; -public class LogBuffer : IDisposable +record LogBuffer { - readonly ulong _bufferSizeBytes; - // readonly LightningEnvironment _env; - readonly object _sync = new(); - bool _isDisposed; - ulong _nextId = 0, _entries = 0, _writtenSinceRotateCheck; - - public LogBuffer(string bufferPath, ulong bufferSizeBytes) - { - _bufferSizeBytes = bufferSizeBytes; - if (bufferPath == null) throw new ArgumentNullException(nameof(bufferPath)); - - // _env = new LightningEnvironment(bufferPath) - // { - // // Sparse; we'd hope fragmentation never gets this bad... - // MapSize = (long) bufferSizeBytes*10 - // }; - // - // _env.Open(); - // - // using (var tx = _env.BeginTransaction()) - // using (var db = tx.OpenDatabase()) - // { - // using (var cur = tx.CreateCursor(db)) - // { - // if (!cur.MoveToLast()) - // { - // _nextId = 1; - // } - // else - // { - // var current = cur.GetCurrent(); - // _nextId = ByteKeyToULongKey(current.Key) + 1; - // _entries = (ulong) tx.GetEntriesCount(db); - // } - // } - // } - - Log.Information("Log buffer open on {BufferPath}; {Entries} entries, next key will be {NextId}", bufferPath, _entries, _nextId); - } - - public void Dispose() + public LogBuffer(Func write, CancellationToken cancellationToken) { - lock (_sync) + var channel = Channel.CreateBounded(new BoundedChannelOptions(5) + { + SingleReader = false, + SingleWriter = true, + FullMode = BoundedChannelFullMode.Wait, + }); + + _shutdownTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + _writer = channel.Writer; + _worker = Task.Run(async () => { - if (!_isDisposed) + await foreach (var entry in channel.Reader.ReadAllAsync(_shutdownTokenSource.Token)) { - _isDisposed = true; - // _env.Dispose(); + try + { + await write(_shutdownTokenSource.Token); + entry.Completion.SetResult(); + } + catch (Exception e) + { + entry.Completion.TrySetException(e); + } } - } - } - - public void Enqueue(byte[][] values) - { - if (values == null) throw new ArgumentNullException(nameof(values)); - - lock (_sync) - { - RequireNotDisposed(); - - // var totalPayloadWritten = 0UL; - // - // using (var tx = _env.BeginTransaction()) - // using (var db = tx.OpenDatabase()) - // { - // foreach (var v in values) - // { - // if (v == null) throw new ArgumentException("Value array may not contain null."); - // - // tx.Put(db, ULongKeyToByteKey(_nextId++), v); - // totalPayloadWritten += (ulong) v.Length; - // } - // - // tx.Commit(); - // _entries += (ulong) values.Length; - // _writtenSinceRotateCheck += totalPayloadWritten; - // } - - RotateIfRequired(); - } + }, cancellationToken: _shutdownTokenSource.Token); } - - void RotateIfRequired() + + readonly ChannelWriter _writer; + readonly Task _worker; + readonly CancellationTokenSource _shutdownTokenSource; + + public async Task WriteAsync(byte[] storage, Range range, CancellationToken cancellationToken) { - if (_writtenSinceRotateCheck < _bufferSizeBytes/10) - return; + var tcs = new TaskCompletionSource(); + var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownTokenSource.Token); - _writtenSinceRotateCheck = 0; - // - // using (var tx = _env.BeginTransaction()) - // using (var db = tx.OpenDatabase()) - // { - // int err; - // if (0 != (err = Lmdb.mdb_env_info(_env.Handle(), out var estat))) - // throw new Exception(Lmdb.mdb_strerror(err)); - // - // MDBStat stat; - // if (0 != (err = Lmdb.mdb_stat(tx.Handle(), db.Handle(), out stat))) - // throw new Exception(Lmdb.mdb_strerror(err)); - // - // // http://www.openldap.org/lists/openldap-technical/201303/msg00145.html - // // 1) MDB_stat gives you the page size. - // // 2) MDB_envinfo tells the mapsize and the last_pgno.If you divide mapsize - // // by pagesize you'll get max pgno. The MAP_FULL error is returned when last_pgno reaches max pgno. - // - // var targetPages = _bufferSizeBytes/stat.ms_psize; - // if ((ulong) estat.me_last_pgno < targetPages && (double) (ulong) estat.me_last_pgno/targetPages < 0.75) - // return; - // - // var count = tx.GetEntriesCount(db); - // if (count == 0) - // { - // Log.Warning("Attempting to rotate buffer but no events are present"); - // return; - // } - // - // var toPurge = Math.Max(count / 4, 1); - // Log.Warning("Buffer is full; dropping {ToPurge} events to make room for new ones", - // toPurge); - // - // using (var cur = tx.CreateCursor(db)) - // { - // cur.MoveToFirst(); - // - // for (var i = 0; i < toPurge; ++i) - // { - // cur.Delete(); - // cur.MoveNext(); - // } - // } - // - // tx.Commit(); - // } + await _writer.WriteAsync(new LogBufferEntry(storage, range, tcs), cts.Token); + await tcs.Task; } - public LogBufferEntry[] Peek(int maxValueBytesHint) + public async Task StopAsync() { - lock (_sync) - { - RequireNotDisposed(); - - var entries = new List(); - // - // using (var tx = _env.BeginTransaction(TransactionBeginFlags.ReadOnly)) - // using (var db = tx.OpenDatabase()) - // { - // using (var cur = tx.CreateCursor(db)) - // { - // if (cur.MoveToFirst()) - // { - // var entriesBytes = 0; - // - // do - // { - // var current = cur.GetCurrent(); - // var entry = new LogBufferEntry - // { - // Key = ByteKeyToULongKey(current.Key), - // Value = current.Value - // }; - // - // entriesBytes += entry.Value.Length; - // if (entries.Count != 0 && entriesBytes > maxValueBytesHint) - // break; - // - // entries.Add(entry); - // - // } while (cur.MoveNext()); - // } - // } - // } - - return entries.ToArray(); - } - } - - public void Dequeue(ulong toKey) - { - lock (_sync) - { - RequireNotDisposed(); - - // ulong deleted = 0; - // - // using (var tx = _env.BeginTransaction()) - // using (var db = tx.OpenDatabase()) - // { - // using (var cur = tx.CreateCursor(db)) - // { - // if (cur.MoveToFirst()) - // { - // do - // { - // var current = cur.GetCurrent(); - // if (ByteKeyToULongKey(current.Key) > toKey) - // break; - // - // cur.Delete(); - // deleted++; - // } while (cur.MoveNext()); - // } - // } - // - // tx.Commit(); - // _entries -= deleted; - // } - } - } - - void RequireNotDisposed() - { - if (_isDisposed) - throw new ObjectDisposedException(typeof(LogBuffer).FullName); - } - - static ulong ByteKeyToULongKey(byte[] key) - { - var copy = new byte[key.Length]; - for (var i = 0; i < key.Length; ++i) - copy[copy.Length - (i + 1)] = key[i]; - - return BitConverter.ToUInt64(copy, 0); - } - - static byte[] ULongKeyToByteKey(ulong key) - { - var k = BitConverter.GetBytes(key); - Array.Reverse(k); - return k; - } - - public void Enumerate(Action action) - { - if (action == null) throw new ArgumentNullException(nameof(action)); - - lock (_sync) - { - RequireNotDisposed(); - - // using (var tx = _env.BeginTransaction(TransactionBeginFlags.ReadOnly)) - // using (var db = tx.OpenDatabase()) - // { - // using (var cur = tx.CreateCursor(db)) - // { - // if (cur.MoveToFirst()) - // { - // do - // { - // var current = cur.GetCurrent(); - // action(ByteKeyToULongKey(current.Key), current.Value); - // } while (cur.MoveNext()); - // } - // } - // } - } + _writer.Complete(); + await _worker; + await _shutdownTokenSource.CancelAsync(); } } \ No newline at end of file diff --git a/src/SeqCli/Forwarder/Storage/LogBufferEntry.cs b/src/SeqCli/Forwarder/Storage/LogBufferEntry.cs index 649be980..4477b926 100644 --- a/src/SeqCli/Forwarder/Storage/LogBufferEntry.cs +++ b/src/SeqCli/Forwarder/Storage/LogBufferEntry.cs @@ -1,23 +1,6 @@ -// Copyright © Datalust Pty Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// ReSharper disable InconsistentNaming +using System; +using System.Threading.Tasks; namespace SeqCli.Forwarder.Storage; -public struct LogBufferEntry -{ - public ulong Key; - public byte[] Value; -} \ No newline at end of file +public readonly record struct LogBufferEntry(byte[] Storage, Range Range, TaskCompletionSource Completion); diff --git a/src/SeqCli/Forwarder/Storage/LogBufferMap.cs b/src/SeqCli/Forwarder/Storage/LogBufferMap.cs new file mode 100644 index 00000000..b5b246fa --- /dev/null +++ b/src/SeqCli/Forwarder/Storage/LogBufferMap.cs @@ -0,0 +1,17 @@ +using System; +using System.Threading.Tasks; + +namespace SeqCli.Forwarder.Storage; + +class LogBufferMap +{ + public LogBufferMap() + { + + } + + public LogBuffer Get(string? apiKey) + { + return new LogBuffer(async (c) => await Task.Delay(TimeSpan.FromSeconds(1), c), default); + } +} diff --git a/src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs b/src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs index 3a463c9d..48fc8fa2 100644 --- a/src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs +++ b/src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs @@ -13,24 +13,25 @@ // limitations under the License. using System; +using System.Buffers; using System.Collections.Generic; -using System.IO; using System.Linq; using System.Net; using System.Text; +using System.Text.Json; +using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Http; -using Microsoft.AspNetCore.Http.Features; using Microsoft.AspNetCore.Http.HttpResults; using Microsoft.Net.Http.Headers; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using SeqCli.Config; using SeqCli.Forwarder.Diagnostics; -using SeqCli.Forwarder.Multiplexing; -using SeqCli.Forwarder.Schema; -using SeqCli.Forwarder.Shipper; +using SeqCli.Forwarder.Storage; +using JsonException = System.Text.Json.JsonException; +using JsonSerializer = Newtonsoft.Json.JsonSerializer; namespace SeqCli.Forwarder.Web.Api; @@ -38,21 +39,18 @@ class IngestionEndpoints : IMapEndpoints { static readonly Encoding Utf8 = new UTF8Encoding(false); - readonly ActiveLogBufferMap _logBufferMap; readonly ConnectionConfig _connectionConfig; - readonly ServerResponseProxy _serverResponseProxy; + readonly LogBufferMap _logBuffers; readonly JsonSerializer _rawSerializer = JsonSerializer.Create( new JsonSerializerSettings { DateParseHandling = DateParseHandling.None }); public IngestionEndpoints( - ActiveLogBufferMap logBufferMap, - ServerResponseProxy serverResponseProxy, - ConnectionConfig connectionConfig) + SeqCliConfig config, + LogBufferMap logBuffers) { - _logBufferMap = logBufferMap; - _connectionConfig = connectionConfig; - _serverResponseProxy = serverResponseProxy; + _connectionConfig = config.Connection; + _logBuffers = logBuffers; } public void Map(WebApplication app) @@ -77,7 +75,7 @@ public void Map(WebApplication app) })); } - byte[][] EncodeRawEvents(ICollection events, IPAddress remoteIpAddress) + IEnumerable EncodeRawEvents(ICollection events, IPAddress remoteIpAddress) { var encoded = new byte[events.Count][]; var i = 0; @@ -146,101 +144,169 @@ static bool DefaultedBoolQuery(HttpRequest request, string queryParameterName) return "true".Equals(value, StringComparison.OrdinalIgnoreCase) || value == "" || value == queryParameterName; } - - IResult IngestRawFormat(HttpContext context) - { - // The compact format ingestion path works with async IO. - context.Features.Get()!.AllowSynchronousIO = true; - - JObject posted; - try - { - posted = _rawSerializer.Deserialize(new JsonTextReader(new StreamReader(context.Request.Body))) ?? - throw new RequestProcessingException("Request body payload is JSON `null`."); - } - catch (Exception ex) - { - IngestionLog.ForClient(context.Connection.RemoteIpAddress!).Debug(ex,"Rejecting payload due to invalid JSON, request body could not be parsed"); - throw new RequestProcessingException("Invalid raw event JSON, body could not be parsed."); - } - if (!(posted.TryGetValue("events", StringComparison.Ordinal, out var eventsToken) || - posted.TryGetValue("Events", StringComparison.Ordinal, out eventsToken))) - { - IngestionLog.ForClient(context.Connection.RemoteIpAddress!).Debug("Rejecting payload due to invalid JSON structure"); - throw new RequestProcessingException("Invalid raw event JSON, body must contain an 'Events' array."); - } + static string? ApiKey(HttpRequest request) + { + var apiKeyHeader = request.Headers["X-SeqApiKey"]; - if (!(eventsToken is JArray events)) - { - IngestionLog.ForClient(context.Connection.RemoteIpAddress!).Debug("Rejecting payload due to invalid Events property structure"); - throw new RequestProcessingException("Invalid raw event JSON, the 'Events' property must be an array."); - } + if (apiKeyHeader.Count > 0) return apiKeyHeader.Last(); + if (request.Query.TryGetValue("apiKey", out var apiKey)) return apiKey.Last(); - var encoded = EncodeRawEvents(events, context.Connection.RemoteIpAddress!); - return Enqueue(context.Request, encoded); + return null; + } + + + IResult IngestRawFormat(HttpContext context) + { + // Convert legacy format to CLEF + throw new NotImplementedException(); } async Task IngestCompactFormat(HttpContext context) { - var rawFormat = new List(); - var reader = new StreamReader(context.Request.Body); + var cts = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted); + cts.CancelAfter(TimeSpan.FromSeconds(5)); - var line = await reader.ReadLineAsync(); - var lineNumber = 1; + var log = _logBuffers.Get(ApiKey(context.Request)); - while (line != null) + var payload = ArrayPool.Shared.Rent(1024 * 1024 * 10); + var writeHead = 0; + var readHead = 0; + var discarding = false; + + var done = false; + while (!done) { - if (!string.IsNullOrWhiteSpace(line)) + // Fill our buffer + while (!done) { - JObject item; - try + var remaining = payload.Length - writeHead; + if (remaining == 0) { - item = _rawSerializer.Deserialize(new JsonTextReader(new StringReader(line))) ?? - throw new RequestProcessingException("Request body payload is JSON `null`."); + break; } - catch (Exception ex) + + var read = await context.Request.Body.ReadAsync(payload.AsMemory(writeHead, remaining), context.RequestAborted); + if (read == 0) { - IngestionLog.ForPayload(context.Connection.RemoteIpAddress!, line).Debug(ex, "Rejecting CLEF payload due to invalid JSON, item could not be parsed"); - throw new RequestProcessingException($"Invalid raw event JSON, item on line {lineNumber} could not be parsed."); + done = true; } - if (!EventSchema.FromClefFormat(lineNumber, item, out var evt, out var err)) + writeHead += read; + } + + // Process events + var batchStart = readHead; + var batchEnd = readHead; + while (batchEnd < writeHead) + { + var eventStart = batchEnd; + var nlIndex = payload.AsSpan()[eventStart..].IndexOf((byte)'\n'); + + if (nlIndex == -1) { - IngestionLog.ForPayload(context.Connection.RemoteIpAddress!, line).Debug("Rejecting CLEF payload due to invalid event JSON structure: {NormalizationError}", err); - throw new RequestProcessingException(err); + break; } - rawFormat.Add(evt); - } + var eventEnd = eventStart + nlIndex + 1; - line = await reader.ReadLineAsync(); - ++lineNumber; - } + if (discarding) + { + batchStart = eventEnd; + batchEnd = eventEnd; + readHead = batchEnd; - return Enqueue( - context.Request, - EncodeRawEvents(rawFormat, context.Connection.RemoteIpAddress!)); - } - - ContentHttpResult Enqueue(HttpRequest request, byte[][] encodedEvents) - { - var apiKeyToken = request.Headers[SeqApi.ApiKeyHeaderName].FirstOrDefault(); + discarding = false; + } + else + { + batchEnd = eventEnd; + readHead = batchEnd; - if (string.IsNullOrWhiteSpace(apiKeyToken)) - apiKeyToken = request.Query["apiKey"]; + if (!ValidateClef(payload.AsSpan()[eventStart..batchEnd])) + { + await Write(log, ArrayPool.Shared, payload, batchStart..eventStart, cts.Token); + batchStart = batchEnd; + } + } + } - var apiKey = string.IsNullOrWhiteSpace(apiKeyToken) - ? null - : apiKeyToken.Trim(); - - _logBufferMap.GetLogBuffer(apiKey).Enqueue(encodedEvents); + if (batchStart != batchEnd) + { + await Write(log, ArrayPool.Shared, payload, batchStart..batchEnd, cts.Token); + } + else if (batchStart == 0) + { + readHead = payload.Length; + discarding = true; + } + // Copy any unprocessed data into our buffer and continue + if (!done) + { + var retain = payload.Length - readHead; + payload.AsSpan()[retain..].CopyTo(payload.AsSpan()[..retain]); + readHead = retain; + writeHead = retain; + } + } + + // Exception cases are handled by `Write` + ArrayPool.Shared.Return(payload); + return TypedResults.Content( - _serverResponseProxy.GetResponseText(apiKey), + null, "application/json", Utf8, StatusCodes.Status201Created); - + } + + bool ValidateClef(Span evt) + { + var reader = new Utf8JsonReader(evt); + + try + { + reader.Read(); + if (reader.TokenType != JsonTokenType.StartObject) + { + return false; + } + + while (reader.Read()) + { + if (reader.CurrentDepth == 1) + { + if (reader.TokenType == JsonTokenType.PropertyName) + { + var name = reader.GetString(); + + if (name != null & name!.StartsWith("@")) + { + // Validate @ property + } + } + } + } + } + catch (JsonException) + { + return false; + } + + return true; + } + + async Task Write(LogBuffer log, ArrayPool pool, byte[] storage, Range range, CancellationToken cancellationToken) + { + try + { + await log.WriteAsync(storage, range, cancellationToken); + } + catch + { + pool.Return(storage); + throw; + } } } \ No newline at end of file diff --git a/src/SeqCli/Forwarder/Web/Host/ServerService.cs b/src/SeqCli/Forwarder/Web/Host/ServerService.cs index 9118eb24..40934120 100644 --- a/src/SeqCli/Forwarder/Web/Host/ServerService.cs +++ b/src/SeqCli/Forwarder/Web/Host/ServerService.cs @@ -15,20 +15,17 @@ using System; using Microsoft.Extensions.Hosting; using SeqCli.Forwarder.Diagnostics; -using SeqCli.Forwarder.Multiplexing; using Serilog; namespace SeqCli.Forwarder.Web.Host; class ServerService { - readonly ActiveLogBufferMap _logBufferMap; readonly IHost _host; readonly string _listenUri; - public ServerService(ActiveLogBufferMap logBufferMap, IHost host, string listenUri) + public ServerService(IHost host, string listenUri) { - _logBufferMap = logBufferMap; _host = host; _listenUri = listenUri; } @@ -43,9 +40,6 @@ public void Start() Log.Information("Seq Forwarder listening on {ListenUri}", _listenUri); IngestionLog.Log.Debug("Seq Forwarder is accepting events"); - - _logBufferMap.Load(); - _logBufferMap.Start(); } catch (Exception ex) { @@ -59,7 +53,6 @@ public void Stop() Log.Debug("Seq Forwarder stopping"); _host.StopAsync().Wait(); - _logBufferMap.Stop(); Log.Information("Seq Forwarder stopped cleanly"); } diff --git a/test/SeqCli.Tests/Forwarder/Multiplexing/ActiveLogBufferMapTests.cs b/test/SeqCli.Tests/Forwarder/Multiplexing/ActiveLogBufferMapTests.cs deleted file mode 100644 index a4ca2484..00000000 --- a/test/SeqCli.Tests/Forwarder/Multiplexing/ActiveLogBufferMapTests.cs +++ /dev/null @@ -1,80 +0,0 @@ -using System.IO; -using System.Linq; -using SeqCli.Config; -using SeqCli.Forwarder.Multiplexing; -using SeqCli.Tests.Support; -using Xunit; - -namespace SeqCli.Tests.Forwarder.Multiplexing; - -public class ActiveLogBufferMapTests -{ - [Fact] - public void AnEmptyMapCreatesNoFiles() - { - using var tmp = new TempFolder("Buffer"); - using var map = CreateActiveLogBufferMap(tmp); - Assert.Empty(Directory.GetFileSystemEntries(tmp.Path)); - } - - [Fact] - public void TheDefaultBufferWritesDataInTheBufferRoot() - { - using var tmp = new TempFolder("Buffer"); - using var map = CreateActiveLogBufferMap(tmp); - var entry = map.GetLogBuffer(null); - Assert.NotNull(entry); - Assert.True(File.Exists(Path.Combine(tmp.Path, "data.mdb"))); - Assert.Empty(Directory.GetDirectories(tmp.Path)); - Assert.Same(entry, map.GetLogBuffer(null)); - } - - [Fact] - public void ApiKeySpecificBuffersWriteDataToSubfolders() - { - using var tmp = new TempFolder("Buffer"); - using var map = CreateActiveLogBufferMap(tmp); - string key1 = Some.ApiKey(), key2 = Some.ApiKey(); - var entry1 = map.GetLogBuffer(key1); - var entry2 = map.GetLogBuffer(key2); - - Assert.NotNull(entry1); - Assert.NotNull(entry2); - Assert.Same(entry1, map.GetLogBuffer(key1)); - Assert.NotSame(entry1, entry2); - var subdirs = Directory.GetDirectories(tmp.Path); - Assert.Equal(2, subdirs.Length); - Assert.True(File.Exists(Path.Combine(subdirs[0], "data.mdb"))); - Assert.True(File.Exists(Path.Combine(subdirs[0], ".apikey"))); - } - - [Fact] - public void EntriesSurviveReloads() - { - var apiKey = Some.ApiKey(); - var value = Some.Bytes(100); - - using var tmp = new TempFolder("Buffer"); - using (var map = CreateActiveLogBufferMap(tmp)) - { - map.GetLogBuffer(null).Enqueue([value]); - map.GetLogBuffer(apiKey).Enqueue([value]); - } - - using (var map = CreateActiveLogBufferMap(tmp)) - { - var first = map.GetLogBuffer(null).Peek(0).Single(); - var second = map.GetLogBuffer(apiKey).Peek(0).Single(); - Assert.Equal(value, first.Value); - Assert.Equal(value, second.Value); - } - } - - static ActiveLogBufferMap CreateActiveLogBufferMap(TempFolder tmp) - { - var config = new SeqCliConfig(); - var map = new ActiveLogBufferMap(tmp.Path, config, new InertLogShipperFactory()); - map.Load(); - return map; - } -} \ No newline at end of file diff --git a/test/SeqCli.Tests/Forwarder/Schema/EventSchemaTests.cs b/test/SeqCli.Tests/Forwarder/Schema/EventSchemaTests.cs deleted file mode 100644 index 8d1abcba..00000000 --- a/test/SeqCli.Tests/Forwarder/Schema/EventSchemaTests.cs +++ /dev/null @@ -1,72 +0,0 @@ -using System.IO; -using Newtonsoft.Json; -using Newtonsoft.Json.Linq; -using SeqCli.Forwarder.Schema; -using Xunit; - -namespace SeqCli.Tests.Forwarder.Schema; - -public class EventSchemaTests -{ - static readonly JsonSerializer RawSerializer = JsonSerializer.Create( - new JsonSerializerSettings { DateParseHandling = DateParseHandling.None }); - - [Fact] - public void ClefNormalizationAcceptsDuplicateRenderings() - { - var payload = "{\"@t\": \"2015-05-09T12:09:08.12345Z\"," + - " \"@mt\": \"{A:000} and {A:000}\"," + - " \"@r\": [\"424\",\"424\"]}"; - - AssertCanNormalizeClef(payload); - } - - [Fact] - public void ClefNormalizationPropagatesRenderings() - { - const string payload = "{\"@t\":\"2018-12-02T09:05:47.256725+03:00\",\"@mt\":\"Hello {P:000}!\",\"P\":12,\"@r\":[\"012\"]}"; - var evt = AssertCanNormalizeClef(payload); - Assert.Single(evt.Renderings); - } - - [Fact] - public void ClefNormalizationIgnoresMissingRenderings() - { - const string payload = "{\"@t\":\"2018-12-02T09:05:47.256725+03:00\",\"@mt\":\"Hello {P:000}!\",\"P\":12}"; - AssertCanNormalizeClef(payload); - } - - [Fact] - public void ClefNormalizationFixesTooFewRenderings1() - { - const string payload = "{\"@t\":\"2018-12-02T09:05:47.256725+03:00\",\"@mt\":\"Hello {P:000}!\",\"P\":12,\"@r\":[]}"; - var evt = AssertCanNormalizeClef(payload); - Assert.Null(evt.Renderings); - } - - [Fact] - public void ClefNormalizationFixesTooFewRenderings2() - { - const string payload = "{\"@t\":\"2018-12-02T09:05:47.256725+03:00\",\"@mt\":\"Hello {P:000} {Q:x}!\",\"P\":12,\"@r\":[\"012\"]}"; - var evt = AssertCanNormalizeClef(payload); - Assert.Null(evt.Renderings); - } - - [Fact] - public void ClefNormalizationIgnoresTooManyRenderings() - { - const string payload = "{\"@t\":\"2018-12-02T09:05:47.256725+03:00\",\"@mt\":\"Hello {P:000}!\",\"P\":12,\"@r\":[\"012\",\"013\"]}"; - var evt = AssertCanNormalizeClef(payload); - Assert.Null(evt.Renderings); - } - - static dynamic AssertCanNormalizeClef(string payload) - { - var jo = RawSerializer.Deserialize(new JsonTextReader(new StringReader(payload)))!; - - var valid = EventSchema.FromClefFormat(1, jo, out var rawFormat, out var error); - Assert.True(valid, error); - Assert.NotNull(rawFormat); - return rawFormat!; - } -} \ No newline at end of file diff --git a/test/SeqCli.Tests/Forwarder/Shipper/ServerResponseProxyTests.cs b/test/SeqCli.Tests/Forwarder/Shipper/ServerResponseProxyTests.cs deleted file mode 100644 index 107a6973..00000000 --- a/test/SeqCli.Tests/Forwarder/Shipper/ServerResponseProxyTests.cs +++ /dev/null @@ -1,46 +0,0 @@ -using SeqCli.Forwarder.Multiplexing; -using SeqCli.Tests.Support; -using Xunit; - -namespace SeqCli.Tests.Forwarder.Shipper; - -public class ServerResponseProxyTests -{ - [Fact] - public void WhenNoResponseRecordedEmptyIsReturned() - { - var proxy = new ServerResponseProxy(); - var response = proxy.GetResponseText(Some.ApiKey()); - Assert.Equal("{}", response); - } - - [Fact] - public void WhenApiKeysDontMatchEmptyResponseReturned() - { - var proxy = new ServerResponseProxy(); - proxy.SuccessResponseReturned(Some.ApiKey(), "this is never used"); - var response = proxy.GetResponseText(Some.ApiKey()); - Assert.Equal("{}", response); - } - - [Fact] - public void WhenApiKeysMatchTheResponseIsReturned() - { - var proxy = new ServerResponseProxy(); - var apiKey = Some.ApiKey(); - var responseText = "some response"; - proxy.SuccessResponseReturned(apiKey, responseText); - var response = proxy.GetResponseText(apiKey); - Assert.Equal(responseText, response); - } - - [Fact] - public void NullApiKeysAreConsideredMatching() - { - var proxy = new ServerResponseProxy(); - var responseText = "some response"; - proxy.SuccessResponseReturned(null, responseText); - var response = proxy.GetResponseText(null); - Assert.Equal(responseText, response); - } -} \ No newline at end of file