Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@ debug_rendered.yaml
#ignore dev helm deployment
helm-chart/zxporter/local_values.yaml

docs/plans
docs/plans
.DS_Store
23 changes: 15 additions & 8 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"crypto/tls"
"flag"
"os"
Expand All @@ -31,15 +32,16 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"

monitoringv1 "github.com/devzero-inc/zxporter/api/v1"
"github.com/devzero-inc/zxporter/internal/controller"

// +kubebuilder:scaffold:imports
"github.com/devzero-inc/zxporter/internal/health"
)

var (
Expand Down Expand Up @@ -125,7 +127,7 @@ func main() {
Scheme: scheme,
Metrics: metricsServerOptions,
WebhookServer: webhookServer,
HealthProbeBindAddress: probeAddr,
HealthProbeBindAddress: "", // Custom health server used instead
LeaderElection: enableLeaderElection,
LeaderElectionID: "055ced15.devzero.io",
// LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily
Expand Down Expand Up @@ -162,14 +164,19 @@ func main() {
// No need to add the standard controller with kubebuilder:scaffold:builder
// The env-based controller doesn't rely on CRDs

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up ready check")
// New health server from health package
healthServer := health.NewHealthServer(envController.Reconciler.HealthManager, probeAddr)
Comment thread
mutantkeyboard marked this conversation as resolved.
if err := healthServer.Start(); err != nil {
setupLog.Error(err, "unable to start health server")
os.Exit(1)
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := healthServer.Stop(ctx); err != nil {
setupLog.Error(err, "error stopping health server")
}
}()

setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions internal/collector/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ func (m *CollectionManager) StartAll(ctx context.Context) error {
}

m.started = true
if m.healthManager != nil {
m.healthManager.ClearLivenessSuppression()
}
m.updateHealthStatus(health.ComponentCollectorManager, health.HealthStatusHealthy, fmt.Sprintf("%d collectors started", len(m.collectors)), map[string]string{"collector_count": fmt.Sprintf("%d", len(m.collectors))})
m.updateHealthStatus(health.ComponentBufferQueue, health.HealthStatusHealthy, "Buffer is operational", map[string]string{"capacity": fmt.Sprintf("%d", m.bufferSize)})

Expand Down
11 changes: 11 additions & 0 deletions internal/controller/collectionpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,12 @@ func (r *CollectionPolicyReconciler) restartCollectors(ctx context.Context, newC
},
)

// Suppress liveness during planned restart so the transient Unhealthy
// window between StopAll and StartAll does not trigger a pod kill.
if r.HealthManager != nil {
r.HealthManager.SuppressLiveness(5 * time.Minute)
}

// Stop all existing collectors
if r.CollectionManager != nil {
logger.Info("Stopping all collectors")
Expand Down Expand Up @@ -2146,6 +2152,11 @@ func (r *CollectionPolicyReconciler) setupAllCollectors(
) error {
logger.Info("Now registering and starting all collectors")

// Suppress liveness during reconfiguration restart
if r.HealthManager != nil {
r.HealthManager.SuppressLiveness(5 * time.Minute)
}

// Stop the collection manager to reconfigure with all collectors
if err := r.CollectionManager.StopAll(); err != nil {
r.TelemetryLogger.Report(
Expand Down
117 changes: 115 additions & 2 deletions internal/health/manager.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package health

import (
"fmt"
"sync"
"time"
)

// HealthStatus matches proto enum for easy mapping
Expand All @@ -23,8 +25,9 @@ type ComponentStatus struct {
}

type HealthManager struct {
mu sync.RWMutex
components map[string]*ComponentStatus
mu sync.RWMutex
components map[string]*ComponentStatus
livenessGraceUntil time.Time // LivenessCheck always passes before this deadline
}

// NewHealthManager creates a new HealthManager
Expand Down Expand Up @@ -108,3 +111,113 @@ func (hm *HealthManager) BuildReport() map[string]ComponentStatus {
}
return report
}

Comment thread
mutantkeyboard marked this conversation as resolved.
// SuppressLiveness arms a liveness grace window of duration d. While the
// window is active LivenessCheck reports success regardless of component
// state, which keeps a planned collector restart from tripping the pod's
// liveness probe. The window ends when the deadline passes, or earlier when
// ClearLivenessSuppression is called (typically after StartAll succeeds).
func (hm *HealthManager) SuppressLiveness(d time.Duration) {
	deadline := time.Now().Add(d)
	hm.mu.Lock()
	hm.livenessGraceUntil = deadline
	hm.mu.Unlock()
}

// ClearLivenessSuppression cancels any pending liveness grace window so the
// next LivenessCheck evaluates component health normally again. Invoke this
// once collectors have come back up.
func (hm *HealthManager) ClearLivenessSuppression() {
	hm.mu.Lock()
	hm.livenessGraceUntil = time.Time{}
	hm.mu.Unlock()
}

// LivenessCheck reports whether the process should be considered alive.
// It returns nil unconditionally while a suppression window (see
// SuppressLiveness) is active; otherwise it fails only when the collector
// manager component is Unhealthy.
func (hm *HealthManager) LivenessCheck() error {
	hm.mu.RLock()
	err := hm.livenessCheckLocked()
	hm.mu.RUnlock()
	return err
}

// livenessCheckLocked is the lock-free core of LivenessCheck; callers must
// hold hm.mu (read or write).
func (hm *HealthManager) livenessCheckLocked() error {
	// An armed, unexpired grace window short-circuits the check entirely so
	// planned restarts never look like a dead process.
	if until := hm.livenessGraceUntil; !until.IsZero() && time.Now().Before(until) {
		return nil
	}

	if c, ok := hm.components[ComponentCollectorManager]; ok && c.Status == HealthStatusUnhealthy {
		return fmt.Errorf("%s is %s: %s", ComponentCollectorManager, c.Status, c.Message)
	}
	return nil
}

// ReadinessCheck reports whether the process is ready to serve traffic:
// every required component must be Healthy or Degraded.
func (hm *HealthManager) ReadinessCheck() error {
	hm.mu.RLock()
	err := hm.readinessCheckLocked()
	hm.mu.RUnlock()
	return err
}

// readinessCheckLocked is the lock-free core of ReadinessCheck; callers must
// hold hm.mu. A component is ready when it is registered and its status is
// Healthy or Degraded.
func (hm *HealthManager) readinessCheckLocked() error {
	for _, name := range []string{ComponentCollectorManager, ComponentDakrTransport} {
		comp, registered := hm.components[name]
		switch {
		case !registered:
			return fmt.Errorf("%s is not registered", name)
		case comp.Status != HealthStatusHealthy && comp.Status != HealthStatusDegraded:
			return fmt.Errorf("%s is not ready (status: %s)", name, comp.Status)
		}
	}
	return nil
}

// CheckLiveness snapshots the component report and evaluates liveness under
// a single lock acquisition, so the returned report always reflects the
// exact state that produced the error (no TOCTOU window between the two).
func (hm *HealthManager) CheckLiveness() (map[string]ComponentStatus, error) {
	hm.mu.RLock()
	report := hm.buildReportLocked()
	err := hm.livenessCheckLocked()
	hm.mu.RUnlock()
	return report, err
}

// CheckReadiness snapshots the component report and evaluates readiness
// atomically under one lock acquisition.
func (hm *HealthManager) CheckReadiness() (map[string]ComponentStatus, error) {
	hm.mu.RLock()
	report := hm.buildReportLocked()
	err := hm.readinessCheckLocked()
	hm.mu.RUnlock()
	return report, err
}

// buildReportLocked deep-copies the component table (including each
// component's metadata map) so callers may read the snapshot after releasing
// the lock without racing concurrent updates. Callers must hold hm.mu.
func (hm *HealthManager) buildReportLocked() map[string]ComponentStatus {
	snapshot := make(map[string]ComponentStatus, len(hm.components))
	for name, comp := range hm.components {
		entry := ComponentStatus{
			Status:   comp.Status,
			Message:  comp.Message,
			Metadata: make(map[string]string, len(comp.Metadata)),
		}
		for k, v := range comp.Metadata {
			entry.Metadata[k] = v
		}
		snapshot[name] = entry
	}
	return snapshot
}

// String returns the lowercase human-readable name of the status; values
// outside the known set map to "unspecified".
func (s HealthStatus) String() string {
	switch s {
	case HealthStatusUnhealthy:
		return "unhealthy"
	case HealthStatusDegraded:
		return "degraded"
	case HealthStatusHealthy:
		return "healthy"
	}
	return "unspecified"
}
Loading
Loading