From 3d34822c77bd0a71227d1466299fe336f61f5d1c Mon Sep 17 00:00:00 2001 From: Patrick Evers Date: Wed, 9 Apr 2025 13:16:27 +0200 Subject: [PATCH 1/3] Resilient event watcher Ensures the event watcher recovers from exceptions and restarts, preventing service interruption. Adds logging for watcher errors to improve debugging and monitoring. --- src/K8sOperator.NET/EventWatcher.cs | 24 +++++++++++++------ .../Extensions/LoggingExtensions.cs | 6 +++++ 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/src/K8sOperator.NET/EventWatcher.cs b/src/K8sOperator.NET/EventWatcher.cs index 8c00120..e084edc 100644 --- a/src/K8sOperator.NET/EventWatcher.cs +++ b/src/K8sOperator.NET/EventWatcher.cs @@ -53,11 +53,22 @@ public async Task Start(CancellationToken cancellationToken) _cancellationToken = cancellationToken; _isRunning = true; - var response = Client.ListAsync(LabelSelector, cancellationToken); - - await foreach (var (type, item) in response.WatchAsync(OnError, cancellationToken)) + while (_isRunning && !_cancellationToken.IsCancellationRequested) { - OnEvent(type, item); + try + { + var response = Client.ListAsync(LabelSelector, cancellationToken); + + await foreach (var (type, item) in response.WatchAsync(OnError, cancellationToken)) + { + OnEvent(type, item); + } + } + catch (Exception) + { + Logger.WatcherError("Error in watcher loop restarting..."); + await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); + } } Logger.EndWatch(Crd.PluralName, LabelSelector); @@ -76,8 +87,7 @@ private void OnEvent(WatchEventType eventType, T customResource) var exception = t.Exception.Flatten().InnerException; Logger.ProcessEventError(exception, eventType, customResource); } - }) - ; + }); } private async Task ProccessEventAsync(WatchEventType eventType, T resource) @@ -233,7 +243,7 @@ private void OnError(Exception exception) { if (_isRunning) { - Logger.LogError(exception, "Watcher error"); + Logger.WatcherError(exception.Message); } } } diff --git a/src/K8sOperator.NET/Extensions/LoggingExtensions.cs b/src/K8sOperator.NET/Extensions/LoggingExtensions.cs index 5a5a529..2dcd4a8 100644 --- a/src/K8sOperator.NET/Extensions/LoggingExtensions.cs +++ b/src/K8sOperator.NET/Extensions/LoggingExtensions.cs @@ -181,4 +181,10 @@ internal static partial class LoggingExtensions Message = "End Error {resource}")] public static partial void EndError(this ILogger logger, CustomResource resource); + [LoggerMessage( + EventId = 28, + Level = LogLevel.Information, + Message = "Watcher Error {message}")] + public static partial void WatcherError(this ILogger logger, string message); + } From 8eef2028f2f7d74cc426db73ecb14d9616f5e8f9 Mon Sep 17 00:00:00 2001 From: Patrick Evers Date: Wed, 9 Apr 2025 14:10:56 +0200 Subject: [PATCH 2/3] Handle TokenCancelation --- src/K8sOperator.NET/EventWatcher.cs | 13 +++++++++++-- test/K8sOperator.NET.Tests/EventWatcherTests.cs | 12 +++++++----- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/K8sOperator.NET/EventWatcher.cs b/src/K8sOperator.NET/EventWatcher.cs index e084edc..bcc002a 100644 --- a/src/K8sOperator.NET/EventWatcher.cs +++ b/src/K8sOperator.NET/EventWatcher.cs @@ -64,9 +64,18 @@ public async Task Start(CancellationToken cancellationToken) OnEvent(type, item); } } - catch (Exception) + catch (TaskCanceledException) { - Logger.WatcherError("Error in watcher loop restarting..."); + Logger.WatcherError("Task was canceled."); + } + catch (OperationCanceledException) + { + Logger.WatcherError("Operation was canceled restarting..."); + await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); + } + catch (HttpOperationException ex) + { + Logger.WatcherError($"Http Error: {ex.Response.Content}, restarting..."); await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); } } diff --git a/test/K8sOperator.NET.Tests/EventWatcherTests.cs b/test/K8sOperator.NET.Tests/EventWatcherTests.cs index bb1944f..4739fe4 100644 --- a/test/K8sOperator.NET.Tests/EventWatcherTests.cs +++ b/test/K8sOperator.NET.Tests/EventWatcherTests.cs @@ -52,12 +52,14 @@ private static Watcher.WatchEvent CreateEvent(WatchEventType type, T item) private readonly ITestOutputHelper _testOutput; private readonly Controller _controller = Substitute.For>(); + private readonly CancellationTokenSource _tokenSource; private readonly ILoggerFactory _loggerFactory = Substitute.For(); private readonly ILogger _logger = Substitute.For(); private readonly List _metadata; public EventWatcherTests(ITestOutputHelper testOutput) { + _tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(2)); _testOutput = testOutput; _loggerFactory.CreateLogger(Arg.Any()).Returns(_logger); _metadata = [ @@ -71,8 +73,8 @@ public EventWatcherTests(ITestOutputHelper testOutput) [Fact] public async Task Start_Should_StartWatchAndLogStart() { - var cancellationToken = new CancellationTokenSource().Token; - + var cancellationToken = _tokenSource.Token; + using ( var server = new MockKubeApiServer(_testOutput, endpoints => { endpoints.MapListNamespacedCustomObjectWithHttpMessagesAsync(); @@ -90,7 +92,7 @@ public async Task Start_Should_StartWatchAndLogStart() [Fact] public async Task OnEvent_Should_HandleAddedEventAndCallAddOrModifyAsync() { - var cancellationToken = new CancellationTokenSource().Token; + var cancellationToken = _tokenSource.Token; using (var server = new MockKubeApiServer(_testOutput, endpoints => { @@ -112,7 +114,7 @@ public async Task OnEvent_Should_HandleAddedEventAndCallAddOrModifyAsync() [Fact] public async Task OnEvent_Should_HandleDeletedEventAndCallDeleteAsync() { - var cancellationToken = new CancellationTokenSource().Token; + var cancellationToken = _tokenSource.Token; using (var server = new MockKubeApiServer(_testOutput, endpoints => { @@ -134,7 +136,7 @@ public async Task OnEvent_Should_HandleDeletedEventAndCallDeleteAsync() [Fact] public async Task HandleFinalizeAsync_Should_CallFinalizeAndRemoveFinalizer() { - var cancellationToken = new CancellationTokenSource().Token; + var cancellationToken = _tokenSource.Token; using (var server = new MockKubeApiServer(_testOutput, endpoints => { From 64c03b84e1847a24a73c8ca43df97ccdf2929790 Mon Sep 17 00:00:00 2001 From: Patrick Evers Date: Wed, 9 Apr 2025 14:24:29 +0200 Subject: [PATCH 3/3] Update Logger --- src/K8sOperator.NET/EventWatcher.cs | 2 ++ src/K8sOperator.NET/Extensions/LoggingExtensions.cs | 11 +++++++++-- src/K8sOperator.NET/KubernetesClient.cs | 4 ++-- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/K8sOperator.NET/EventWatcher.cs b/src/K8sOperator.NET/EventWatcher.cs index bcc002a..409f39b 100644 --- a/src/K8sOperator.NET/EventWatcher.cs +++ b/src/K8sOperator.NET/EventWatcher.cs @@ -53,6 +53,8 @@ public async Task Start(CancellationToken cancellationToken) _cancellationToken = cancellationToken; _isRunning = true; + Logger.BeginWatch(Crd.PluralName, LabelSelector); + while (_isRunning && !_cancellationToken.IsCancellationRequested) { try diff --git a/src/K8sOperator.NET/Extensions/LoggingExtensions.cs b/src/K8sOperator.NET/Extensions/LoggingExtensions.cs index 2dcd4a8..dba0b27 100644 --- a/src/K8sOperator.NET/Extensions/LoggingExtensions.cs +++ b/src/K8sOperator.NET/Extensions/LoggingExtensions.cs @@ -36,9 +36,9 @@ internal static partial class LoggingExtensions [LoggerMessage( EventId = 4, Level = LogLevel.Information, - Message = "Begin watch {ns}/{plural} {labelselector}" + Message = "Begin watch {plural} {labelselector}" )] - public static partial void BeginWatch(this ILogger logger, string ns, string plural, string labelselector); + public static partial void BeginWatch(this ILogger logger, string plural, string labelselector); [LoggerMessage( EventId = 5, @@ -187,4 +187,11 @@ internal static partial class LoggingExtensions Message = "Watcher Error {message}")] public static partial void WatcherError(this ILogger logger, string message); + [LoggerMessage( + EventId = 29, + Level = LogLevel.Information, + Message = "ListAsync {ns}/{plural} {labelselector}" + )] + public static partial void ListAsync(this ILogger logger, string ns, string plural, string labelselector); + } diff --git a/src/K8sOperator.NET/KubernetesClient.cs b/src/K8sOperator.NET/KubernetesClient.cs index 2fd294a..3db5848 100644 --- a/src/K8sOperator.NET/KubernetesClient.cs +++ b/src/K8sOperator.NET/KubernetesClient.cs @@ -28,7 +28,7 @@ public Task> ListAsync(string labelSelector, Ca { var info = typeof(T).GetCustomAttribute()!; - Logger.BeginWatch(Namespace, info.PluralName, labelSelector); + Logger.ListAsync(Namespace, info.PluralName, labelSelector); var response = Client.CustomObjects.ListNamespacedCustomObjectWithHttpMessagesAsync( info.Group, @@ -98,7 +98,7 @@ public Task> ListAsync(string labelSelector, Ca { var info = typeof(T).GetCustomAttribute()!; - Logger.BeginWatch("cluster-wide", info.PluralName, labelSelector); + Logger.ListAsync("cluster-wide", info.PluralName, labelSelector); var response = Client.CustomObjects.ListClusterCustomObjectWithHttpMessagesAsync( info.Group,