From 52efc13dede90dec6cbfb06845f324cb381326c4 Mon Sep 17 00:00:00 2001 From: Antonio Nesic Date: Mon, 16 Feb 2026 11:08:59 +0100 Subject: [PATCH 1/5] Add healthz and readyz probes as well as server --- cmd/main.go | 24 ++++--- internal/health/manager.go | 47 +++++++++++++ internal/health/manager_test.go | 101 +++++++++++++++++++++++++++ internal/health/server.go | 119 ++++++++++++++++++++++++++++++++ internal/health/server_test.go | 108 +++++++++++++++++++++++++++++ 5 files changed, 391 insertions(+), 8 deletions(-) create mode 100644 internal/health/server.go create mode 100644 internal/health/server_test.go diff --git a/cmd/main.go b/cmd/main.go index 8166de3e..6f01b947 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -31,7 +31,6 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" @@ -40,6 +39,7 @@ import ( monitoringv1 "github.com/devzero-inc/zxporter/api/v1" "github.com/devzero-inc/zxporter/internal/controller" // +kubebuilder:scaffold:imports + "github.com/devzero-inc/zxporter/internal/health" ) var ( @@ -125,7 +125,7 @@ func main() { Scheme: scheme, Metrics: metricsServerOptions, WebhookServer: webhookServer, - HealthProbeBindAddress: probeAddr, + HealthProbeBindAddress: "", // Disable the default health probe server since we are using a custom one in 'internal/health' LeaderElection: enableLeaderElection, LeaderElectionID: "055ced15.devzero.io", // LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily @@ -162,12 +162,10 @@ func main() { // No need to add the standard controller with kubebuilder:scaffold:builder // The env-based controller doesn't rely on CRDs - if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { - setupLog.Error(err, "unable to set up health check") - os.Exit(1) - } - if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { - setupLog.Error(err, "unable to set up ready check") + // New health server from health package + healthServer := health.NewHealthServer(envController.Reconciler.HealthManager, probeAddr) + if err := healthServer.Start(); err != nil { + setupLog.Error(err, "unable to start health server") os.Exit(1) } @@ -176,4 +174,14 @@ func main() { setupLog.Error(err, "problem running manager") os.Exit(1) } + + // TODO when we make sure that the custom health server works, we can remove the default health probe server and the related code below + // if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { + // setupLog.Error(err, "unable to set up health check") + // os.Exit(1) + // } + // if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { + // setupLog.Error(err, "unable to set up ready check") + // os.Exit(1) + // } } diff --git a/internal/health/manager.go b/internal/health/manager.go index 30f264e9..0a70b65c 100644 --- a/internal/health/manager.go +++ b/internal/health/manager.go @@ -1,6 +1,8 @@ package health import ( + "fmt" + "net/http" "sync" ) @@ -108,3 +110,48 @@ func (hm *HealthManager) BuildReport() map[string]ComponentStatus { } return report } + +// LivenessCheck checks if all components are at least Degraded (not Unhealthy) +func (hm *HealthManager) LivenessCheck(_ *http.Request) error { + hm.mu.RLock() + defer hm.mu.RUnlock() + + component, exists := 
hm.components[ComponentCollectorManager]
+	if exists && component.Status == HealthStatusUnhealthy {
+		return fmt.Errorf("%s is %s: %s", ComponentCollectorManager, component.Status, component.Message)
+	}
+
+	return nil
+}
+
+// ReadinessCheck checks if all components are Healthy
+func (hm *HealthManager) ReadinessCheck(_ *http.Request) error {
+	hm.mu.RLock()
+	defer hm.mu.RUnlock()
+
+	readyComponents := []string{ComponentCollectorManager, ComponentDakrTransport}
+	for _, compName := range readyComponents {
+		component, exists := hm.components[compName]
+		if !exists {
+			return fmt.Errorf("%s is not registered", compName)
+		}
+		if component.Status != HealthStatusHealthy && component.Status != HealthStatusDegraded {
+			return fmt.Errorf("%s is not ready (status: %s)", compName, component.Status)
+		}
+	}
+	return nil
+}
+
+// String returns a human-readable representation of the HealthStatus
+func (s HealthStatus) String() string {
+	switch s {
+	case HealthStatusHealthy:
+		return "healthy"
+	case HealthStatusDegraded:
+		return "degraded"
+	case HealthStatusUnhealthy:
+		return "unhealthy"
+	default:
+		return "unspecified"
+	}
+}
diff --git a/internal/health/manager_test.go b/internal/health/manager_test.go
index e8c5225e..0ef53340 100644
--- a/internal/health/manager_test.go
+++ b/internal/health/manager_test.go
@@ -97,3 +97,104 @@ func TestConcurrentUpdatesAndReads(t *testing.T) {
 	assert.Contains(t, statuses, status.Status)
 	assert.Contains(t, messages, status.Message)
 }
+
+// Test case for HealthStatus String method
+func TestHealthStatus_String(t *testing.T) {
+	tests := []struct {
+		status   HealthStatus
+		expected string
+	}{
+		{HealthStatusUnspecified, "unspecified"},
+		{HealthStatusHealthy, "healthy"},
+		{HealthStatusDegraded, "degraded"},
+		{HealthStatusUnhealthy, "unhealthy"},
+		{HealthStatus(99), "unspecified"},
+	}
+	for _, tt := range tests {
+		assert.Equal(t, tt.expected, tt.status.String())
+	}
+}
+
+/*
+Readiness and Healthiness tests
+*/
+func TestLivenessCheck_Healthy(t *testing.T) {
+	hm := NewHealthManager()
+	hm.Register(ComponentCollectorManager)
+	hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "all good", nil)
+
+	err := hm.LivenessCheck(nil)
+	assert.NoError(t, err)
+}
+
+func TestLivenessCheck_DegradedIsStillAlive(t *testing.T) {
+	hm := NewHealthManager()
+	hm.Register(ComponentCollectorManager)
+	hm.UpdateStatus(ComponentCollectorManager, HealthStatusDegraded, "some issues", nil)
+
+	err := hm.LivenessCheck(nil)
+	assert.NoError(t, err)
+}
+
+func TestLivenessCheck_UnhealthyCollectorFails(t *testing.T) {
+	hm := NewHealthManager()
+	hm.Register(ComponentCollectorManager)
+	hm.UpdateStatus(ComponentCollectorManager, HealthStatusUnhealthy, "no collectors running", nil)
+
+	err := hm.LivenessCheck(nil)
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "collector_manager")
+}
+
+func TestLivenessCheck_UnspecifiedPasses(t *testing.T) {
+	hm := NewHealthManager()
+	hm.Register(ComponentCollectorManager)
+	// Status is Unspecified (startup) — should still pass liveness
+	err := hm.LivenessCheck(nil)
+	assert.NoError(t, err)
+}
+
+func TestReadinessCheck_AllReady(t *testing.T) {
+	hm := NewHealthManager()
+	hm.Register(ComponentCollectorManager)
+	hm.Register(ComponentDakrTransport)
+	hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "running", nil)
+	hm.UpdateStatus(ComponentDakrTransport, HealthStatusHealthy, "connected", nil)
+
+	err := hm.ReadinessCheck(nil)
+	assert.NoError(t, err)
+}
+
+func TestReadinessCheck_DegradedIsReady(t 
*testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.Register(ComponentDakrTransport) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusDegraded, "some issues", nil) + hm.UpdateStatus(ComponentDakrTransport, HealthStatusDegraded, "retrying", nil) + + err := hm.ReadinessCheck(nil) + assert.NoError(t, err) +} + +func TestReadinessCheck_CollectorUnspecifiedNotReady(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.Register(ComponentDakrTransport) + // collector_manager still Unspecified (not started yet) + hm.UpdateStatus(ComponentDakrTransport, HealthStatusHealthy, "connected", nil) + + err := hm.ReadinessCheck(nil) + assert.Error(t, err) + assert.Contains(t, err.Error(), "collector_manager") +} + +func TestReadinessCheck_TransportUnhealthyNotReady(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.Register(ComponentDakrTransport) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "running", nil) + hm.UpdateStatus(ComponentDakrTransport, HealthStatusUnhealthy, "cannot reach dakr", nil) + + err := hm.ReadinessCheck(nil) + assert.Error(t, err) + assert.Contains(t, err.Error(), "dakr_transport") +} diff --git a/internal/health/server.go b/internal/health/server.go new file mode 100644 index 00000000..13ec151d --- /dev/null +++ b/internal/health/server.go @@ -0,0 +1,119 @@ +package health + +import ( + "context" + "encoding/json" + "net" + "net/http" + "time" +) + +// HealthResponse represents the JSON structuure for /healthz and /readyz responses +type HealthResponse struct { + Status string `json:"status"` + Error string `json:"message,omitempty"` + Components map[string]ComponentResponse `json:"components,omitempty"` +} + +// ComponentResponse represents the JSON structure for /components/{component} responses +type ComponentResponse struct { + Status string `json:"status"` + Message string `json:"message,omitempty"` + Metadata map[string]string `json:"metadata,omitempty"` +} + +// HealthServer serves health and readiness endpoints +type HealthServer struct { + manager *HealthManager + addr string + server *http.Server +} + +// NewHealthServer creates a new HealthServer bound to the specified address +func NewHealthServer(manager *HealthManager, addr string) *HealthServer { + s := &HealthServer{ + manager: manager, + addr: addr, + } + mux := http.NewServeMux() + mux.HandleFunc("/healthz", s.healthzHandler) + mux.HandleFunc("/readyz", s.readyzHandler) + s.server = &http.Server{ + Addr: addr, + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + } + return s +} + +// Start begins serving health endpoints +func (s *HealthServer) Start() error { + ln, err := net.Listen("tcp", s.addr) + if err != nil { + return err + } + go s.server.Serve(ln) + return nil +} + +// Stop gracefully shuts down the server +func (s *HealthServer) Stop(ctx context.Context) error { + return s.server.Shutdown(ctx) +} + +// healthzHandler handles the /healthz endpoint +func (s *HealthServer) healthzHandler(w http.ResponseWriter, r *http.Request) { + report := s.manager.BuildReport() + err := s.manager.LivenessCheck(r) + + response := HealthResponse{ + Components: buildComponentResponses(report), + } + if err != nil { + response.Status = "unhealthy" + response.Error = err.Error() + writeJSON(w, http.StatusServiceUnavailable, response) + return + } + response.Status = "healthy" + writeJSON(w, http.StatusOK, response) + +} + +// readyzHandler handles the /readyz endpoint +func (s 
*HealthServer) readyzHandler(w http.ResponseWriter, r *http.Request) { + report := s.manager.BuildReport() + err := s.manager.ReadinessCheck(r) + + response := HealthResponse{ + Components: buildComponentResponses(report), + } + if err != nil { + response.Status = "not ready" + response.Error = err.Error() + writeJSON(w, http.StatusServiceUnavailable, response) + return + } + response.Status = "ready" + writeJSON(w, http.StatusOK, response) +} + +// buildComponentResponses converts the health report into a map of component responses +func buildComponentResponses(report map[string]ComponentStatus) map[string]ComponentResponse { + components := make(map[string]ComponentResponse, len(report)) + for name, status := range report { + components[name] = ComponentResponse{ + Status: status.Status.String(), + Message: status.Message, + Metadata: status.Metadata, + } + } + return components +} + +// writeJSON writes the response as JSON +func writeJSON(w http.ResponseWriter, statusCode int, data any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + json.NewEncoder(w).Encode(data) +} diff --git a/internal/health/server_test.go b/internal/health/server_test.go new file mode 100644 index 00000000..6d950fcd --- /dev/null +++ b/internal/health/server_test.go @@ -0,0 +1,108 @@ +package health + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestHealthzHandler_Healthy(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "running", nil) + + srv := NewHealthServer(hm, ":8081") + req := httptest.NewRequest(http.MethodGet, "/healthz", nil) + w := httptest.NewRecorder() + + srv.healthzHandler(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var resp HealthResponse + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &resp)) + assert.Equal(t, "healthy", resp.Status) +} + +func TestHealthzHandler_Unhealthy(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusUnhealthy, "no collectors running", nil) + + srv := NewHealthServer(hm, ":8081") + req := httptest.NewRequest(http.MethodGet, "/healthz", nil) + w := httptest.NewRecorder() + + srv.healthzHandler(w, req) + + assert.Equal(t, http.StatusServiceUnavailable, w.Code) + + var resp HealthResponse + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &resp)) + assert.Equal(t, "unhealthy", resp.Status) + assert.Contains(t, resp.Error, "collector_manager") +} + +func TestReadyzHandler_Ready(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.Register(ComponentDakrTransport) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "running", nil) + hm.UpdateStatus(ComponentDakrTransport, HealthStatusHealthy, "connected", nil) + + srv := NewHealthServer(hm, ":8081") + req := httptest.NewRequest(http.MethodGet, "/readyz", nil) + w := httptest.NewRecorder() + + srv.readyzHandler(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var resp HealthResponse + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &resp)) + assert.Equal(t, "ready", resp.Status) +} + +func TestReadyzHandler_NotReady(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.Register(ComponentDakrTransport) + // collector_manager still Unspecified + 
hm.UpdateStatus(ComponentDakrTransport, HealthStatusHealthy, "connected", nil) + + srv := NewHealthServer(hm, ":8081") + req := httptest.NewRequest(http.MethodGet, "/readyz", nil) + w := httptest.NewRecorder() + + srv.readyzHandler(w, req) + + assert.Equal(t, http.StatusServiceUnavailable, w.Code) + + var resp HealthResponse + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &resp)) + assert.Equal(t, "not ready", resp.Status) +} + +func TestHealthzHandler_IncludesComponents(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.Register(ComponentDakrTransport) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "55/60 active", map[string]string{"active": "55"}) + hm.UpdateStatus(ComponentDakrTransport, HealthStatusDegraded, "retrying", nil) + + srv := NewHealthServer(hm, ":8081") + req := httptest.NewRequest(http.MethodGet, "/healthz", nil) + w := httptest.NewRecorder() + + srv.healthzHandler(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var resp HealthResponse + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &resp)) + assert.Len(t, resp.Components, 2) +} From f94cf1b996b80e4417e804c6dc90a0e8b70bb6c7 Mon Sep 17 00:00:00 2001 From: Antonio Nesic Date: Mon, 16 Feb 2026 11:23:20 +0100 Subject: [PATCH 2/5] feat(health): add graceful shutdown and remove dead code from health server wiring --- cmd/main.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 6f01b947..c32fe1a1 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "context" "crypto/tls" "flag" "os" @@ -38,6 +39,7 @@ import ( monitoringv1 "github.com/devzero-inc/zxporter/api/v1" "github.com/devzero-inc/zxporter/internal/controller" + // +kubebuilder:scaffold:imports "github.com/devzero-inc/zxporter/internal/health" ) @@ -168,20 +170,17 @@ func main() { setupLog.Error(err, "unable to start health server") os.Exit(1) } + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := healthServer.Stop(ctx); err != nil { + setupLog.Error(err, "error stopping health server") + } + }() setupLog.Info("starting manager") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) } - - // TODO when we make sure that the custom health server works, we can remove the default health probe server and the related code below - // if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { - // setupLog.Error(err, "unable to set up health check") - // os.Exit(1) - // } - // if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { - // setupLog.Error(err, "unable to set up ready check") - // os.Exit(1) - // } } From 8ea608e2b345bb06d9d69d82143eeb1b1a9be871 Mon Sep 17 00:00:00 2001 From: Antonio Nesic Date: Mon, 16 Feb 2026 11:35:52 +0100 Subject: [PATCH 3/5] Golint fixes --- .gitignore | 3 ++- cmd/main.go | 2 +- internal/health/server.go | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index e11163c3..0f35c4cb 100644 --- a/.gitignore +++ b/.gitignore @@ -42,4 +42,5 @@ debug_rendered.yaml #ignore dev helm deployment helm-chart/zxporter/local_values.yaml -docs/plans \ No newline at end of file +docs/plans +.DS_Store diff --git a/cmd/main.go b/cmd/main.go index c32fe1a1..c3e4ff58 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -127,7 +127,7 @@ func main() { Scheme: scheme, Metrics: 
metricsServerOptions, WebhookServer: webhookServer, - HealthProbeBindAddress: "", // Disable the default health probe server since we are using a custom one in 'internal/health' + HealthProbeBindAddress: "", // Custom health server used instead LeaderElection: enableLeaderElection, LeaderElectionID: "055ced15.devzero.io", // LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily diff --git a/internal/health/server.go b/internal/health/server.go index 13ec151d..e14ccf06 100644 --- a/internal/health/server.go +++ b/internal/health/server.go @@ -52,7 +52,7 @@ func (s *HealthServer) Start() error { if err != nil { return err } - go s.server.Serve(ln) + go s.server.Serve(ln) //nolint:errcheck // server always returns error on shutdown return nil } @@ -115,5 +115,5 @@ func buildComponentResponses(report map[string]ComponentStatus) map[string]Compo func writeJSON(w http.ResponseWriter, statusCode int, data any) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(statusCode) - json.NewEncoder(w).Encode(data) + _ = json.NewEncoder(w).Encode(data) } From c2cd9ac1a8701f484c27fe1354ccd0ade4be54be Mon Sep 17 00:00:00 2001 From: Antonio Nesic Date: Wed, 18 Feb 2026 13:37:37 +0100 Subject: [PATCH 4/5] fix(health): add liveness suppression during planned collector restarts Addresses reviewer concern about constant backoffs when collectors are restarting. StopAll() sets collector_manager to Unhealthy, which fails the liveness probe and triggers pod kills. This adds a grace period mechanism: planned restarts suppress liveness checks for up to 5 minutes, and StartAll() clears the suppression immediately. Unplanned failures (cleanupOnFailure) intentionally do NOT suppress, so the pod still restarts on truly fatal states. --- internal/collector/manager.go | 3 + .../controller/collectionpolicy_controller.go | 11 ++++ internal/health/manager.go | 34 +++++++++- internal/health/manager_test.go | 65 +++++++++++++++++++ 4 files changed, 110 insertions(+), 3 deletions(-) diff --git a/internal/collector/manager.go b/internal/collector/manager.go index d70e35bb..ec03484f 100644 --- a/internal/collector/manager.go +++ b/internal/collector/manager.go @@ -250,6 +250,9 @@ func (m *CollectionManager) StartAll(ctx context.Context) error { } m.started = true + if m.healthManager != nil { + m.healthManager.ClearLivenessSuppression() + } m.updateHealthStatus(health.ComponentCollectorManager, health.HealthStatusHealthy, fmt.Sprintf("%d collectors started", len(m.collectors)), map[string]string{"collector_count": fmt.Sprintf("%d", len(m.collectors))}) m.updateHealthStatus(health.ComponentBufferQueue, health.HealthStatusHealthy, "Buffer is operational", map[string]string{"capacity": fmt.Sprintf("%d", m.bufferSize)}) diff --git a/internal/controller/collectionpolicy_controller.go b/internal/controller/collectionpolicy_controller.go index c7603357..08e10988 100644 --- a/internal/controller/collectionpolicy_controller.go +++ b/internal/controller/collectionpolicy_controller.go @@ -986,6 +986,12 @@ func (r *CollectionPolicyReconciler) restartCollectors(ctx context.Context, newC }, ) + // Suppress liveness during planned restart so the transient Unhealthy + // window between StopAll and StartAll does not trigger a pod kill. 
+ if r.HealthManager != nil { + r.HealthManager.SuppressLiveness(5 * time.Minute) + } + // Stop all existing collectors if r.CollectionManager != nil { logger.Info("Stopping all collectors") @@ -2146,6 +2152,11 @@ func (r *CollectionPolicyReconciler) setupAllCollectors( ) error { logger.Info("Now registering and starting all collectors") + // Suppress liveness during reconfiguration restart + if r.HealthManager != nil { + r.HealthManager.SuppressLiveness(5 * time.Minute) + } + // Stop the collection manager to reconfigure with all collectors if err := r.CollectionManager.StopAll(); err != nil { r.TelemetryLogger.Report( diff --git a/internal/health/manager.go b/internal/health/manager.go index 0a70b65c..104f6111 100644 --- a/internal/health/manager.go +++ b/internal/health/manager.go @@ -4,6 +4,7 @@ import ( "fmt" "net/http" "sync" + "time" ) // HealthStatus matches proto enum for easy mapping @@ -25,8 +26,9 @@ type ComponentStatus struct { } type HealthManager struct { - mu sync.RWMutex - components map[string]*ComponentStatus + mu sync.RWMutex + components map[string]*ComponentStatus + livenessGraceUntil time.Time // LivenessCheck always passes before this deadline } // NewHealthManager creates a new HealthManager @@ -111,11 +113,37 @@ func (hm *HealthManager) BuildReport() map[string]ComponentStatus { return report } -// LivenessCheck checks if all components are at least Degraded (not Unhealthy) +// SuppressLiveness makes LivenessCheck pass unconditionally for the given +// duration. Use this before a planned collector restart so that the transient +// Unhealthy window does not trigger a pod kill. The grace period is cleared +// automatically when StartAll succeeds (via ClearLivenessSuppression) or when +// the deadline expires. +func (hm *HealthManager) SuppressLiveness(d time.Duration) { + hm.mu.Lock() + defer hm.mu.Unlock() + hm.livenessGraceUntil = time.Now().Add(d) +} + +// ClearLivenessSuppression removes any active grace period so LivenessCheck +// resumes normal evaluation. Call this after collectors are back up. +func (hm *HealthManager) ClearLivenessSuppression() { + hm.mu.Lock() + defer hm.mu.Unlock() + hm.livenessGraceUntil = time.Time{} +} + +// LivenessCheck checks if all components are at least Degraded (not Unhealthy). +// During an active grace period (set via SuppressLiveness) it always returns nil +// so that planned restarts do not trigger pod kills. 
func (hm *HealthManager) LivenessCheck(_ *http.Request) error { hm.mu.RLock() defer hm.mu.RUnlock() + // During a grace period, unconditionally pass liveness + if !hm.livenessGraceUntil.IsZero() && time.Now().Before(hm.livenessGraceUntil) { + return nil + } + component, exists := hm.components[ComponentCollectorManager] if exists && component.Status == HealthStatusUnhealthy { return fmt.Errorf("%s is %s: %s", ComponentCollectorManager, component.Status, component.Message) diff --git a/internal/health/manager_test.go b/internal/health/manager_test.go index 0ef53340..ca11d2be 100644 --- a/internal/health/manager_test.go +++ b/internal/health/manager_test.go @@ -4,6 +4,7 @@ import ( "strconv" "sync" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -198,3 +199,67 @@ func TestReadinessCheck_TransportUnhealthyNotReady(t *testing.T) { assert.Error(t, err) assert.Contains(t, err.Error(), "dakr_transport") } + +// Liveness suppression tests — ensures planned restarts don't cause pod kills + +func TestLivenessCheck_SuppressedDuringGracePeriod(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusUnhealthy, "stopped for restart", nil) + + // Without suppression, liveness should fail + err := hm.LivenessCheck(nil) + assert.Error(t, err) + + // Suppress liveness for planned restart + hm.SuppressLiveness(5 * time.Minute) + + // Now liveness should pass even though collector_manager is Unhealthy + err = hm.LivenessCheck(nil) + assert.NoError(t, err) +} + +func TestLivenessCheck_ClearSuppression(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusUnhealthy, "stopped", nil) + + hm.SuppressLiveness(5 * time.Minute) + assert.NoError(t, hm.LivenessCheck(nil)) // suppressed + + // Clear suppression — should fail again since still Unhealthy + hm.ClearLivenessSuppression() + assert.Error(t, hm.LivenessCheck(nil)) +} + +func TestLivenessCheck_GracePeriodExpires(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusUnhealthy, "stopped", nil) + + // Set a grace period that is already expired + hm.mu.Lock() + hm.livenessGraceUntil = time.Now().Add(-1 * time.Second) + hm.mu.Unlock() + + // Should fail because grace period has expired + err := hm.LivenessCheck(nil) + assert.Error(t, err) +} + +func TestLivenessCheck_FullRestartCycle(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "running", nil) + assert.NoError(t, hm.LivenessCheck(nil)) + + // Simulate planned restart: suppress, then set unhealthy + hm.SuppressLiveness(5 * time.Minute) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusUnhealthy, "stopped for restart", nil) + assert.NoError(t, hm.LivenessCheck(nil)) // still passes + + // Simulate collectors coming back: set healthy and clear suppression + hm.ClearLivenessSuppression() + hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "restarted", nil) + assert.NoError(t, hm.LivenessCheck(nil)) // passes normally +} From 77b7da84cc2b1aab88a337b8c28b5ca476ab3406 Mon Sep 17 00:00:00 2001 From: Antonio Nesic Date: Wed, 18 Feb 2026 13:51:19 +0100 Subject: [PATCH 5/5] fix(health): address code review findings - TOCTOU fix: add CheckLiveness/CheckReadiness that atomically build report and evaluate check under a single lock, 
replacing separate BuildReport + LivenessCheck calls in HTTP handlers - Remove unused *http.Request param from LivenessCheck/ReadinessCheck to decouple health logic from net/http transport - Fix JSON tag: rename Error field from "message" to "error" to avoid confusion with ComponentResponse.Message --- internal/health/manager.go | 48 +++++++++++++++++++++++++++++---- internal/health/manager_test.go | 32 +++++++++++----------- internal/health/server.go | 15 +++++------ 3 files changed, 65 insertions(+), 30 deletions(-) diff --git a/internal/health/manager.go b/internal/health/manager.go index 104f6111..84efad7e 100644 --- a/internal/health/manager.go +++ b/internal/health/manager.go @@ -2,7 +2,6 @@ package health import ( "fmt" - "net/http" "sync" "time" ) @@ -135,11 +134,14 @@ func (hm *HealthManager) ClearLivenessSuppression() { // LivenessCheck checks if all components are at least Degraded (not Unhealthy). // During an active grace period (set via SuppressLiveness) it always returns nil // so that planned restarts do not trigger pod kills. -func (hm *HealthManager) LivenessCheck(_ *http.Request) error { +func (hm *HealthManager) LivenessCheck() error { hm.mu.RLock() defer hm.mu.RUnlock() + return hm.livenessCheckLocked() +} - // During a grace period, unconditionally pass liveness +// livenessCheckLocked performs the liveness check while the caller holds mu. +func (hm *HealthManager) livenessCheckLocked() error { if !hm.livenessGraceUntil.IsZero() && time.Now().Before(hm.livenessGraceUntil) { return nil } @@ -152,11 +154,15 @@ func (hm *HealthManager) LivenessCheck(_ *http.Request) error { return nil } -// ReadinessCheck checks if all components are Healthy -func (hm *HealthManager) ReadinessCheck(_ *http.Request) error { +// ReadinessCheck checks if all required components are Healthy or Degraded. +func (hm *HealthManager) ReadinessCheck() error { hm.mu.RLock() defer hm.mu.RUnlock() + return hm.readinessCheckLocked() +} +// readinessCheckLocked performs the readiness check while the caller holds mu. +func (hm *HealthManager) readinessCheckLocked() error { readyComponents := []string{ComponentCollectorManager, ComponentDakrTransport} for _, compName := range readyComponents { component, exists := hm.components[compName] @@ -170,6 +176,38 @@ func (hm *HealthManager) ReadinessCheck(_ *http.Request) error { return nil } +// CheckLiveness returns the report and liveness error atomically under a single +// lock acquisition, avoiding TOCTOU between BuildReport and LivenessCheck. +func (hm *HealthManager) CheckLiveness() (map[string]ComponentStatus, error) { + hm.mu.RLock() + defer hm.mu.RUnlock() + return hm.buildReportLocked(), hm.livenessCheckLocked() +} + +// CheckReadiness returns the report and readiness error atomically. +func (hm *HealthManager) CheckReadiness() (map[string]ComponentStatus, error) { + hm.mu.RLock() + defer hm.mu.RUnlock() + return hm.buildReportLocked(), hm.readinessCheckLocked() +} + +// buildReportLocked builds the report while the caller holds mu. 
+func (hm *HealthManager) buildReportLocked() map[string]ComponentStatus { + report := make(map[string]ComponentStatus, len(hm.components)) + for name, comp := range hm.components { + meta := make(map[string]string, len(comp.Metadata)) + for k, v := range comp.Metadata { + meta[k] = v + } + report[name] = ComponentStatus{ + Status: comp.Status, + Message: comp.Message, + Metadata: meta, + } + } + return report +} + // String returns a human-readable representation of the HealthStatus func (s HealthStatus) String() string { switch s { diff --git a/internal/health/manager_test.go b/internal/health/manager_test.go index ca11d2be..b9a47593 100644 --- a/internal/health/manager_test.go +++ b/internal/health/manager_test.go @@ -123,7 +123,7 @@ Readiness and Healthiness tests hm.Register(ComponentCollectorManager) hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "all good", nil) - err := hm.LivenessCheck(nil) + err := hm.LivenessCheck() assert.NoError(t, err) } @@ -132,7 +132,7 @@ func TestLivenessCheck_DegradedIsStillAlive(t *testing.T) { hm.Register(ComponentCollectorManager) hm.UpdateStatus(ComponentCollectorManager, HealthStatusDegraded, "some issues", nil) - err := hm.LivenessCheck(nil) + err := hm.LivenessCheck() assert.NoError(t, err) } @@ -141,7 +141,7 @@ func TestLivenessCheck_UnhealthyCollectorFails(t *testing.T) { hm.Register(ComponentCollectorManager) hm.UpdateStatus(ComponentCollectorManager, HealthStatusUnhealthy, "no collectors running", nil) - err := hm.LivenessCheck(nil) + err := hm.LivenessCheck() assert.Error(t, err) assert.Contains(t, err.Error(), "collector_manager") } @@ -150,7 +150,7 @@ func TestLivenessCheck_UnspecifiedPasses(t *testing.T) { hm := NewHealthManager() hm.Register(ComponentCollectorManager) // Status is Unspecified (startup) — should still pass liveness - err := hm.LivenessCheck(nil) + err := hm.LivenessCheck() assert.NoError(t, err) } @@ -161,7 +161,7 @@ func TestReadinessCheck_AllReady(t *testing.T) { hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "running", nil) hm.UpdateStatus(ComponentDakrTransport, HealthStatusHealthy, "connected", nil) - err := hm.ReadinessCheck(nil) + err := hm.ReadinessCheck() assert.NoError(t, err) } @@ -172,7 +172,7 @@ func TestReadinessCheck_DegradedIsReady(t *testing.T) { hm.UpdateStatus(ComponentCollectorManager, HealthStatusDegraded, "some issues", nil) hm.UpdateStatus(ComponentDakrTransport, HealthStatusDegraded, "retrying", nil) - err := hm.ReadinessCheck(nil) + err := hm.ReadinessCheck() assert.NoError(t, err) } @@ -183,7 +183,7 @@ func TestReadinessCheck_CollectorUnspecifiedNotReady(t *testing.T) { // collector_manager still Unspecified (not started yet) hm.UpdateStatus(ComponentDakrTransport, HealthStatusHealthy, "connected", nil) - err := hm.ReadinessCheck(nil) + err := hm.ReadinessCheck() assert.Error(t, err) assert.Contains(t, err.Error(), "collector_manager") } @@ -195,7 +195,7 @@ func TestReadinessCheck_TransportUnhealthyNotReady(t *testing.T) { hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "running", nil) hm.UpdateStatus(ComponentDakrTransport, HealthStatusUnhealthy, "cannot reach dakr", nil) - err := hm.ReadinessCheck(nil) + err := hm.ReadinessCheck() assert.Error(t, err) assert.Contains(t, err.Error(), "dakr_transport") } @@ -208,14 +208,14 @@ func TestLivenessCheck_SuppressedDuringGracePeriod(t *testing.T) { hm.UpdateStatus(ComponentCollectorManager, HealthStatusUnhealthy, "stopped for restart", nil) // Without suppression, liveness should fail - err := 
hm.LivenessCheck(nil) + err := hm.LivenessCheck() assert.Error(t, err) // Suppress liveness for planned restart hm.SuppressLiveness(5 * time.Minute) // Now liveness should pass even though collector_manager is Unhealthy - err = hm.LivenessCheck(nil) + err = hm.LivenessCheck() assert.NoError(t, err) } @@ -225,11 +225,11 @@ func TestLivenessCheck_ClearSuppression(t *testing.T) { hm.UpdateStatus(ComponentCollectorManager, HealthStatusUnhealthy, "stopped", nil) hm.SuppressLiveness(5 * time.Minute) - assert.NoError(t, hm.LivenessCheck(nil)) // suppressed + assert.NoError(t, hm.LivenessCheck()) // suppressed // Clear suppression — should fail again since still Unhealthy hm.ClearLivenessSuppression() - assert.Error(t, hm.LivenessCheck(nil)) + assert.Error(t, hm.LivenessCheck()) } func TestLivenessCheck_GracePeriodExpires(t *testing.T) { @@ -243,7 +243,7 @@ func TestLivenessCheck_GracePeriodExpires(t *testing.T) { hm.mu.Unlock() // Should fail because grace period has expired - err := hm.LivenessCheck(nil) + err := hm.LivenessCheck() assert.Error(t, err) } @@ -251,15 +251,15 @@ func TestLivenessCheck_FullRestartCycle(t *testing.T) { hm := NewHealthManager() hm.Register(ComponentCollectorManager) hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "running", nil) - assert.NoError(t, hm.LivenessCheck(nil)) + assert.NoError(t, hm.LivenessCheck()) // Simulate planned restart: suppress, then set unhealthy hm.SuppressLiveness(5 * time.Minute) hm.UpdateStatus(ComponentCollectorManager, HealthStatusUnhealthy, "stopped for restart", nil) - assert.NoError(t, hm.LivenessCheck(nil)) // still passes + assert.NoError(t, hm.LivenessCheck()) // still passes // Simulate collectors coming back: set healthy and clear suppression hm.ClearLivenessSuppression() hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "restarted", nil) - assert.NoError(t, hm.LivenessCheck(nil)) // passes normally + assert.NoError(t, hm.LivenessCheck()) // passes normally } diff --git a/internal/health/server.go b/internal/health/server.go index e14ccf06..39e7e62b 100644 --- a/internal/health/server.go +++ b/internal/health/server.go @@ -8,10 +8,10 @@ import ( "time" ) -// HealthResponse represents the JSON structuure for /healthz and /readyz responses +// HealthResponse represents the JSON structure for /healthz and /readyz responses type HealthResponse struct { Status string `json:"status"` - Error string `json:"message,omitempty"` + Error string `json:"error,omitempty"` Components map[string]ComponentResponse `json:"components,omitempty"` } @@ -62,9 +62,8 @@ func (s *HealthServer) Stop(ctx context.Context) error { } // healthzHandler handles the /healthz endpoint -func (s *HealthServer) healthzHandler(w http.ResponseWriter, r *http.Request) { - report := s.manager.BuildReport() - err := s.manager.LivenessCheck(r) +func (s *HealthServer) healthzHandler(w http.ResponseWriter, _ *http.Request) { + report, err := s.manager.CheckLiveness() response := HealthResponse{ Components: buildComponentResponses(report), @@ -77,13 +76,11 @@ func (s *HealthServer) healthzHandler(w http.ResponseWriter, r *http.Request) { } response.Status = "healthy" writeJSON(w, http.StatusOK, response) - } // readyzHandler handles the /readyz endpoint -func (s *HealthServer) readyzHandler(w http.ResponseWriter, r *http.Request) { - report := s.manager.BuildReport() - err := s.manager.ReadinessCheck(r) +func (s *HealthServer) readyzHandler(w http.ResponseWriter, _ *http.Request) { + report, err := s.manager.CheckReadiness() response := 
HealthResponse{ Components: buildComponentResponses(report),
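

---

Usage sketches (reviewer notes, not part of the patches above):

The first sketch wires the series together end to end. It uses only APIs the patches introduce (NewHealthManager, Register, UpdateStatus, NewHealthServer, Start, Stop, and the HealthResponse type), while the standalone main wrapper, the :8081 address, and the status messages are invented for illustration. Note that internal/health is importable only from inside the zxporter module, so this documents intent rather than an external integration.

	package main

	import (
		"context"
		"encoding/json"
		"fmt"
		"net/http"
		"time"

		"github.com/devzero-inc/zxporter/internal/health"
	)

	func main() {
		hm := health.NewHealthManager()
		hm.Register(health.ComponentCollectorManager)
		hm.Register(health.ComponentDakrTransport)

		// Start binds the listener synchronously before handing the socket to
		// Serve, so requests made after this point do not race the goroutine.
		srv := health.NewHealthServer(hm, ":8081")
		if err := srv.Start(); err != nil {
			panic(err)
		}
		defer func() {
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()
			_ = srv.Stop(ctx)
		}()

		// /readyz returns 503 while either required component is still
		// Unspecified, and 200 once both are Healthy or Degraded.
		hm.UpdateStatus(health.ComponentCollectorManager, health.HealthStatusHealthy, "60/60 collectors active", nil)
		hm.UpdateStatus(health.ComponentDakrTransport, health.HealthStatusHealthy, "connected", nil)

		resp, err := http.Get("http://localhost:8081/readyz")
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()

		var hr health.HealthResponse
		if err := json.NewDecoder(resp.Body).Decode(&hr); err != nil {
			panic(err)
		}
		fmt.Println(resp.StatusCode, hr.Status) // expected: 200 ready
	}

The second sketch (same illustrative package and imports as above) shows the planned-restart pattern that patch 4 establishes in restartCollectors and setupAllCollectors. The plannedRestart helper and its callback signature are hypothetical; in the series itself StartAll already calls ClearLivenessSuppression, so the explicit call below is only a fallback for paths that bypass StartAll.

	// plannedRestart is a hypothetical helper that suppresses liveness across
	// a stop/start window so the transient Unhealthy state does not trigger a
	// pod kill. The 5-minute bound mirrors the grace period used in patch 4.
	func plannedRestart(hm *health.HealthManager, stop, start func() error) error {
		hm.SuppressLiveness(5 * time.Minute)
		if err := stop(); err != nil {
			// Suppression is left in place and simply expires; a failure that
			// outlives the deadline will fail liveness, as intended.
			return err
		}
		if err := start(); err != nil {
			return err
		}
		hm.ClearLivenessSuppression() // resume normal liveness evaluation
		return nil
	}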