diff --git a/.gitignore b/.gitignore index e11163c3..0f35c4cb 100644 --- a/.gitignore +++ b/.gitignore @@ -42,4 +42,5 @@ debug_rendered.yaml #ignore dev helm deployment helm-chart/zxporter/local_values.yaml -docs/plans \ No newline at end of file +docs/plans +.DS_Store diff --git a/cmd/main.go b/cmd/main.go index 8166de3e..c3e4ff58 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "context" "crypto/tls" "flag" "os" @@ -31,7 +32,6 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" @@ -39,7 +39,9 @@ import ( monitoringv1 "github.com/devzero-inc/zxporter/api/v1" "github.com/devzero-inc/zxporter/internal/controller" + // +kubebuilder:scaffold:imports + "github.com/devzero-inc/zxporter/internal/health" ) var ( @@ -125,7 +127,7 @@ func main() { Scheme: scheme, Metrics: metricsServerOptions, WebhookServer: webhookServer, - HealthProbeBindAddress: probeAddr, + HealthProbeBindAddress: "", // Custom health server used instead LeaderElection: enableLeaderElection, LeaderElectionID: "055ced15.devzero.io", // LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily @@ -162,14 +164,19 @@ func main() { // No need to add the standard controller with kubebuilder:scaffold:builder // The env-based controller doesn't rely on CRDs - if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { - setupLog.Error(err, "unable to set up health check") - os.Exit(1) - } - if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { - setupLog.Error(err, "unable to set up ready check") + // New health server from health package + healthServer := 
health.NewHealthServer(envController.Reconciler.HealthManager, probeAddr) + if err := healthServer.Start(); err != nil { + setupLog.Error(err, "unable to start health server") os.Exit(1) } + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := healthServer.Stop(ctx); err != nil { + setupLog.Error(err, "error stopping health server") + } + }() setupLog.Info("starting manager") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { diff --git a/internal/collector/manager.go b/internal/collector/manager.go index d70e35bb..ec03484f 100644 --- a/internal/collector/manager.go +++ b/internal/collector/manager.go @@ -250,6 +250,9 @@ func (m *CollectionManager) StartAll(ctx context.Context) error { } m.started = true + if m.healthManager != nil { + m.healthManager.ClearLivenessSuppression() + } m.updateHealthStatus(health.ComponentCollectorManager, health.HealthStatusHealthy, fmt.Sprintf("%d collectors started", len(m.collectors)), map[string]string{"collector_count": fmt.Sprintf("%d", len(m.collectors))}) m.updateHealthStatus(health.ComponentBufferQueue, health.HealthStatusHealthy, "Buffer is operational", map[string]string{"capacity": fmt.Sprintf("%d", m.bufferSize)}) diff --git a/internal/controller/collectionpolicy_controller.go b/internal/controller/collectionpolicy_controller.go index c7603357..08e10988 100644 --- a/internal/controller/collectionpolicy_controller.go +++ b/internal/controller/collectionpolicy_controller.go @@ -986,6 +986,12 @@ func (r *CollectionPolicyReconciler) restartCollectors(ctx context.Context, newC }, ) + // Suppress liveness during planned restart so the transient Unhealthy + // window between StopAll and StartAll does not trigger a pod kill. 
+ if r.HealthManager != nil { + r.HealthManager.SuppressLiveness(5 * time.Minute) + } + // Stop all existing collectors if r.CollectionManager != nil { logger.Info("Stopping all collectors") @@ -2146,6 +2152,11 @@ func (r *CollectionPolicyReconciler) setupAllCollectors( ) error { logger.Info("Now registering and starting all collectors") + // Suppress liveness during reconfiguration restart + if r.HealthManager != nil { + r.HealthManager.SuppressLiveness(5 * time.Minute) + } + // Stop the collection manager to reconfigure with all collectors if err := r.CollectionManager.StopAll(); err != nil { r.TelemetryLogger.Report( diff --git a/internal/health/manager.go b/internal/health/manager.go index 30f264e9..84efad7e 100644 --- a/internal/health/manager.go +++ b/internal/health/manager.go @@ -1,7 +1,9 @@ package health import ( + "fmt" "sync" + "time" ) // HealthStatus matches proto enum for easy mapping @@ -23,8 +25,9 @@ type ComponentStatus struct { } type HealthManager struct { - mu sync.RWMutex - components map[string]*ComponentStatus + mu sync.RWMutex + components map[string]*ComponentStatus + livenessGraceUntil time.Time // LivenessCheck always passes before this deadline } // NewHealthManager creates a new HealthManager @@ -108,3 +111,113 @@ func (hm *HealthManager) BuildReport() map[string]ComponentStatus { } return report } + +// SuppressLiveness makes LivenessCheck pass unconditionally for the given +// duration. Use this before a planned collector restart so that the transient +// Unhealthy window does not trigger a pod kill. The grace period is cleared +// automatically when StartAll succeeds (via ClearLivenessSuppression) or when +// the deadline expires. +func (hm *HealthManager) SuppressLiveness(d time.Duration) { + hm.mu.Lock() + defer hm.mu.Unlock() + hm.livenessGraceUntil = time.Now().Add(d) +} + +// ClearLivenessSuppression removes any active grace period so LivenessCheck +// resumes normal evaluation. Call this after collectors are back up. 
+func (hm *HealthManager) ClearLivenessSuppression() {
+	hm.mu.Lock()
+	defer hm.mu.Unlock()
+	hm.livenessGraceUntil = time.Time{}
+}
+
+// LivenessCheck fails only when the collector manager reports Unhealthy;
+// other components are ignored. During an active grace period (set via
+// SuppressLiveness) it always returns nil so planned restarts don't kill the pod.
+func (hm *HealthManager) LivenessCheck() error {
+	hm.mu.RLock()
+	defer hm.mu.RUnlock()
+	return hm.livenessCheckLocked()
+}
+
+// livenessCheckLocked performs the liveness check while the caller holds mu.
+func (hm *HealthManager) livenessCheckLocked() error {
+	if !hm.livenessGraceUntil.IsZero() && time.Now().Before(hm.livenessGraceUntil) {
+		return nil
+	}
+
+	component, exists := hm.components[ComponentCollectorManager]
+	if exists && component.Status == HealthStatusUnhealthy {
+		return fmt.Errorf("%s is %s: %s", ComponentCollectorManager, component.Status, component.Message)
+	}
+
+	return nil
+}
+
+// ReadinessCheck checks if all required components are Healthy or Degraded.
+func (hm *HealthManager) ReadinessCheck() error {
+	hm.mu.RLock()
+	defer hm.mu.RUnlock()
+	return hm.readinessCheckLocked()
+}
+
+// readinessCheckLocked performs the readiness check while the caller holds mu.
+func (hm *HealthManager) readinessCheckLocked() error {
+	readyComponents := []string{ComponentCollectorManager, ComponentDakrTransport}
+	for _, compName := range readyComponents {
+		component, exists := hm.components[compName]
+		if !exists {
+			return fmt.Errorf("%s is not registered", compName)
+		}
+		if component.Status != HealthStatusHealthy && component.Status != HealthStatusDegraded {
+			return fmt.Errorf("%s is not ready (status: %s)", compName, component.Status)
+		}
+	}
+	return nil
+}
+
+// CheckLiveness returns the report and liveness error atomically under a single
+// lock acquisition, avoiding TOCTOU between BuildReport and LivenessCheck.
+func (hm *HealthManager) CheckLiveness() (map[string]ComponentStatus, error) { + hm.mu.RLock() + defer hm.mu.RUnlock() + return hm.buildReportLocked(), hm.livenessCheckLocked() +} + +// CheckReadiness returns the report and readiness error atomically. +func (hm *HealthManager) CheckReadiness() (map[string]ComponentStatus, error) { + hm.mu.RLock() + defer hm.mu.RUnlock() + return hm.buildReportLocked(), hm.readinessCheckLocked() +} + +// buildReportLocked builds the report while the caller holds mu. +func (hm *HealthManager) buildReportLocked() map[string]ComponentStatus { + report := make(map[string]ComponentStatus, len(hm.components)) + for name, comp := range hm.components { + meta := make(map[string]string, len(comp.Metadata)) + for k, v := range comp.Metadata { + meta[k] = v + } + report[name] = ComponentStatus{ + Status: comp.Status, + Message: comp.Message, + Metadata: meta, + } + } + return report +} + +// String returns a human-readable representation of the HealthStatus +func (s HealthStatus) String() string { + switch s { + case HealthStatusHealthy: + return "healthy" + case HealthStatusDegraded: + return "degraded" + case HealthStatusUnhealthy: + return "unhealthy" + default: + return "unspecified" + } +} diff --git a/internal/health/manager_test.go b/internal/health/manager_test.go index e8c5225e..b9a47593 100644 --- a/internal/health/manager_test.go +++ b/internal/health/manager_test.go @@ -4,6 +4,7 @@ import ( "strconv" "sync" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -97,3 +98,168 @@ func TestConcurrentUpdatesAndReads(t *testing.T) { assert.Contains(t, statuses, status.Status) assert.Contains(t, messages, status.Message) } + +// Test case for HealthStatus String method +func TestHealthStatus_String(t *testing.T) { + tests := []struct { + status HealthStatus + expected string + }{ + {HealthStatusUnspecified, "unspecified"}, + {HealthStatusHealthy, "healthy"}, + {HealthStatusDegraded, "degraded"}, + {HealthStatusUnhealthy, 
"unhealthy"}, + {HealthStatus(99), "unspecified"}, + } + for _, tt := range tests { + assert.Equal(t, tt.expected, tt.status.String()) + } +} + +/* +Readiness and Healthiness tests +*/func TestLivenessCheck_Healthy(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "all good", nil) + + err := hm.LivenessCheck() + assert.NoError(t, err) +} + +func TestLivenessCheck_DegradedIsStillAlive(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusDegraded, "some issues", nil) + + err := hm.LivenessCheck() + assert.NoError(t, err) +} + +func TestLivenessCheck_UnhealthyCollectorFails(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusUnhealthy, "no collectors running", nil) + + err := hm.LivenessCheck() + assert.Error(t, err) + assert.Contains(t, err.Error(), "collector_manager") +} + +func TestLivenessCheck_UnspecifiedPasses(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + // Status is Unspecified (startup) — should still pass liveness + err := hm.LivenessCheck() + assert.NoError(t, err) +} + +func TestReadinessCheck_AllReady(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.Register(ComponentDakrTransport) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "running", nil) + hm.UpdateStatus(ComponentDakrTransport, HealthStatusHealthy, "connected", nil) + + err := hm.ReadinessCheck() + assert.NoError(t, err) +} + +func TestReadinessCheck_DegradedIsReady(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.Register(ComponentDakrTransport) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusDegraded, "some issues", nil) + hm.UpdateStatus(ComponentDakrTransport, 
HealthStatusDegraded, "retrying", nil) + + err := hm.ReadinessCheck() + assert.NoError(t, err) +} + +func TestReadinessCheck_CollectorUnspecifiedNotReady(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.Register(ComponentDakrTransport) + // collector_manager still Unspecified (not started yet) + hm.UpdateStatus(ComponentDakrTransport, HealthStatusHealthy, "connected", nil) + + err := hm.ReadinessCheck() + assert.Error(t, err) + assert.Contains(t, err.Error(), "collector_manager") +} + +func TestReadinessCheck_TransportUnhealthyNotReady(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.Register(ComponentDakrTransport) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "running", nil) + hm.UpdateStatus(ComponentDakrTransport, HealthStatusUnhealthy, "cannot reach dakr", nil) + + err := hm.ReadinessCheck() + assert.Error(t, err) + assert.Contains(t, err.Error(), "dakr_transport") +} + +// Liveness suppression tests — ensures planned restarts don't cause pod kills + +func TestLivenessCheck_SuppressedDuringGracePeriod(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusUnhealthy, "stopped for restart", nil) + + // Without suppression, liveness should fail + err := hm.LivenessCheck() + assert.Error(t, err) + + // Suppress liveness for planned restart + hm.SuppressLiveness(5 * time.Minute) + + // Now liveness should pass even though collector_manager is Unhealthy + err = hm.LivenessCheck() + assert.NoError(t, err) +} + +func TestLivenessCheck_ClearSuppression(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusUnhealthy, "stopped", nil) + + hm.SuppressLiveness(5 * time.Minute) + assert.NoError(t, hm.LivenessCheck()) // suppressed + + // Clear suppression — should fail again since still Unhealthy + 
hm.ClearLivenessSuppression() + assert.Error(t, hm.LivenessCheck()) +} + +func TestLivenessCheck_GracePeriodExpires(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusUnhealthy, "stopped", nil) + + // Set a grace period that is already expired + hm.mu.Lock() + hm.livenessGraceUntil = time.Now().Add(-1 * time.Second) + hm.mu.Unlock() + + // Should fail because grace period has expired + err := hm.LivenessCheck() + assert.Error(t, err) +} + +func TestLivenessCheck_FullRestartCycle(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "running", nil) + assert.NoError(t, hm.LivenessCheck()) + + // Simulate planned restart: suppress, then set unhealthy + hm.SuppressLiveness(5 * time.Minute) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusUnhealthy, "stopped for restart", nil) + assert.NoError(t, hm.LivenessCheck()) // still passes + + // Simulate collectors coming back: set healthy and clear suppression + hm.ClearLivenessSuppression() + hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "restarted", nil) + assert.NoError(t, hm.LivenessCheck()) // passes normally +} diff --git a/internal/health/server.go b/internal/health/server.go new file mode 100644 index 00000000..39e7e62b --- /dev/null +++ b/internal/health/server.go @@ -0,0 +1,116 @@ +package health + +import ( + "context" + "encoding/json" + "net" + "net/http" + "time" +) + +// HealthResponse represents the JSON structure for /healthz and /readyz responses +type HealthResponse struct { + Status string `json:"status"` + Error string `json:"error,omitempty"` + Components map[string]ComponentResponse `json:"components,omitempty"` +} + +// ComponentResponse represents the JSON structure for /components/{component} responses +type ComponentResponse struct { + Status string `json:"status"` + Message string 
`json:"message,omitempty"` + Metadata map[string]string `json:"metadata,omitempty"` +} + +// HealthServer serves health and readiness endpoints +type HealthServer struct { + manager *HealthManager + addr string + server *http.Server +} + +// NewHealthServer creates a new HealthServer bound to the specified address +func NewHealthServer(manager *HealthManager, addr string) *HealthServer { + s := &HealthServer{ + manager: manager, + addr: addr, + } + mux := http.NewServeMux() + mux.HandleFunc("/healthz", s.healthzHandler) + mux.HandleFunc("/readyz", s.readyzHandler) + s.server = &http.Server{ + Addr: addr, + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + } + return s +} + +// Start begins serving health endpoints +func (s *HealthServer) Start() error { + ln, err := net.Listen("tcp", s.addr) + if err != nil { + return err + } + go s.server.Serve(ln) //nolint:errcheck // server always returns error on shutdown + return nil +} + +// Stop gracefully shuts down the server +func (s *HealthServer) Stop(ctx context.Context) error { + return s.server.Shutdown(ctx) +} + +// healthzHandler handles the /healthz endpoint +func (s *HealthServer) healthzHandler(w http.ResponseWriter, _ *http.Request) { + report, err := s.manager.CheckLiveness() + + response := HealthResponse{ + Components: buildComponentResponses(report), + } + if err != nil { + response.Status = "unhealthy" + response.Error = err.Error() + writeJSON(w, http.StatusServiceUnavailable, response) + return + } + response.Status = "healthy" + writeJSON(w, http.StatusOK, response) +} + +// readyzHandler handles the /readyz endpoint +func (s *HealthServer) readyzHandler(w http.ResponseWriter, _ *http.Request) { + report, err := s.manager.CheckReadiness() + + response := HealthResponse{ + Components: buildComponentResponses(report), + } + if err != nil { + response.Status = "not ready" + response.Error = err.Error() + writeJSON(w, http.StatusServiceUnavailable, response) + return + } + response.Status = "ready" + 
writeJSON(w, http.StatusOK, response) +} + +// buildComponentResponses converts the health report into a map of component responses +func buildComponentResponses(report map[string]ComponentStatus) map[string]ComponentResponse { + components := make(map[string]ComponentResponse, len(report)) + for name, status := range report { + components[name] = ComponentResponse{ + Status: status.Status.String(), + Message: status.Message, + Metadata: status.Metadata, + } + } + return components +} + +// writeJSON writes the response as JSON +func writeJSON(w http.ResponseWriter, statusCode int, data any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + _ = json.NewEncoder(w).Encode(data) +} diff --git a/internal/health/server_test.go b/internal/health/server_test.go new file mode 100644 index 00000000..6d950fcd --- /dev/null +++ b/internal/health/server_test.go @@ -0,0 +1,108 @@ +package health + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestHealthzHandler_Healthy(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "running", nil) + + srv := NewHealthServer(hm, ":8081") + req := httptest.NewRequest(http.MethodGet, "/healthz", nil) + w := httptest.NewRecorder() + + srv.healthzHandler(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var resp HealthResponse + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &resp)) + assert.Equal(t, "healthy", resp.Status) +} + +func TestHealthzHandler_Unhealthy(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusUnhealthy, "no collectors running", nil) + + srv := NewHealthServer(hm, ":8081") + req := httptest.NewRequest(http.MethodGet, "/healthz", nil) + w := httptest.NewRecorder() + + 
srv.healthzHandler(w, req) + + assert.Equal(t, http.StatusServiceUnavailable, w.Code) + + var resp HealthResponse + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &resp)) + assert.Equal(t, "unhealthy", resp.Status) + assert.Contains(t, resp.Error, "collector_manager") +} + +func TestReadyzHandler_Ready(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.Register(ComponentDakrTransport) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "running", nil) + hm.UpdateStatus(ComponentDakrTransport, HealthStatusHealthy, "connected", nil) + + srv := NewHealthServer(hm, ":8081") + req := httptest.NewRequest(http.MethodGet, "/readyz", nil) + w := httptest.NewRecorder() + + srv.readyzHandler(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var resp HealthResponse + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &resp)) + assert.Equal(t, "ready", resp.Status) +} + +func TestReadyzHandler_NotReady(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.Register(ComponentDakrTransport) + // collector_manager still Unspecified + hm.UpdateStatus(ComponentDakrTransport, HealthStatusHealthy, "connected", nil) + + srv := NewHealthServer(hm, ":8081") + req := httptest.NewRequest(http.MethodGet, "/readyz", nil) + w := httptest.NewRecorder() + + srv.readyzHandler(w, req) + + assert.Equal(t, http.StatusServiceUnavailable, w.Code) + + var resp HealthResponse + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &resp)) + assert.Equal(t, "not ready", resp.Status) +} + +func TestHealthzHandler_IncludesComponents(t *testing.T) { + hm := NewHealthManager() + hm.Register(ComponentCollectorManager) + hm.Register(ComponentDakrTransport) + hm.UpdateStatus(ComponentCollectorManager, HealthStatusHealthy, "55/60 active", map[string]string{"active": "55"}) + hm.UpdateStatus(ComponentDakrTransport, HealthStatusDegraded, "retrying", nil) + + srv := NewHealthServer(hm, ":8081") + req := 
httptest.NewRequest(http.MethodGet, "/healthz", nil) + w := httptest.NewRecorder() + + srv.healthzHandler(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var resp HealthResponse + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &resp)) + assert.Len(t, resp.Components, 2) +}