diff --git a/src/SeqCli/Config/Forwarder/SeqCliForwarderConfig.cs b/src/SeqCli/Config/Forwarder/SeqCliForwarderConfig.cs index 1043d0c8..68f83e07 100644 --- a/src/SeqCli/Config/Forwarder/SeqCliForwarderConfig.cs +++ b/src/SeqCli/Config/Forwarder/SeqCliForwarderConfig.cs @@ -7,4 +7,5 @@ class SeqCliForwarderConfig public SeqCliForwarderStorageConfig Storage { get; set; } = new(); public SeqCliForwarderDiagnosticConfig Diagnostics { get; set; } = new(); public SeqCliForwarderApiConfig Api { get; set; } = new(); + public bool UseApiKeyForwarding { get; set; } } \ No newline at end of file diff --git a/src/SeqCli/Config/KeyValueSettings.cs b/src/SeqCli/Config/KeyValueSettings.cs index 61d3218f..96a1a88c 100644 --- a/src/SeqCli/Config/KeyValueSettings.cs +++ b/src/SeqCli/Config/KeyValueSettings.cs @@ -57,7 +57,7 @@ public static void Set(SeqCliConfig config, string key, string? value) // would be more robust. var targetProperty = receiver.GetType().GetTypeInfo().DeclaredProperties .Where(p => p is { CanRead: true, CanWrite: true } && p.GetMethod!.IsPublic && p.SetMethod!.IsPublic && !p.GetMethod.IsStatic) - .SingleOrDefault(p => Camelize(p.Name) == steps[^1]); + .SingleOrDefault(p => Camelize(GetUserFacingName(p)) == steps[^1]); if (targetProperty == null) throw new ArgumentException("The key could not be found; run `seqcli config list` to view all keys."); diff --git a/src/SeqCli/Forwarder/Channel/ApiKeyForwardingChannelWrapper.cs b/src/SeqCli/Forwarder/Channel/ApiKeyForwardingChannelWrapper.cs new file mode 100644 index 00000000..258e93db --- /dev/null +++ b/src/SeqCli/Forwarder/Channel/ApiKeyForwardingChannelWrapper.cs @@ -0,0 +1,90 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Seq.Api; +using SeqCli.Config; +using SeqCli.Forwarder.Filesystem.System; +using Serilog; + +namespace SeqCli.Forwarder.Channel; + +class ApiKeyForwardingChannelWrapper : ForwardingChannelWrapper +{ + readonly Dictionary _channelsByApiKey = new(); + const string EmptyApiKeyChannelId = "EmptyApiKey"; + + public ApiKeyForwardingChannelWrapper(string bufferPath, SeqConnection connection, SeqCliConfig config) : base(bufferPath, connection, config) + { + LoadChannels(); + } + + // Start forwarding channels found on the file system. + void LoadChannels() + { + foreach (var directoryPath in Directory.EnumerateDirectories(BufferPath)) + { + if (directoryPath.Equals(GetStorePath(SeqCliConnectionChannelId))) + { + // data was stored when not using API key forwarding + continue; + } + + string apiKey, channelId; + + if (new SystemStoreDirectory(directoryPath).TryReadApiKey(Config, out var key)) + { + apiKey = key!; + channelId = directoryPath; + } + else + { + // directory should contain an api key file but does not + continue; + } + + var created = OpenOrCreateChannel(channelId, apiKey); + _channelsByApiKey.Add(apiKey, created); + } + } + + public override ForwardingChannel GetForwardingChannel(string? requestApiKey) + { + lock (ChannelsSync) + { + // use empty string to represent no api key + if (_channelsByApiKey.TryGetValue(requestApiKey ?? "", out var channel)) + { + return channel; + } + + var channelId = ApiKeyToId(requestApiKey); + var created = OpenOrCreateChannel(channelId, requestApiKey); + var store = new SystemStoreDirectory(GetStorePath(channelId)); + store.WriteApiKey(Config, requestApiKey ?? ""); + _channelsByApiKey.Add(requestApiKey ?? "", created); + return created; + } + } + + string ApiKeyToId(string? apiKey) + { + return string.IsNullOrEmpty(apiKey) ? EmptyApiKeyChannelId : Guid.NewGuid().ToString(); + } + + public override async Task StopAsync() + { + Log.ForContext().Information("Flushing log buffers"); + ShutdownTokenSource.CancelAfter(TimeSpan.FromSeconds(30)); + + Task[] stopChannels; + lock (ChannelsSync) + { + stopChannels = _channelsByApiKey.Values.Select(ch => ch.StopAsync()).ToArray(); + } + + await Task.WhenAll([..stopChannels]); + await ShutdownTokenSource.CancelAsync(); + } +} \ No newline at end of file diff --git a/src/SeqCli/Forwarder/Channel/ForwardingChannelMap.cs b/src/SeqCli/Forwarder/Channel/ForwardingChannelMap.cs deleted file mode 100644 index 553efcc8..00000000 --- a/src/SeqCli/Forwarder/Channel/ForwardingChannelMap.cs +++ /dev/null @@ -1,96 +0,0 @@ -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Seq.Api; -using SeqCli.Config; -using SeqCli.Forwarder.Filesystem.System; -using SeqCli.Forwarder.Storage; -using Serilog; - -namespace SeqCli.Forwarder.Channel; - -class ForwardingChannelMap -{ - readonly string _bufferPath; - readonly SeqConnection _connection; - readonly SeqCliConfig _config; - readonly ForwardingChannel _defaultChannel; - readonly Lock _channelsSync = new(); - readonly Dictionary _channels = new(); - readonly CancellationTokenSource _shutdownTokenSource = new(); - - public ForwardingChannelMap(string bufferPath, SeqConnection connection, string? defaultApiKey, SeqCliConfig config) - { - _bufferPath = bufferPath; - _connection = connection; - _config = config; - _defaultChannel = OpenOrCreateChannel(defaultApiKey, "Default"); - - // TODO, load other channels at start-up - } - - ForwardingChannel OpenOrCreateChannel(string? apiKey, string name) - { - // TODO, when it's not the default, persist the API key and validate equality on reopen - - var storePath = Path.Combine(_bufferPath, name); - var store = new SystemStoreDirectory(storePath); - Log.ForContext().Information("Opening local buffer in {StorePath}", storePath); - - return new ForwardingChannel( - BufferAppender.Open(store), - BufferReader.Open(store), - Bookmark.Open(store), - _connection, - apiKey, - _config.Forwarder.Storage.TargetChunkSizeBytes, - _config.Forwarder.Storage.MaxChunks, - _config.Connection.BatchSizeLimitBytes, - _shutdownTokenSource.Token); - } - - public ForwardingChannel Get(string? apiKey) - { - if (string.IsNullOrWhiteSpace(apiKey)) - { - return _defaultChannel; - } - - lock (_channelsSync) - { - if (_channels.TryGetValue(apiKey, out var channel)) - { - return channel; - } - - // Seq API keys begin with four identifying characters that aren't considered part of the - // confidential key. TODO: we could likely do better than this. - var name = apiKey[..4]; - var created = OpenOrCreateChannel(apiKey, name); - _channels.Add(apiKey, created); - return created; - } - } - - public async Task StopAsync() - { - Log.ForContext().Information("Flushing log buffers"); - - _shutdownTokenSource.CancelAfter(TimeSpan.FromSeconds(30)); - - Task[] stopChannels; - lock (_channelsSync) - { - stopChannels = _channels.Values.Select(ch => ch.StopAsync()).ToArray(); - } - - await Task.WhenAll([ - _defaultChannel.StopAsync(), - ..stopChannels]); - - await _shutdownTokenSource.CancelAsync(); - } -} \ No newline at end of file diff --git a/src/SeqCli/Forwarder/Channel/ForwardingChannelWrapper.cs b/src/SeqCli/Forwarder/Channel/ForwardingChannelWrapper.cs new file mode 100644 index 00000000..1cf4c87c --- /dev/null +++ b/src/SeqCli/Forwarder/Channel/ForwardingChannelWrapper.cs @@ -0,0 +1,49 @@ +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Seq.Api; +using SeqCli.Config; +using SeqCli.Forwarder.Filesystem.System; +using SeqCli.Forwarder.Storage; +using Serilog; + +namespace SeqCli.Forwarder.Channel; + +internal abstract class ForwardingChannelWrapper(string bufferPath, SeqConnection connection, SeqCliConfig config) +{ + protected const string SeqCliConnectionChannelId = "SeqCliConnection"; + protected readonly string BufferPath = bufferPath; + protected readonly SeqCliConfig Config = config; + protected readonly CancellationTokenSource ShutdownTokenSource = new(); + protected readonly Lock ChannelsSync = new(); + + // The id used for the channel storage on the file system. + // The apiKey that will be used to connect to the downstream Seq instance. + protected ForwardingChannel OpenOrCreateChannel(string id, string? apiKey) + { + var storePath = GetStorePath(id); + var store = new SystemStoreDirectory(storePath); + + Log.ForContext().Information("Opening local buffer in {StorePath}", storePath); + + return new ForwardingChannel( + BufferAppender.Open(store), + BufferReader.Open(store), + Bookmark.Open(store), + connection, + apiKey, + Config.Forwarder.Storage.TargetChunkSizeBytes, + Config.Forwarder.Storage.MaxChunks, + Config.Connection.BatchSizeLimitBytes, + ShutdownTokenSource.Token); + } + + public abstract ForwardingChannel GetForwardingChannel(string? requestApiKey); + + public abstract Task StopAsync(); + + protected string GetStorePath(string id) + { + return Path.Combine(BufferPath, id); + } +} \ No newline at end of file diff --git a/src/SeqCli/Forwarder/Channel/SeqCliConnectionForwardingChannelWrapper.cs b/src/SeqCli/Forwarder/Channel/SeqCliConnectionForwardingChannelWrapper.cs new file mode 100644 index 00000000..049e6180 --- /dev/null +++ b/src/SeqCli/Forwarder/Channel/SeqCliConnectionForwardingChannelWrapper.cs @@ -0,0 +1,31 @@ +using System; +using System.Threading.Tasks; +using Seq.Api; +using SeqCli.Config; +using Serilog; + +namespace SeqCli.Forwarder.Channel; + +class SeqCliConnectionForwardingChannelWrapper: ForwardingChannelWrapper +{ + readonly ForwardingChannel _seqCliConnectionChannel; + + public SeqCliConnectionForwardingChannelWrapper(string bufferPath, SeqConnection connection, SeqCliConfig config, string? seqCliApiKey): base(bufferPath, connection, config) + { + _seqCliConnectionChannel = OpenOrCreateChannel(SeqCliConnectionChannelId, seqCliApiKey); + } + + public override ForwardingChannel GetForwardingChannel(string? _) + { + return _seqCliConnectionChannel; + } + + public override async Task StopAsync() + { + Log.ForContext().Information("Flushing log buffers"); + ShutdownTokenSource.CancelAfter(TimeSpan.FromSeconds(30)); + + await _seqCliConnectionChannel.StopAsync(); + await ShutdownTokenSource.CancelAsync(); + } +} \ No newline at end of file diff --git a/src/SeqCli/Forwarder/Filesystem/System/SystemStoreDirectory.cs b/src/SeqCli/Forwarder/Filesystem/System/SystemStoreDirectory.cs index c1998e73..5a38ac98 100644 --- a/src/SeqCli/Forwarder/Filesystem/System/SystemStoreDirectory.cs +++ b/src/SeqCli/Forwarder/Filesystem/System/SystemStoreDirectory.cs @@ -14,8 +14,12 @@ using System; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using System.IO; using System.Runtime.InteropServices; +using System.Text; +using SeqCli.Config; +using Serilog; #if UNIX using SeqCli.Forwarder.Filesystem.System.Unix; @@ -34,6 +38,34 @@ public SystemStoreDirectory(string path) if (!Directory.Exists(_directoryPath)) Directory.CreateDirectory(_directoryPath); } + public void WriteApiKey(SeqCliConfig config, string apiKey) + { + File.WriteAllBytes( + Path.Combine(_directoryPath, "api.key"), + config.Encryption.DataProtector().Encrypt(Encoding.UTF8.GetBytes(apiKey))); + } + + public bool TryReadApiKey(SeqCliConfig config, [NotNullWhen(true)] out string? apiKey) + { + apiKey = null; + var path = Path.Combine(_directoryPath, "api.key"); + + if (!File.Exists(path)) return false; + + try + { + var encrypted = File.ReadAllBytes(path); + apiKey = Encoding.UTF8.GetString(config.Encryption.DataProtector().Decrypt(encrypted)); + return true; + } + catch (Exception exception) + { + Log.Warning(exception, "Could not read or decrypt api key"); + } + + return false; + } + public override SystemStoreFile Create(string name) { var filePath = Path.Combine(_directoryPath, name); diff --git a/src/SeqCli/Forwarder/ForwarderModule.cs b/src/SeqCli/Forwarder/ForwarderModule.cs index 41991246..2a2d1284 100644 --- a/src/SeqCli/Forwarder/ForwarderModule.cs +++ b/src/SeqCli/Forwarder/ForwarderModule.cs @@ -45,7 +45,19 @@ public ForwarderModule(string bufferPath, SeqCliConfig config, SeqConnection con protected override void Load(ContainerBuilder builder) { builder.RegisterType().SingleInstance(); - builder.Register(_ => new ForwardingChannelMap(_bufferPath, _connection, _apiKey, _config)).SingleInstance(); + + if (_config.Forwarder.UseApiKeyForwarding) + { + builder.Register(_ => + new ApiKeyForwardingChannelWrapper(_bufferPath, _connection, _config)) + .As().SingleInstance(); + } + else + { + builder.Register(_ => + new SeqCliConnectionForwardingChannelWrapper(_bufferPath, _connection, _config, _apiKey)) + .As().SingleInstance(); + } builder.RegisterType().As(); diff --git a/src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs b/src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs index 467087f9..32db6e8d 100644 --- a/src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs +++ b/src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs @@ -37,10 +37,10 @@ class IngestionEndpoints : IMapEndpoints { static readonly Encoding Utf8 = new UTF8Encoding(false); - readonly ForwardingChannelMap _forwardingChannels; + readonly ForwardingChannelWrapper _forwardingChannels; readonly SeqCliConfig _config; - public IngestionEndpoints(ForwardingChannelMap forwardingChannels, SeqCliConfig config) + public IngestionEndpoints(ForwardingChannelWrapper forwardingChannels, SeqCliConfig config) { _forwardingChannels = forwardingChannels; _config = config; @@ -75,7 +75,8 @@ async Task IngestCompactFormatAsync(HttpContext context) var cts = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted); cts.CancelAfter(TimeSpan.FromSeconds(5)); - var log = _forwardingChannels.Get(GetApiKey(context.Request)); + var requestApiKey = GetApiKey(context.Request); + var log = _forwardingChannels.GetForwardingChannel(requestApiKey); // Add one for the extra newline that we have to insert at the end of batches. var bufferSize = _config.Connection.BatchSizeLimitBytes + 1; diff --git a/src/SeqCli/Forwarder/Web/Host/ServerService.cs b/src/SeqCli/Forwarder/Web/Host/ServerService.cs index 3732497c..18e9ff30 100644 --- a/src/SeqCli/Forwarder/Web/Host/ServerService.cs +++ b/src/SeqCli/Forwarder/Web/Host/ServerService.cs @@ -24,10 +24,10 @@ namespace SeqCli.Forwarder.Web.Host; class ServerService { readonly IHost _host; - readonly ForwardingChannelMap _forwardingChannelMap; + readonly ForwardingChannelWrapper _forwardingChannelMap; readonly string _listenUri; - public ServerService(IHost host, ForwardingChannelMap forwardingChannelMap, string listenUri) + public ServerService(IHost host, ForwardingChannelWrapper forwardingChannelMap, string listenUri) { _host = host; _forwardingChannelMap = forwardingChannelMap;