diff --git a/.gitignore b/.gitignore
index d96d086b..e11163c3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -40,4 +40,6 @@ debug_metrics.json
 debug_rendered.yaml
 
 #ignore dev helm deployment
-helm-chart/zxporter/local_values.yaml
\ No newline at end of file
+helm-chart/zxporter/local_values.yaml
+
+docs/plans
\ No newline at end of file
diff --git a/Makefile b/Makefile
index 5da932e2..7bc08bb9 100644
--- a/Makefile
+++ b/Makefile
@@ -829,3 +829,22 @@ verify-e2e-gke-lifecycle: provision-gke ## Full GKE E2E (Provision -> Verify ->
 .PHONY: clean-metrics
 clean-metrics:
 	@rm -f verification/metrics-*.json
+
+# Load simulation replicas (override to increase pressure)
+LOAD_SIM_REPLICAS ?= 10
+
+.PHONY: load-test
+load-test: ## Apply load simulation manifests to stress-test zxporter collectors
+	@echo "[INFO] Applying load simulation manifests..."
+	@$(KUBECTL) apply -f verification/load-simulation.yaml
+	@echo "[INFO] Scaling deployments to $(LOAD_SIM_REPLICAS) replicas..."
+	@$(KUBECTL) scale deploy -n load-test load-web --replicas=$(LOAD_SIM_REPLICAS)
+	@$(KUBECTL) scale deploy -n load-test load-api --replicas=$(LOAD_SIM_REPLICAS)
+	@$(KUBECTL) scale deploy -n load-test load-worker --replicas=$(LOAD_SIM_REPLICAS)
+	@echo "[INFO] Load simulation applied. Monitor with: kubectl get all -n load-test"
+
+.PHONY: load-test-cleanup
+load-test-cleanup: ## Remove all load simulation resources
+	@echo "[INFO] Deleting load-test namespace..."
+	@$(KUBECTL) delete ns load-test --ignore-not-found
+	@echo "[INFO] Load simulation cleaned up."
diff --git a/internal/collector/historical_metrics_collector.go b/internal/collector/historical_metrics_collector.go
index cccdfcb1..b613174a 100644
--- a/internal/collector/historical_metrics_collector.go
+++ b/internal/collector/historical_metrics_collector.go
@@ -12,6 +12,7 @@ import (
 	"google.golang.org/protobuf/types/known/timestamppb"
 
 	gen "github.com/devzero-inc/zxporter/gen/api/v1"
+	"github.com/devzero-inc/zxporter/internal/health"
 )
 
 const (
@@ -34,14 +35,16 @@ type HistoricalMetricsCollector struct {
 	logger        logr.Logger
 	prometheusAPI v1.API
 	semaphore     chan struct{} // limits concurrent Prometheus queries
+	healthManager *health.HealthManager
 }
 
 // NewHistoricalMetricsCollector creates a new collector.
-func NewHistoricalMetricsCollector(logger logr.Logger, prometheusAPI v1.API) *HistoricalMetricsCollector {
+func NewHistoricalMetricsCollector(logger logr.Logger, prometheusAPI v1.API, healthManager *health.HealthManager) *HistoricalMetricsCollector {
 	return &HistoricalMetricsCollector{
 		logger:        logger.WithName("historical-metrics"),
 		prometheusAPI: prometheusAPI,
 		semaphore:     make(chan struct{}, maxConcurrentQueries),
+		healthManager: healthManager,
 	}
 }
 
@@ -61,6 +64,8 @@ func (c *HistoricalMetricsCollector) FetchPercentiles(ctx context.Context, workl
 		}
 	}
 
+	c.updateHealthStatus(health.HealthStatusHealthy, "Prometheus queries succeeding", map[string]string{"workload": workload.WorkloadName})
+
 	return &gen.HistoricalMetricsSummary{
 		Workload: &gen.MpaWorkloadIdentifier{
 			Namespace: workload.Namespace,
@@ -115,6 +120,7 @@ func (c *HistoricalMetricsCollector) DiscoverContainers(ctx context.Context, nam
 	)
 	result, _, err := c.prometheusAPI.Query(ctx, query, time.Now())
 	if err != nil {
+		c.updateHealthStatus(health.HealthStatusDegraded, "Prometheus query failed", map[string]string{"error": err.Error()})
 		return nil, err
 	}
 
@@ -199,6 +205,7 @@ func (c *HistoricalMetricsCollector) fetchContainerPercentiles(ctx context.Conte
 func (c *HistoricalMetricsCollector) queryScalar(ctx context.Context, query string, ts time.Time) (float64, error) {
 	result, warnings, err := c.prometheusAPI.Query(ctx, query, ts)
 	if err != nil {
+		c.updateHealthStatus(health.HealthStatusDegraded, "Prometheus query failed", map[string]string{"error": err.Error()})
 		return 0, fmt.Errorf("prometheus query failed: %w", err)
 	}
 	if len(warnings) > 0 {
@@ -217,3 +224,10 @@ func (c *HistoricalMetricsCollector) queryScalar(ctx context.Context, query stri
 		return 0, fmt.Errorf("unexpected result type: %T", result)
 	}
 }
+
+// updateHealthStatus reports Prometheus component health if a HealthManager is configured.
+func (c *HistoricalMetricsCollector) updateHealthStatus(status health.HealthStatus, message string, metadata map[string]string) {
+	if c.healthManager != nil {
+		c.healthManager.UpdateStatus(health.ComponentPrometheus, status, message, metadata)
+	}
+}
diff --git a/internal/collector/historical_metrics_collector_test.go b/internal/collector/historical_metrics_collector_test.go
index 87ffa5a4..905d8a69 100644
--- a/internal/collector/historical_metrics_collector_test.go
+++ b/internal/collector/historical_metrics_collector_test.go
@@ -103,7 +103,7 @@ func TestHistoricalCollector_FetchPercentiles(t *testing.T) {
 		queryResults: make(map[string]model.Value),
 	}
 
-	hc := NewHistoricalMetricsCollector(logr.Discard(), mock)
+	hc := NewHistoricalMetricsCollector(logr.Discard(), mock, nil)
 
 	workload := HistoricalWorkloadQuery{
 		Namespace:    "default",
diff --git a/internal/collector/historical_metrics_health_test.go b/internal/collector/historical_metrics_health_test.go
new file mode 100644
index 00000000..d27b9d5d
--- /dev/null
+++ b/internal/collector/historical_metrics_health_test.go
@@ -0,0 +1,64 @@
+package collector
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/devzero-inc/zxporter/internal/health"
+	"github.com/go-logr/logr"
+	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/assert"
+)
+
+type errorPrometheusAPI struct {
+	mockPrometheusAPI
+}
+
+func (m *errorPrometheusAPI) Query(ctx context.Context, query string, ts time.Time, opts ...v1.Option) (model.Value, v1.Warnings, error) {
+	return nil, nil, fmt.Errorf("prometheus unavailable")
+}
+
+func TestHistoricalCollector_HealthyOnSuccess(t *testing.T) {
+	hm := health.NewHealthManager()
+	hm.Register(health.ComponentPrometheus)
+
+	mock := &mockPrometheusAPI{queryResults: make(map[string]model.Value)}
+	hc := NewHistoricalMetricsCollector(logr.Discard(), mock, hm)
+
+	workload := HistoricalWorkloadQuery{
+		Namespace:    "default",
+		WorkloadName: "web-app",
+		WorkloadKind: "Deployment",
+		PodRegex:     "web-app-.*",
+		Containers:   []string{"app"},
+	}
+
+	_, err := hc.FetchPercentiles(context.Background(), workload)
+	assert.NoError(t, err)
+
+	status, _ := hm.GetStatus(health.ComponentPrometheus)
+	assert.Equal(t, health.HealthStatusHealthy, status.Status)
+}
+
+func TestHistoricalCollector_DegradedOnQueryError(t *testing.T) {
+	hm := health.NewHealthManager()
+	hm.Register(health.ComponentPrometheus)
+
+	mock := &errorPrometheusAPI{}
+	hc := NewHistoricalMetricsCollector(logr.Discard(), mock, hm)
+
+	_, err := hc.DiscoverContainers(context.Background(), "default", "web-app-.*")
+	assert.Error(t, err)
+
+	status, _ := hm.GetStatus(health.ComponentPrometheus)
+	assert.Equal(t, health.HealthStatusDegraded, status.Status)
+}
+
+func TestHistoricalCollector_NilHealthManager(t *testing.T) {
+	mock := &mockPrometheusAPI{queryResults: make(map[string]model.Value)}
+	hc := NewHistoricalMetricsCollector(logr.Discard(), mock, nil)
+	assert.NotNil(t, hc)
+}
diff --git a/internal/collector/manager.go b/internal/collector/manager.go
index 9604e3ab..d70e35bb 100644
--- a/internal/collector/manager.go
+++ b/internal/collector/manager.go
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	gen "github.com/devzero-inc/zxporter/gen/api/v1"
+	"github.com/devzero-inc/zxporter/internal/health"
 	telemetry_logger "github.com/devzero-inc/zxporter/internal/logger"
 	"github.com/devzero-inc/zxporter/internal/version"
 	"github.com/go-logr/logr"
@@ -55,6 +56,7 @@ type CollectionManager struct {
 	config          *CollectionConfig
 	logger          logr.Logger
 	telemetryLogger telemetry_logger.Logger
+	healthManager   *health.HealthManager
 }
 
 // NewCollectionManager creates a new collection manager
@@ -63,6 +65,7 @@ func NewCollectionManager(config *CollectionConfig,
 	telemetryMetrics *TelemetryMetrics,
 	logger logr.Logger,
 	telemetryLogger telemetry_logger.Logger,
+	healthManager *health.HealthManager,
 ) *CollectionManager {
 	if config != nil && config.BufferSize > 0 {
 		bufferSize = config.BufferSize
@@ -79,6 +82,7 @@ func NewCollectionManager(config *CollectionConfig,
 		config:          config,
 		logger:          logger,
 		telemetryLogger: telemetryLogger,
+		healthManager:   healthManager,
 	}
 }
 
@@ -246,6 +250,9 @@ func (m *CollectionManager) StartAll(ctx context.Context) error {
 	}
 
 	m.started = true
+	m.updateHealthStatus(health.ComponentCollectorManager, health.HealthStatusHealthy, fmt.Sprintf("%d collectors started", len(m.collectors)), map[string]string{"collector_count": fmt.Sprintf("%d", len(m.collectors))})
+	m.updateHealthStatus(health.ComponentBufferQueue, health.HealthStatusHealthy, "Buffer is operational", map[string]string{"capacity": fmt.Sprintf("%d", m.bufferSize)})
+
 	return nil
 }
 
@@ -288,6 +295,7 @@ func (m *CollectionManager) startCollectorInternal(collectorType string, collect
 		m.mu.Lock()
 		delete(m.collectorCtxs, collectorType)
 		m.mu.Unlock()
+		m.updateHealthStatus(health.ComponentCollectorManager, health.HealthStatusDegraded, fmt.Sprintf("collector %s failed to start", collectorType), map[string]string{"failed_collector": collectorType})
 		return fmt.Errorf("failed to start collector %s: %w", collectorType, err)
 	}
 
@@ -369,6 +377,8 @@ func (m *CollectionManager) StopAll() error {
 	}
 
 	m.started = false
+	m.updateHealthStatus(health.ComponentCollectorManager, health.HealthStatusUnhealthy, "All collectors stopped", nil)
+	m.updateHealthStatus(health.ComponentBufferQueue, health.HealthStatusUnhealthy, "Buffer stopped", nil)
 	return nil
 }
 
@@ -402,6 +412,7 @@ func (m *CollectionManager) processCollectorChannel(collectorType string, collec
 			m.logger.Error(nil, "Combined channel buffer full, dropping resources after timeout",
 				"count", len(resources),
 				"type", resources[0].ResourceType.String())
+			m.updateHealthStatus(health.ComponentBufferQueue, health.HealthStatusDegraded, "Buffer full, dropping resources", map[string]string{"capacity": fmt.Sprintf("%d", m.bufferSize), "resource_type": resources[0].ResourceType.String()})
 			m.telemetryMetrics.MessagesDropped.WithLabelValues(resources[0].ResourceType.String()).Add(float64(len(resources)))
 		}
 	}
@@ -452,3 +463,10 @@ func (m *CollectionManager) GetCollector(collectorType string) ResourceCollector
 	}
 	return collector
 }
+
+// updateHealthStatus updates the health status of the collection manager
+func (m *CollectionManager) updateHealthStatus(component string, status health.HealthStatus, message string, metadata map[string]string) {
+	if m.healthManager != nil {
+		m.healthManager.UpdateStatus(component, status, message, metadata)
+	}
+}
diff --git a/internal/collector/manager_health_test.go b/internal/collector/manager_health_test.go
new file mode 100644
index 00000000..67d811a0
--- /dev/null
+++ b/internal/collector/manager_health_test.go
@@ -0,0 +1,124 @@
+package collector
+
+import (
+	"context"
+	"sync"
+	"testing"
+	"time"
+
+	gen "github.com/devzero-inc/zxporter/gen/api/v1"
+	"github.com/devzero-inc/zxporter/internal/health"
+	"github.com/go-logr/logr"
+	"github.com/stretchr/testify/assert"
+)
+
+// noopTelemetryLogger is a no-op implementation of telemetry_logger.Logger
+type noopTelemetryLogger struct{}
+
+func (n *noopTelemetryLogger) Report(level gen.LogLevel, source string, msg string, err error, fields map[string]string) {
+}
+func (n *noopTelemetryLogger) Stop() {}
+
+var (
+	sharedTelemetryMetrics     *TelemetryMetrics
+	sharedTelemetryMetricsOnce sync.Once
+)
+
+func getTestTelemetryMetrics() *TelemetryMetrics {
+	sharedTelemetryMetricsOnce.Do(func() {
+		sharedTelemetryMetrics = NewTelemetryMetrics()
+	})
+	return sharedTelemetryMetrics
+}
+
+func TestCollectionManager_HealthyAfterStartAll(t *testing.T) {
+	hm := health.NewHealthManager()
+	hm.Register(health.ComponentCollectorManager)
+	hm.Register(health.ComponentBufferQueue)
+
+	mgr := NewCollectionManager(
+		&CollectionConfig{BufferSize: 10},
+		nil,
+		getTestTelemetryMetrics(),
+		logr.Discard(),
+		&noopTelemetryLogger{},
+		hm,
+	)
+
+	mock := &mockCollector{
+		collectorType: "test_resource",
+		resourceCh:    make(chan []CollectedResource, 10),
+	}
+	err := mgr.RegisterCollector(mock)
+	assert.NoError(t, err)
+
+	err = mgr.StartAll(context.Background())
+	assert.NoError(t, err)
+
+	// Give goroutines time to start
+	time.Sleep(100 * time.Millisecond)
+
+	status, exists := hm.GetStatus(health.ComponentCollectorManager)
+	assert.True(t, exists)
+	assert.Equal(t, health.HealthStatusHealthy, status.Status)
+
+	bufStatus, exists := hm.GetStatus(health.ComponentBufferQueue)
+	assert.True(t, exists)
+	assert.Equal(t, health.HealthStatusHealthy, bufStatus.Status)
+
+	_ = mgr.StopAll()
+}
+
+func TestCollectionManager_UnhealthyAfterStopAll(t *testing.T) {
+	hm := health.NewHealthManager()
+	hm.Register(health.ComponentCollectorManager)
+	hm.Register(health.ComponentBufferQueue)
+
+	mgr := NewCollectionManager(
+		&CollectionConfig{BufferSize: 10},
+		nil,
+		getTestTelemetryMetrics(),
+		logr.Discard(),
+		&noopTelemetryLogger{},
+		hm,
+	)
+
+	mock := &mockCollector{
+		collectorType: "test_resource",
+		resourceCh:    make(chan []CollectedResource, 10),
+	}
+	_ = mgr.RegisterCollector(mock)
+	_ = mgr.StartAll(context.Background())
+	time.Sleep(100 * time.Millisecond)
+
+	_ = mgr.StopAll()
+
+	status, _ := hm.GetStatus(health.ComponentCollectorManager)
+	assert.Equal(t, health.HealthStatusUnhealthy, status.Status)
+}
+
+func TestCollectionManager_NilHealthManager(t *testing.T) {
+	mgr := NewCollectionManager(
+		&CollectionConfig{BufferSize: 10},
+		nil,
+		getTestTelemetryMetrics(),
+		logr.Discard(),
+		&noopTelemetryLogger{},
+		nil,
+	)
+	assert.NotNil(t, mgr)
+}
+
+// mockCollector implements ResourceCollector for testing
+type mockCollector struct {
+	collectorType string
+	resourceCh    chan []CollectedResource
+	started       bool
+}
+
+func (m *mockCollector) Start(ctx context.Context) error { m.started = true; return nil }
+func (m *mockCollector) Stop() error { close(m.resourceCh); return nil }
+func (m *mockCollector) GetType() string { return m.collectorType }
+func (m *mockCollector) GetResourceChannel() <-chan []CollectedResource { return m.resourceCh }
+func (m *mockCollector) IsAvailable(ctx context.Context) bool { return true }
+func (m *mockCollector) AddResource(resource interface{}) error { return nil }
"github.com/devzero-inc/zxporter/internal/collector/snap" + "github.com/devzero-inc/zxporter/internal/health" telemetry_logger "github.com/devzero-inc/zxporter/internal/logger" "github.com/devzero-inc/zxporter/internal/server" "github.com/devzero-inc/zxporter/internal/transport" @@ -76,6 +77,7 @@ type CollectionPolicyReconciler struct { CurrentConfig *PolicyConfig RestartInProgress bool MpaServerPort int + HealthManager *health.HealthManager // pendingCollectors tracks collector factories that were skipped due to CRD unavailability // and should be retried periodically @@ -1097,6 +1099,7 @@ func (r *CollectionPolicyReconciler) restartCollectors(ctx context.Context, newC r.Sender.(transport.DakrClient), r.TelemetryMetrics, 15*time.Second, // Send metrics every 15 seconds + r.HealthManager, ) if err := r.TelemetrySender.Start(ctx); err != nil { @@ -1861,6 +1864,7 @@ func (r *CollectionPolicyReconciler) setupCollectionManager(ctx context.Context, r.TelemetryMetrics, logger.WithName("collection-manager"), r.TelemetryLogger, + r.HealthManager, ) // Create and start the telemetry sender @@ -1869,6 +1873,7 @@ func (r *CollectionPolicyReconciler) setupCollectionManager(ctx context.Context, r.DakrClient, // Use the dakrClient directly r.TelemetryMetrics, 15*time.Second, // Send metrics every 15 seconds + r.HealthManager, ) if err := r.TelemetrySender.Start(ctx); err != nil { @@ -1886,7 +1891,7 @@ func (r *CollectionPolicyReconciler) setupMpaServer() error { if r.MpaServer != nil { return nil } - r.MpaServer = server.NewMpaServer(r.Log, nil) + r.MpaServer = server.NewMpaServer(r.Log, nil, r.HealthManager) return r.MpaServer.Start(r.MpaServerPort) } @@ -3708,6 +3713,7 @@ func (r *CollectionPolicyReconciler) waitForPrometheusAvailability(ctx context.C if resp.StatusCode == http.StatusOK { logger.Info("Prometheus is available", "statusCode", resp.StatusCode) resp.Body.Close() + r.updateHealthStatus(health.HealthStatusHealthy, "Prometheus available", map[string]string{"url": prometheusURL}) return true } @@ -3743,6 +3749,14 @@ func (r *CollectionPolicyReconciler) waitForPrometheusAvailability(ctx context.C "zxporter_version": version.Get().String(), }, ) + r.updateHealthStatus( + health.HealthStatusUnhealthy, + "Prometheus unavailable after retries", + map[string]string{ + "url": prometheusURL, + "max_retries": fmt.Sprintf("%d", maxRetries), + }, + ) return false } @@ -3773,3 +3787,10 @@ func (r *CollectionPolicyReconciler) SetupWithManager(mgr ctrl.Manager) error { }). Complete(r) } + +// updateHealthStatus reports Prometheus component health if a HealthManager is configured. 
diff --git a/internal/controller/custom.go b/internal/controller/custom.go
index e0224868..a41be386 100644
--- a/internal/controller/custom.go
+++ b/internal/controller/custom.go
@@ -38,6 +38,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	"github.com/devzero-inc/zxporter/internal/collector"
+	"github.com/devzero-inc/zxporter/internal/health"
 	telemetry_logger "github.com/devzero-inc/zxporter/internal/logger"
 	"github.com/devzero-inc/zxporter/internal/transport"
 	"github.com/devzero-inc/zxporter/internal/util"
@@ -95,6 +96,14 @@ func NewEnvBasedController(mgr ctrl.Manager, reconcileInterval time.Duration, mp
 		return nil, fmt.Errorf("failed to create apiextensions client: %w", err)
 	}
 
+	// Initialize HealthManager and register components
+	healthManager := health.NewHealthManager()
+	healthManager.Register(health.ComponentCollectorManager)
+	healthManager.Register(health.ComponentBufferQueue)
+	healthManager.Register(health.ComponentDakrTransport)
+	healthManager.Register(health.ComponentMpaServer)
+	healthManager.Register(health.ComponentPrometheus)
+
 	// Create a shared Telemetry metrics instance
 	sharedTelemetryMetrics := collector.NewTelemetryMetrics()
 
@@ -112,6 +121,7 @@ func NewEnvBasedController(mgr ctrl.Manager, reconcileInterval time.Duration, mp
 		RestartInProgress: false,
 		ZapLogger:         zapLogger,
 		MpaServerPort:     mpaServerPort,
+		HealthManager:     healthManager,
 	}
 
 	logger.Info("Checking 1st reconcile interval", "reconcile", reconcileInterval)
@@ -173,6 +183,9 @@ func (c *EnvBasedController) Start(ctx context.Context) error {
 	// Setup periodic reconciliation
 	go c.runPeriodicReconciliation(ctx)
 
+	// Run periodic health check reporting
+	go c.runHealthReporting(ctx)
+
 	// Wait for context cancellation
 	<-ctx.Done()
 	close(c.stopCh)
@@ -180,6 +193,26 @@ func (c *EnvBasedController) Start(ctx context.Context) error {
 	return nil
 }
 
+// runHealthReporting periodically logs the health status of all registered components.
+func (c *EnvBasedController) runHealthReporting(ctx context.Context) {
+	ticker := time.NewTicker(60 * time.Second) // Report health status every 60 seconds
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			report := c.Reconciler.HealthManager.BuildReport()
+			for name, status := range report {
+				c.Log.Info("Health status report", "component", name, "status", status.Status, "message", status.Message, "metadata", status.Metadata)
+			}
+		case <-c.stopCh:
+			return
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
 // NeedLeaderElection implements the LeaderElectionRunnable interface
 func (c *EnvBasedController) NeedLeaderElection() bool {
 	// This controller should only run on the leader
diff --git a/internal/health/component_names.go b/internal/health/component_names.go
new file mode 100644
index 00000000..d7452987
--- /dev/null
+++ b/internal/health/component_names.go
@@ -0,0 +1,10 @@
+package health
+
+// Component name constants used for HealthManager registration.
+const (
+	ComponentCollectorManager = "collector_manager"
+	ComponentBufferQueue      = "buffer_queue"
+	ComponentDakrTransport    = "dakr_transport"
+	ComponentMpaServer        = "mpa_server"
+	ComponentPrometheus       = "prometheus"
+)
diff --git a/internal/health/component_names_test.go b/internal/health/component_names_test.go
new file mode 100644
index 00000000..c708485c
--- /dev/null
+++ b/internal/health/component_names_test.go
@@ -0,0 +1,23 @@
+package health
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestComponentNames_AreDistinct(t *testing.T) {
+	names := []string{
+		ComponentCollectorManager,
+		ComponentBufferQueue,
+		ComponentDakrTransport,
+		ComponentMpaServer,
+		ComponentPrometheus,
+	}
+	seen := make(map[string]bool)
+	for _, name := range names {
+		assert.NotEmpty(t, name)
+		assert.False(t, seen[name], "duplicate component name: %s", name)
+		seen[name] = true
+	}
+}
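Note for readers: the `health.HealthManager` implementation itself is not part of this diff — only the component-name constants above are new in `internal/health`. From the call sites used throughout the patch (`NewHealthManager`, `Register`, `UpdateStatus`, `GetStatus`, `BuildReport`), the manager presumably looks roughly like the sketch below. The `ComponentStatus` type name, field layout, and the initial status assigned on `Register` are assumptions for illustration, not the actual implementation.

```go
package health

import "sync"

// HealthStatus represents the reported condition of a component.
type HealthStatus string

const (
	HealthStatusHealthy   HealthStatus = "healthy"
	HealthStatusDegraded  HealthStatus = "degraded"
	HealthStatusUnhealthy HealthStatus = "unhealthy"
)

// ComponentStatus is a point-in-time snapshot for one component.
// (Hypothetical name; the real type is defined elsewhere in the package.)
type ComponentStatus struct {
	Status   HealthStatus
	Message  string
	Metadata map[string]string
}

// HealthManager tracks per-component status behind a mutex so that
// collectors, servers, and transports can report concurrently.
type HealthManager struct {
	mu         sync.RWMutex
	components map[string]ComponentStatus
}

func NewHealthManager() *HealthManager {
	return &HealthManager{components: make(map[string]ComponentStatus)}
}

// Register adds a component; the initial status chosen here is a guess.
func (h *HealthManager) Register(name string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.components[name] = ComponentStatus{Status: HealthStatusUnhealthy, Message: "not started"}
}

// UpdateStatus overwrites the stored snapshot for a component.
func (h *HealthManager) UpdateStatus(name string, status HealthStatus, message string, metadata map[string]string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.components[name] = ComponentStatus{Status: status, Message: message, Metadata: metadata}
}

// GetStatus returns the snapshot for one component.
func (h *HealthManager) GetStatus(name string) (ComponentStatus, bool) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	s, ok := h.components[name]
	return s, ok
}

// BuildReport returns a copy of all component snapshots.
func (h *HealthManager) BuildReport() map[string]ComponentStatus {
	h.mu.RLock()
	defer h.mu.RUnlock()
	out := make(map[string]ComponentStatus, len(h.components))
	for k, v := range h.components {
		out[k] = v
	}
	return out
}
```

The nil-guarded `updateHealthStatus` wrappers added to each component keep the manager strictly optional, which is why every test suite in this patch can pass `nil`.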
diff --git a/internal/server/mpa_server.go b/internal/server/mpa_server.go
index ada68d50..5a6901fd 100644
--- a/internal/server/mpa_server.go
+++ b/internal/server/mpa_server.go
@@ -9,6 +9,7 @@ import (
 
 	gen "github.com/devzero-inc/zxporter/gen/api/v1"
 	"github.com/devzero-inc/zxporter/internal/collector"
+	"github.com/devzero-inc/zxporter/internal/health"
 	"github.com/go-logr/logr"
 	"google.golang.org/grpc"
 	"google.golang.org/protobuf/types/known/timestamppb"
@@ -21,15 +22,17 @@ type MpaServer struct {
 	subscriptionManager *SubscriptionManager
 	grpcServer          *grpc.Server
 	historicalCollector *collector.HistoricalMetricsCollector
+	healthManager       *health.HealthManager
 }
 
 // NewMpaServer creates a new MpaServer.
 // historicalCollector may be nil if Prometheus is not available.
-func NewMpaServer(logger logr.Logger, historicalCollector *collector.HistoricalMetricsCollector) *MpaServer {
+func NewMpaServer(logger logr.Logger, historicalCollector *collector.HistoricalMetricsCollector, healthManager *health.HealthManager) *MpaServer {
 	return &MpaServer{
 		logger:              logger.WithName("mpa-server"),
 		subscriptionManager: NewSubscriptionManager(logger),
 		historicalCollector: historicalCollector,
+		healthManager:       healthManager,
 	}
 }
 
@@ -37,6 +40,7 @@ func NewMpaServer(logger logr.Logger, historicalCollector *collector.HistoricalM
 func (s *MpaServer) Start(port int) error {
 	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
 	if err != nil {
+		s.updateHealthStatus(health.HealthStatusUnhealthy, fmt.Sprintf("Failed to listen on port %d", port), map[string]string{"port": fmt.Sprintf("%d", port), "error": err.Error()})
 		return fmt.Errorf("failed to listen on port %d: %w", port, err)
 	}
 
@@ -48,8 +52,10 @@ func (s *MpaServer) Start(port int) error {
 	go func() {
 		if err := s.grpcServer.Serve(lis); err != nil {
 			s.logger.Error(err, "Failed to serve gRPC")
+			s.updateHealthStatus(health.HealthStatusUnhealthy, "gRPC server failed", map[string]string{"error": err.Error()})
 		}
 	}()
+	s.updateHealthStatus(health.HealthStatusHealthy, "gRPC server listening", map[string]string{"port": fmt.Sprintf("%d", port)})
 
 	return nil
 }
@@ -58,6 +64,7 @@ func (s *MpaServer) Stop() {
 	if s.grpcServer != nil {
 		s.logger.Info("Stopping MPA gRPC server")
 		s.grpcServer.GracefulStop()
+		s.updateHealthStatus(health.HealthStatusUnhealthy, "gRPC server stopped", nil)
 	}
 }
 
@@ -130,6 +137,13 @@ func (s *MpaServer) PublishMetrics(metrics *collector.ContainerMetricsSnapshot,
 	s.subscriptionManager.Broadcast(metrics, timestamp)
 }
 
+// updateHealthStatus reports MPA server component health if a HealthManager is configured.
+func (s *MpaServer) updateHealthStatus(status health.HealthStatus, message string, metadata map[string]string) {
+	if s.healthManager != nil {
+		s.healthManager.UpdateStatus(health.ComponentMpaServer, status, message, metadata)
+	}
+}
+
 // SubscriptionManager manages active streams and their interests
 type SubscriptionManager struct {
 	mu sync.RWMutex
diff --git a/internal/server/mpa_server_health_test.go b/internal/server/mpa_server_health_test.go
new file mode 100644
index 00000000..eaacca85
--- /dev/null
+++ b/internal/server/mpa_server_health_test.go
@@ -0,0 +1,42 @@
+package server
+
+import (
+	"testing"
+
+	"github.com/devzero-inc/zxporter/internal/health"
+	"github.com/go-logr/logr"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestMpaServer_HealthyAfterStart(t *testing.T) {
+	hm := health.NewHealthManager()
+	hm.Register(health.ComponentMpaServer)
+
+	srv := NewMpaServer(logr.Discard(), nil, hm)
+	err := srv.Start(0)
+	assert.NoError(t, err)
+	defer srv.Stop()
+
+	status, exists := hm.GetStatus(health.ComponentMpaServer)
+	assert.True(t, exists)
+	assert.Equal(t, health.HealthStatusHealthy, status.Status)
+}
+
+func TestMpaServer_UnhealthyAfterStop(t *testing.T) {
+	hm := health.NewHealthManager()
+	hm.Register(health.ComponentMpaServer)
+
+	srv := NewMpaServer(logr.Discard(), nil, hm)
+	_ = srv.Start(0)
+	srv.Stop()
+
+	status, _ := hm.GetStatus(health.ComponentMpaServer)
+	assert.Equal(t, health.HealthStatusUnhealthy, status.Status)
+}
+
+func TestMpaServer_NilHealthManager(t *testing.T) {
+	srv := NewMpaServer(logr.Discard(), nil, nil)
+	err := srv.Start(0)
+	assert.NoError(t, err)
+	srv.Stop()
+}
diff --git a/internal/transport/telemetry_sender.go b/internal/transport/telemetry_sender.go
index 02b6d641..122907cb 100644
--- a/internal/transport/telemetry_sender.go
+++ b/internal/transport/telemetry_sender.go
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/devzero-inc/zxporter/internal/collector"
+	"github.com/devzero-inc/zxporter/internal/health"
 	"github.com/go-logr/logr"
 	"github.com/prometheus/client_golang/prometheus"
 	dto "github.com/prometheus/client_model/go"
@@ -38,6 +39,7 @@ type TelemetrySender struct {
 	consecutiveFailures int
 	lastFailureTime     time.Time
 	circuitOpenUntil    time.Time
+	healthManager       *health.HealthManager
 }
 
 // NewTelemetrySender creates a new TelemetrySender
@@ -46,17 +48,19 @@ func NewTelemetrySender(
 	dakrClient DakrClient,
 	metrics *collector.TelemetryMetrics,
 	interval time.Duration,
+	healthManager *health.HealthManager,
 ) *TelemetrySender {
 	if interval <= 0 {
 		interval = 15 * time.Second // Default interval
 	}
 
 	return &TelemetrySender{
-		logger:     logger.WithName("telemetry-sender"),
-		dakrClient: dakrClient,
-		metrics:    metrics,
-		interval:   interval,
-		stopCh:     make(chan struct{}),
+		logger:        logger.WithName("telemetry-sender"),
+		dakrClient:    dakrClient,
+		metrics:       metrics,
+		interval:      interval,
+		stopCh:        make(chan struct{}),
+		healthManager: healthManager,
 	}
 }
 
@@ -98,6 +102,7 @@ func (s *TelemetrySender) recordSuccess() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	s.consecutiveFailures = 0
+	s.updateHealthStatus(health.HealthStatusHealthy, "Transport operational", map[string]string{"consecutive_failures": "0", "circuit_breaker": "closed"})
 	s.circuitOpenUntil = time.Time{}
 }
 
@@ -112,16 +117,18 @@ func (s *TelemetrySender) recordFailure() {
 	if s.consecutiveFailures >= maxConsecutiveFailures {
 		// Calculate backoff duration with exponential increase
 		backoffMultiplier := s.consecutiveFailures - maxConsecutiveFailures + 1
-		backoffDuration := time.Duration(backoffMultiplier) * initialBackoff
-		if backoffDuration > maxBackoff {
-			backoffDuration = maxBackoff
-		}
+		backoffDuration := min(time.Duration(backoffMultiplier)*initialBackoff, maxBackoff)
 		s.circuitOpenUntil = time.Now().Add(backoffDuration)
 
 		s.logger.Info("Circuit breaker opened",
 			"failures", s.consecutiveFailures,
 			"reopenAt", s.circuitOpenUntil.Format(time.RFC3339),
 			"backoffDuration", backoffDuration.String())
+		s.updateHealthStatus(health.HealthStatusUnhealthy, fmt.Sprintf("Circuit breaker open, %d consecutive failures", s.consecutiveFailures), map[string]string{"circuit_breaker": "open", "reopen_at": s.circuitOpenUntil.Format(time.RFC3339)})
+	} else {
+		s.logger.Info("Recorded telemetry send failure",
+			"consecutiveFailures", s.consecutiveFailures)
+		s.updateHealthStatus(health.HealthStatusDegraded, fmt.Sprintf("%d consecutive failures", s.consecutiveFailures), map[string]string{"consecutive_failures": fmt.Sprintf("%d", s.consecutiveFailures), "circuit_breaker": "closed"})
 	}
 }
 
@@ -273,3 +280,10 @@ func getMetricName(descStr string) string {
 	}
 	return metricName
 }
+
+// updateHealthStatus reports Dakr transport component health if a HealthManager is configured.
+func (s *TelemetrySender) updateHealthStatus(status health.HealthStatus, message string, metadata map[string]string) {
+	if s.healthManager != nil {
+		s.healthManager.UpdateStatus(health.ComponentDakrTransport, status, message, metadata)
+	}
+}
diff --git a/internal/transport/telemetry_sender_health_test.go b/internal/transport/telemetry_sender_health_test.go
new file mode 100644
index 00000000..c0e12004
--- /dev/null
+++ b/internal/transport/telemetry_sender_health_test.go
@@ -0,0 +1,56 @@
+package transport
+
+import (
+	"testing"
+
+	"github.com/devzero-inc/zxporter/internal/health"
+	"github.com/go-logr/logr"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestTelemetrySender_HealthyOnSuccess(t *testing.T) {
+	hm := health.NewHealthManager()
+	hm.Register(health.ComponentDakrTransport)
+
+	sender := NewTelemetrySender(logr.Discard(), nil, nil, 0, hm)
+
+	sender.recordSuccess()
+
+	status, exists := hm.GetStatus(health.ComponentDakrTransport)
+	assert.True(t, exists)
+	assert.Equal(t, health.HealthStatusHealthy, status.Status)
+}
+
+func TestTelemetrySender_DegradedOnFailure(t *testing.T) {
+	hm := health.NewHealthManager()
+	hm.Register(health.ComponentDakrTransport)
+
+	sender := NewTelemetrySender(logr.Discard(), nil, nil, 0, hm)
+
+	sender.recordFailure()
+
+	status, _ := hm.GetStatus(health.ComponentDakrTransport)
+	assert.Equal(t, health.HealthStatusDegraded, status.Status)
+	assert.Equal(t, "1", status.Metadata["consecutive_failures"])
+}
+
+func TestTelemetrySender_UnhealthyOnCircuitOpen(t *testing.T) {
+	hm := health.NewHealthManager()
+	hm.Register(health.ComponentDakrTransport)
+
+	sender := NewTelemetrySender(logr.Discard(), nil, nil, 0, hm)
+
+	for i := 0; i < maxConsecutiveFailures; i++ {
+		sender.recordFailure()
+	}
+
+	status, _ := hm.GetStatus(health.ComponentDakrTransport)
+	assert.Equal(t, health.HealthStatusUnhealthy, status.Status)
+	assert.Equal(t, "open", status.Metadata["circuit_breaker"])
+}
+
+func TestTelemetrySender_NilHealthManager(t *testing.T) {
+	sender := NewTelemetrySender(logr.Discard(), nil, nil, 0, nil)
+	sender.recordSuccess()
+	sender.recordFailure()
+}
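One observation on `recordFailure`: despite the surrounding comment about an "exponential increase", the multiplier (`consecutiveFailures - maxConsecutiveFailures + 1`) grows by one per failure, so the circuit-open window grows linearly until it hits the `maxBackoff` cap. A standalone illustration of that progression is below; the constant values are hypothetical, since `maxConsecutiveFailures`, `initialBackoff`, and `maxBackoff` are defined elsewhere in `telemetry_sender.go` and not shown in this diff.

```go
// Standalone illustration only — not part of the PR.
package main

import (
	"fmt"
	"time"
)

// Hypothetical values; the real constants live in telemetry_sender.go.
const (
	maxConsecutiveFailures = 3
	initialBackoff         = 30 * time.Second
	maxBackoff             = 5 * time.Minute
)

func main() {
	// Mirrors recordFailure: once failures reach the threshold, the circuit
	// opens for multiplier*initialBackoff, capped at maxBackoff. The
	// multiplier grows by 1 per failure, i.e. linearly, not exponentially.
	for failures := maxConsecutiveFailures; failures <= maxConsecutiveFailures+10; failures++ {
		multiplier := failures - maxConsecutiveFailures + 1
		backoff := min(time.Duration(multiplier)*initialBackoff, maxBackoff)
		fmt.Printf("failures=%2d -> circuit open for %s\n", failures, backoff)
	}
}
```

The diff's replacement of the explicit cap with Go 1.21's built-in `min` is behavior-preserving.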
diff --git a/verification/load-simulation.yaml b/verification/load-simulation.yaml
new file mode 100644
index 00000000..58c11fa5
--- /dev/null
+++ b/verification/load-simulation.yaml
@@ -0,0 +1,763 @@
+# Load simulation manifests for testing zxporter at scale.
+#
+# Creates a high density of Kubernetes objects across resource types
+# that zxporter collects, plus a churn generator that continuously
+# creates and deletes resources to stress watch/reconciliation loops.
+#
+# Usage:
+#   kubectl apply -f verification/load-simulation.yaml
+#   # To tear down:
+#   kubectl delete ns load-test
+---
+apiVersion: v1
+kind: Namespace
+metadata:
+  name: load-test
+  labels:
+    purpose: zxporter-load-test
+
+# ---------- ConfigMaps (bulk) ----------
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: config-batch-1
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+data:
+  key1: "value1"
+  key2: "value2"
+  key3: "value3"
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: config-batch-2
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+data:
+  key1: "value1"
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: config-batch-3
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+data:
+  key1: "value1"
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: config-batch-4
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+data:
+  key1: "value1"
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: config-batch-5
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+data:
+  key1: "value1"
+
+# ---------- Secrets (bulk) ----------
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: secret-batch-1
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+type: Opaque
+stringData:
+  password: "fake-secret-1"
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: secret-batch-2
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+type: Opaque
+stringData:
+  password: "fake-secret-2"
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: secret-batch-3
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+type: Opaque
+stringData:
+  password: "fake-secret-3"
+
+# ---------- ServiceAccounts ----------
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: load-sa-1
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: load-sa-2
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+
+# ---------- RBAC ----------
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: load-role
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+rules:
+  - apiGroups: [""]
+    resources: ["pods", "configmaps"]
+    verbs: ["get", "list", "watch"]
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: load-rolebinding
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+subjects:
+  - kind: ServiceAccount
+    name: load-sa-1
+    namespace: load-test
+roleRef:
+  kind: Role
+  name: load-role
+  apiGroup: rbac.authorization.k8s.io
+
+# ---------- ResourceQuota & LimitRange ----------
+---
+apiVersion: v1
+kind: ResourceQuota
+metadata:
+  name: load-quota
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  hard:
+    pods: "500"
+    requests.cpu: "100"
+    requests.memory: 200Gi
+    limits.cpu: "200"
+    limits.memory: 400Gi
+---
+apiVersion: v1
+kind: LimitRange
+metadata:
+  name: load-limitrange
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  limits:
+    - default:
+        cpu: 200m
+        memory: 256Mi
+      defaultRequest:
+        cpu: 50m
+        memory: 64Mi
+      type: Container
+
+# ---------- Services ----------
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: load-svc-1
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  selector:
+    app: load-web
+  ports:
+    - port: 80
+      targetPort: 8080
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: load-svc-2
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  selector:
+    app: load-api
+  ports:
+    - port: 80
+      targetPort: 8080
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: load-svc-3
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  selector:
+    app: load-worker
+  ports:
+    - port: 80
+      targetPort: 8080
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: load-svc-headless
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  clusterIP: None
+  selector:
+    app: load-stateful
+  ports:
+    - port: 80
+      targetPort: 8080
+
+# ---------- NetworkPolicy ----------
+---
+apiVersion: networking.k8s.io/v1
+kind: NetworkPolicy
+metadata:
+  name: load-netpol
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  podSelector:
+    matchLabels:
+      app: load-web
+  policyTypes:
+    - Ingress
+    - Egress
+  ingress:
+    - from:
+        - podSelector:
+            matchLabels:
+              app: load-api
+      ports:
+        - port: 8080
+  egress:
+    - to:
+        - podSelector:
+            matchLabels:
+              app: load-api
+
+# ---------- Deployments (multiple, with replicas) ----------
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: load-web
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  replicas: 10
+  selector:
+    matchLabels:
+      app: load-web
+  template:
+    metadata:
+      labels:
+        app: load-web
+        test: zxporter-scale
+    spec:
+      terminationGracePeriodSeconds: 1
+      containers:
+        - name: web
+          image: registry.k8s.io/pause:3.9
+          resources:
+            requests:
+              cpu: 1m
+              memory: 4Mi
+            limits:
+              cpu: 10m
+              memory: 16Mi
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: load-api
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  replicas: 10
+  selector:
+    matchLabels:
+      app: load-api
+  template:
+    metadata:
+      labels:
+        app: load-api
+        test: zxporter-scale
+    spec:
+      terminationGracePeriodSeconds: 1
+      containers:
+        - name: api
+          image: registry.k8s.io/pause:3.9
+          resources:
+            requests:
+              cpu: 1m
+              memory: 4Mi
+            limits:
+              cpu: 10m
+              memory: 16Mi
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: load-worker
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  replicas: 8
+  selector:
+    matchLabels:
+      app: load-worker
+  template:
+    metadata:
+      labels:
+        app: load-worker
+        test: zxporter-scale
+    spec:
+      terminationGracePeriodSeconds: 1
+      containers:
+        - name: worker
+          image: registry.k8s.io/pause:3.9
+          resources:
+            requests:
+              cpu: 1m
+              memory: 4Mi
+            limits:
+              cpu: 10m
+              memory: 16Mi
+        - name: sidecar
+          image: registry.k8s.io/pause:3.9
+          resources:
+            requests:
+              cpu: 1m
+              memory: 4Mi
+            limits:
+              cpu: 10m
+              memory: 16Mi
+
+# ---------- StatefulSet ----------
+---
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  name: load-stateful
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  serviceName: load-svc-headless
+  replicas: 5
+  selector:
+    matchLabels:
+      app: load-stateful
+  template:
+    metadata:
+      labels:
+        app: load-stateful
+        test: zxporter-scale
+    spec:
+      terminationGracePeriodSeconds: 1
+      containers:
+        - name: stateful
+          image: registry.k8s.io/pause:3.9
+          resources:
+            requests:
+              cpu: 1m
+              memory: 4Mi
+            limits:
+              cpu: 10m
+              memory: 16Mi
+
+# ---------- DaemonSet ----------
+---
+apiVersion: apps/v1
+kind: DaemonSet
+metadata:
+  name: load-daemonset
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  selector:
+    matchLabels:
+      app: load-daemon
+  template:
+    metadata:
+      labels:
+        app: load-daemon
+        test: zxporter-scale
+    spec:
+      terminationGracePeriodSeconds: 1
+      containers:
+        - name: daemon
+          image: registry.k8s.io/pause:3.9
+          resources:
+            requests:
+              cpu: 1m
+              memory: 4Mi
+            limits:
+              cpu: 10m
+              memory: 16Mi
+
+# ---------- Jobs ----------
+---
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: load-job-1
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  completions: 3
+  parallelism: 3
+  template:
+    metadata:
+      labels:
+        app: load-job
+        test: zxporter-scale
+    spec:
+      restartPolicy: Never
+      terminationGracePeriodSeconds: 1
+      containers:
+        - name: job
+          image: busybox:1.36
+          command: ["sh", "-c", "echo done && sleep 30"]
+          resources:
+            requests:
+              cpu: 1m
+              memory: 4Mi
+---
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: load-job-2
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  completions: 5
+  parallelism: 5
+  template:
+    metadata:
+      labels:
+        app: load-job
+        test: zxporter-scale
+    spec:
+      restartPolicy: Never
+      terminationGracePeriodSeconds: 1
+      containers:
+        - name: job
+          image: busybox:1.36
+          command: ["sh", "-c", "echo done && sleep 60"]
+          resources:
+            requests:
+              cpu: 1m
+              memory: 4Mi
+
+# ---------- CronJob ----------
+---
+apiVersion: batch/v1
+kind: CronJob
+metadata:
+  name: load-cronjob
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  schedule: "*/1 * * * *"
+  successfulJobsHistoryLimit: 5
+  failedJobsHistoryLimit: 3
+  jobTemplate:
+    spec:
+      template:
+        metadata:
+          labels:
+            app: load-cronjob
+            test: zxporter-scale
+        spec:
+          restartPolicy: Never
+          terminationGracePeriodSeconds: 1
+          containers:
+            - name: cron
+              image: busybox:1.36
+              command: ["sh", "-c", "echo tick && sleep 10"]
+              resources:
+                requests:
+                  cpu: 1m
+                  memory: 4Mi
+
+# ---------- HPA ----------
+---
+apiVersion: autoscaling/v2
+kind: HorizontalPodAutoscaler
+metadata:
+  name: load-hpa-web
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  scaleTargetRef:
+    apiVersion: apps/v1
+    kind: Deployment
+    name: load-web
+  minReplicas: 5
+  maxReplicas: 30
+  metrics:
+    - type: Resource
+      resource:
+        name: cpu
+        target:
+          type: Utilization
+          averageUtilization: 50
+---
+apiVersion: autoscaling/v2
+kind: HorizontalPodAutoscaler
+metadata:
+  name: load-hpa-api
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  scaleTargetRef:
+    apiVersion: apps/v1
+    kind: Deployment
+    name: load-api
+  minReplicas: 5
+  maxReplicas: 20
+  metrics:
+    - type: Resource
+      resource:
+        name: cpu
+        target:
+          type: Utilization
+          averageUtilization: 70
+
+# ---------- PersistentVolumeClaims ----------
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: load-pvc-1
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  accessModes: ["ReadWriteOnce"]
+  resources:
+    requests:
+      storage: 1Gi
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: load-pvc-2
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  accessModes: ["ReadWriteOnce"]
+  resources:
+    requests:
+      storage: 1Gi
+
+# ========================================
+# Resource Churn Generator
+#
+# Creates and deletes ConfigMaps and Pods in a loop to produce
+# continuous API object churn, stressing zxporter's watch handlers.
+# ========================================
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: churn-generator
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: churn-generator
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+rules:
+  - apiGroups: [""]
+    resources: ["pods", "configmaps"]
+    verbs: ["create", "delete", "list", "get"]
+  - apiGroups: ["batch"]
+    resources: ["jobs"]
+    verbs: ["create", "delete", "list", "get"]
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: churn-generator
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+subjects:
+  - kind: ServiceAccount
+    name: churn-generator
+    namespace: load-test
+roleRef:
+  kind: Role
+  name: churn-generator
+  apiGroup: rbac.authorization.k8s.io
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: churn-generator
+  namespace: load-test
+  labels:
+    app: load-sim
+    test: zxporter-scale
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: churn-generator
+  template:
+    metadata:
+      labels:
+        app: churn-generator
+        test: zxporter-scale
+    spec:
+      serviceAccountName: churn-generator
+      terminationGracePeriodSeconds: 1
+      containers:
+        - name: churn
+          image: bitnami/kubectl:latest
+          command: ["/bin/bash"]
+          args:
+            - "-c"
+            - |
+              BATCH=0
+              while true; do
+                BATCH=$((BATCH + 1))
+                echo "=== Churn batch $BATCH ==="
+
+                # Create a wave of ConfigMaps
+                for i in $(seq 1 20); do
+                  kubectl create configmap "churn-cm-${BATCH}-${i}" \
+                    -n load-test \
+                    --from-literal=batch="${BATCH}" \
+                    --from-literal=index="${i}" \
+                    --from-literal=data="$(head -c 512 /dev/urandom | base64)" \
+                    2>/dev/null &
+                done
+                wait
+
+                # Create a wave of short-lived Pods
+                for i in $(seq 1 10); do
+                  kubectl run "churn-pod-${BATCH}-${i}" \
+                    -n load-test \
+                    --image=busybox:1.36 \
+                    --restart=Never \
+                    --labels="app=churn-pod,batch=${BATCH}" \
+                    --command -- sh -c "sleep $((RANDOM % 20 + 5))" \
+                    2>/dev/null &
+                done
+                wait
+
+                # Let resources exist and be collected
+                sleep 15
+
+                # Delete the ConfigMaps from this batch
+                for i in $(seq 1 20); do
+                  kubectl delete configmap "churn-cm-${BATCH}-${i}" \
+                    -n load-test --ignore-not-found 2>/dev/null &
+                done
+                wait
+
+                # Clean up completed/old churn pods
+                kubectl delete pods -n load-test -l app=churn-pod \
+                  --field-selector=status.phase!=Running --ignore-not-found 2>/dev/null
+
+                sleep 5
+              done
+          resources:
+            requests:
+              cpu: 10m
+              memory: 32Mi
+            limits:
+              cpu: 100m
+              memory: 64Mi
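Usage note: the new Makefile targets drive this manifest end to end. `make load-test` applies it and scales the `load-web`, `load-api`, and `load-worker` deployments to `LOAD_SIM_REPLICAS` (default 10; e.g. `make load-test LOAD_SIM_REPLICAS=25` for more pressure), and `make load-test-cleanup` deletes the `load-test` namespace, which also stops the churn generator.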