diff --git a/cmd/zxporter-netmon/main.go b/cmd/zxporter-netmon/main.go
index 4d43019d..058859f5 100644
--- a/cmd/zxporter-netmon/main.go
+++ b/cmd/zxporter-netmon/main.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"context"
+	"encoding/json"
 	"flag"
 	"fmt"
 	"net/http"
@@ -11,6 +12,8 @@ import (
 	"time"
 
 	"github.com/cilium/ebpf/rlimit"
+	gen "github.com/devzero-inc/zxporter/gen/api/v1"
+	"github.com/devzero-inc/zxporter/internal/health"
 	"github.com/devzero-inc/zxporter/internal/networkmonitor"
 	"github.com/devzero-inc/zxporter/internal/networkmonitor/dns"
 	"github.com/devzero-inc/zxporter/internal/networkmonitor/ebpf"
@@ -52,6 +55,20 @@ func main() {
 	zapLog, _ := zap.NewProduction()
 	logger := zapr.NewLogger(zapLog)
 
+	// Create HealthManager and register components early so probes are
+	// answered immediately, before component initialisation.
+	healthManager := health.NewHealthManager()
+	healthManager.Register(health.ComponentMonitor)
+	healthManager.Register(health.ComponentDakrTransport)
+	healthManager.Register(health.ComponentEBPFTracer)
+	healthManager.Register(health.ComponentPodCache)
+
+	// Allow 30 seconds for monitor, eBPF, and informer to initialize
+	// before readiness checks start failing on Unspecified statuses.
+	healthManager.SuppressReadiness(30 * time.Second)
+
+	startTime := time.Now()
+
 	nodeName := os.Getenv("NODE_NAME")
 	if nodeName == "" {
 		logger.Info("NODE_NAME environment variable not set, defaulting to localhost (dev mode)")
@@ -92,6 +109,7 @@ func main() {
 		logger.Info("Running in STANDALONE mode. K8s connection disabled.")
 		// Initialize PodCache with nil informer for standalone mode
 		podCache = networkmonitor.NewPodCache(nil)
+		healthManager.UpdateStatus(health.ComponentPodCache, health.HealthStatusHealthy, "standalone mode", nil)
 	}
 
 	// 4. Setup Client (Cilium or Netfilter)
@@ -134,12 +152,15 @@ func main() {
 		tracer = ebpf.NewTracer(logger, tracerCfg)
 		// DNS Collector uses the SAME tracer for DNS events
 		dnsCollector = dns.NewIP2DNS(tracer, logger)
+		healthManager.UpdateStatus(health.ComponentEBPFTracer, health.HealthStatusHealthy, "initialized", nil)
 	} else {
 		logger.Info("Kernel BTF not available, eBPF features disabled")
 		if *collectorMode == CollectorModeEBPF {
 			logger.Error(nil, "EBPF mode requested but BTF not available")
 			os.Exit(1)
 		}
+		// eBPF not available — deregister so it doesn't affect health checks
+		healthManager.Deregister(health.ComponentEBPFTracer)
 	}
 
 	// 6. Setup Dakr Client (Control Plane)
@@ -149,8 +170,11 @@ func main() {
 	if dakrURL != "" && clusterToken != "" {
 		logger.Info("Initializing Dakr Client", "url", dakrURL)
 		dakrClient = transport.NewDakrClient(dakrURL, clusterToken, logger)
+		healthManager.UpdateStatus(health.ComponentDakrTransport, health.HealthStatusHealthy, "initialized", nil)
 	} else {
 		logger.Info("Control Plane credentials not found. Metrics will solely be exposed locally.")
+		// No dakr connection — deregister so it doesn't block readiness
+		healthManager.Deregister(health.ComponentDakrTransport)
 	}
 
 	// 7. Setup Monitor
@@ -172,7 +196,9 @@ func main() {
 		OperatorVersion: versionInfo.String(),
 		OperatorCommit:  versionInfo.GitCommit,
 	}
-	monitor, err := networkmonitor.NewMonitor(monitorCfg, logger, podCache, client, tracer, dnsCollector, dakrClient)
+	monitor, err := networkmonitor.NewMonitor(
+		monitorCfg, logger, podCache, client, tracer, dnsCollector, dakrClient, healthManager,
+	)
 	if err != nil {
 		logger.Error(err, "Failed to initialize monitor")
 		os.Exit(1)
 	}
@@ -185,6 +211,7 @@ func main() {
 	if !*standalone && informerFactory != nil {
 		informerFactory.Start(ctx.Done())
 		informerFactory.WaitForCacheSync(ctx.Done())
+		healthManager.UpdateStatus(health.ComponentPodCache, health.HealthStatusHealthy, "informer synced", nil)
 	}
 
 	// Start eBPF tracer if available (for DNS events and/or network flows)
@@ -192,6 +219,7 @@ func main() {
 		go func() {
 			if err := tracer.Run(ctx); err != nil {
 				logger.Error(err, "eBPF tracer failed")
+				healthManager.UpdateStatus(health.ComponentEBPFTracer, health.HealthStatusUnhealthy, err.Error(), nil)
 			}
 		}()
 	}
@@ -199,15 +227,93 @@ func main() {
 	// Start Monitor in background
 	go monitor.Start(ctx)
 
+	// Start heartbeat sender if dakr is configured
+	if dakrClient != nil {
+		go func() {
+			clusterID := os.Getenv("CLUSTER_ID")
+			if clusterID == "" {
+				clusterID = "unknown"
+			}
+			// Send initial heartbeat immediately
+			report := healthManager.BuildReport()
+			req := health.BuildHeartbeatRequestFromReport(
+				report, clusterID, gen.OperatorType_OPERATOR_TYPE_NETWORK,
+				versionInfo.String(), versionInfo.GitCommit, startTime,
+			)
+			if err := dakrClient.ReportHealth(ctx, req); err != nil {
+				logger.Error(err, "Failed to send initial health heartbeat to dakr")
+			}
+
+			ticker := time.NewTicker(60 * time.Second)
+			defer ticker.Stop()
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case <-ticker.C:
+					report := healthManager.BuildReport()
+					for name, status := range report {
+						logger.Info("Health status report", "component", name, "status", status.Status, "message", status.Message)
+					}
+					req := health.BuildHeartbeatRequestFromReport(
+						report, clusterID, gen.OperatorType_OPERATOR_TYPE_NETWORK,
+						versionInfo.String(), versionInfo.GitCommit, startTime,
+					)
+					if err := dakrClient.ReportHealth(ctx, req); err != nil {
+						logger.Error(err, "Failed to send health heartbeat to dakr")
+					}
+				}
+			}
+		}()
+	}
+
 	// HTTP Server
-	http.HandleFunc("/metrics", monitor.GetMetricsHandler)
-	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
+	mux := http.NewServeMux()
+	mux.HandleFunc("/metrics", monitor.GetMetricsHandler)
+	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
+		report := healthManager.BuildReport()
+		resp := map[string]any{"components": report}
+		if cs, exists := report[health.ComponentMonitor]; exists && cs.Status == health.HealthStatusUnhealthy {
+			resp["status"] = "unhealthy"
+			resp["error"] = cs.Message
+			w.Header().Set("Content-Type", "application/json")
+			w.WriteHeader(http.StatusServiceUnavailable)
+			_ = json.NewEncoder(w).Encode(resp)
+			return
+		}
+		resp["status"] = "healthy"
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(http.StatusOK)
+		_ = json.NewEncoder(w).Encode(resp)
+	})
+	mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
+		report := healthManager.BuildReport()
+		resp := map[string]any{"components": report}
+		// Check required components for readiness
+		for _, name := range []string{health.ComponentMonitor, health.ComponentDakrTransport} {
+			cs, exists := report[name]
+			if !exists {
+				// Component was deregistered (e.g., no dakr configured) — skip
+				continue
+			}
+			if cs.Status != health.HealthStatusHealthy && cs.Status != health.HealthStatusDegraded {
+				resp["status"] = "not ready"
+				resp["error"] = fmt.Sprintf("%s is not ready (status: %s)", name, cs.Status)
+				w.Header().Set("Content-Type", "application/json")
+				w.WriteHeader(http.StatusServiceUnavailable)
+				_ = json.NewEncoder(w).Encode(resp)
+				return
+			}
+		}
+		resp["status"] = "ready"
+		w.Header().Set("Content-Type", "application/json")
 		w.WriteHeader(http.StatusOK)
-		_, _ = w.Write([]byte("OK"))
+		_ = json.NewEncoder(w).Encode(resp)
 	})
 
 	server := &http.Server{
-		Addr: *metricsAddr,
+		Addr:    *metricsAddr,
+		Handler: mux,
 	}
 
 	go func() {
diff --git a/internal/controller/custom.go b/internal/controller/custom.go
index 89a185b6..a995925a 100644
--- a/internal/controller/custom.go
+++ b/internal/controller/custom.go
@@ -37,6 +37,7 @@ import (
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
+	gen "github.com/devzero-inc/zxporter/gen/api/v1"
 	"github.com/devzero-inc/zxporter/internal/collector"
 	"github.com/devzero-inc/zxporter/internal/health"
 	telemetry_logger "github.com/devzero-inc/zxporter/internal/logger"
@@ -222,6 +223,7 @@ func (c *EnvBasedController) sendHealthReport(ctx context.Context) {
 	req := health.BuildHeartbeatRequestFromReport(
 		report,
 		c.getClusterID(),
+		gen.OperatorType_OPERATOR_TYPE_READ,
 		versionInfo.String(),
 		versionInfo.GitCommit,
 		c.startTime,
diff --git a/internal/health/component_names.go b/internal/health/component_names.go
index d7452987..5a87033d 100644
--- a/internal/health/component_names.go
+++ b/internal/health/component_names.go
@@ -7,4 +7,7 @@ const (
 	ComponentDakrTransport = "dakr_transport"
 	ComponentMpaServer     = "mpa_server"
 	ComponentPrometheus    = "prometheus"
+	ComponentMonitor       = "monitor"
+	ComponentEBPFTracer    = "ebpf_tracer"
+	ComponentPodCache      = "pod_cache"
 )
diff --git a/internal/health/heartbeat.go b/internal/health/heartbeat.go
index ebf07cab..af5a78b5 100644
--- a/internal/health/heartbeat.go
+++ b/internal/health/heartbeat.go
@@ -32,15 +32,15 @@ func worstStatus(a, b gen.HealthStatus) gen.HealthStatus {
 }
 
 // BuildHeartbeatRequest constructs a ReportHealthRequest from the current
-// HealthManager state. The zxporter operator type is OPERATOR_TYPE_READ.
-func BuildHeartbeatRequest(hm *HealthManager, clusterID, version, commit string, startTime time.Time) *gen.ReportHealthRequest {
-	return BuildHeartbeatRequestFromReport(hm.BuildReport(), clusterID, version, commit, startTime)
+// HealthManager state.
+func BuildHeartbeatRequest(hm *HealthManager, clusterID string, operatorType gen.OperatorType, version, commit string, startTime time.Time) *gen.ReportHealthRequest {
+	return BuildHeartbeatRequestFromReport(hm.BuildReport(), clusterID, operatorType, version, commit, startTime)
 }
 
 // BuildHeartbeatRequestFromReport constructs a ReportHealthRequest from an
 // already-built report map. Use this when you need to log and send the same
 // snapshot to avoid a double lock acquisition on HealthManager.
-func BuildHeartbeatRequestFromReport(report map[string]ComponentStatus, clusterID, version, commit string, startTime time.Time) *gen.ReportHealthRequest {
+func BuildHeartbeatRequestFromReport(report map[string]ComponentStatus, clusterID string, operatorType gen.OperatorType, version, commit string, startTime time.Time) *gen.ReportHealthRequest {
 	overall := gen.HealthStatus_HEALTH_STATUS_UNSPECIFIED
 	components := make([]*gen.ComponentHealth, 0, len(report))
 
@@ -58,7 +58,7 @@ func BuildHeartbeatRequestFromReport(report map[string]ComponentStatus, clusterI
 
 	return &gen.ReportHealthRequest{
 		ClusterId:     clusterID,
-		OperatorType:  gen.OperatorType_OPERATOR_TYPE_READ,
+		OperatorType:  operatorType,
 		Version:       version,
 		Commit:        commit,
 		OverallStatus: overall,
diff --git a/internal/health/heartbeat_test.go b/internal/health/heartbeat_test.go
index 6943f70b..87ecd395 100644
--- a/internal/health/heartbeat_test.go
+++ b/internal/health/heartbeat_test.go
@@ -18,7 +18,7 @@ func TestBuildHeartbeatRequest(t *testing.T) {
 
 	startTime := time.Now().Add(-10 * time.Minute)
 
-	req := BuildHeartbeatRequest(hm, "cluster-123", "1.2.3", "abc123", startTime)
+	req := BuildHeartbeatRequest(hm, "cluster-123", gen.OperatorType_OPERATOR_TYPE_READ, "1.2.3", "abc123", startTime)
 
 	assert.Equal(t, "cluster-123", req.ClusterId)
 	assert.Equal(t, "1.2.3", req.Version)
@@ -74,8 +74,27 @@ func TestBuildHeartbeatRequest_OverallStatus(t *testing.T) {
 				hm.UpdateStatus(name, s, "", nil)
 			}
 
-			req := BuildHeartbeatRequest(hm, "c1", "1.0.0", "def456", time.Now())
+			req := BuildHeartbeatRequest(hm, "c1", gen.OperatorType_OPERATOR_TYPE_READ, "1.0.0", "def456", time.Now())
 			assert.Equal(t, tt.expected, req.OverallStatus)
 		})
 	}
 }
+
+func TestBuildHeartbeatRequest_OperatorType(t *testing.T) {
+	hm := NewHealthManager()
+	hm.Register(ComponentMonitor)
+	hm.UpdateStatus(ComponentMonitor, HealthStatusHealthy, "running", nil)
+
+	startTime := time.Now()
+
+	t.Run("network operator type", func(t *testing.T) {
+		req := BuildHeartbeatRequest(hm, "cluster-456", gen.OperatorType_OPERATOR_TYPE_NETWORK, "0.1.0", "fff000", startTime)
+		assert.Equal(t, gen.OperatorType_OPERATOR_TYPE_NETWORK, req.OperatorType)
+		assert.Equal(t, "cluster-456", req.ClusterId)
+	})
+
+	t.Run("read operator type", func(t *testing.T) {
+		req := BuildHeartbeatRequest(hm, "cluster-789", gen.OperatorType_OPERATOR_TYPE_READ, "1.0.0", "aaa111", startTime)
+		assert.Equal(t, gen.OperatorType_OPERATOR_TYPE_READ, req.OperatorType)
+	})
+}
diff --git a/internal/networkmonitor/monitor.go b/internal/networkmonitor/monitor.go
index 096118e8..d3c42d58 100644
--- a/internal/networkmonitor/monitor.go
+++ b/internal/networkmonitor/monitor.go
@@ -13,6 +13,7 @@ import (
 	"google.golang.org/protobuf/types/known/timestamppb"
 	"inet.af/netaddr"
 
+	"github.com/devzero-inc/zxporter/internal/health"
 	"github.com/devzero-inc/zxporter/internal/networkmonitor/dns"
 	"github.com/devzero-inc/zxporter/internal/networkmonitor/ebpf"
 	"github.com/devzero-inc/zxporter/internal/transport"
@@ -71,13 +72,14 @@ type MetricsResponse struct {
 
 // Monitor collects network flows using conntrack and aggregates them
 type Monitor struct {
-	cfg        Config
-	log        logr.Logger
-	ct         Client
-	dns        dns.DNSCollector
-	tracer     *ebpf.Tracer
-	podCache   *PodCache
-	dakrClient transport.DakrClient
+	cfg           Config
+	log           logr.Logger
+	ct            Client
+	dns           dns.DNSCollector
+	tracer        *ebpf.Tracer
+	podCache      *PodCache
+	dakrClient    transport.DakrClient
+	healthManager *health.HealthManager
 
 	mu sync.RWMutex
 
@@ -91,20 +93,27 @@ type Monitor struct {
 }
 
 // NewMonitor creates a new Monitor instance
-func NewMonitor(cfg Config, log logr.Logger, podCache *PodCache, ct Client, tracer *ebpf.Tracer, dnsCollector dns.DNSCollector, dakrClient transport.DakrClient) (*Monitor, error) {
+func NewMonitor(cfg Config, log logr.Logger, podCache *PodCache, ct Client, tracer *ebpf.Tracer, dnsCollector dns.DNSCollector, dakrClient transport.DakrClient, healthManager *health.HealthManager) (*Monitor, error) {
 	return &Monitor{
-		cfg:          cfg,
-		log:          log,
-		ct:           ct,
-		tracer:       tracer,
-		dns:          dnsCollector,
-		podCache:     podCache,
-		dakrClient:   dakrClient,
-		entriesCache: make(map[uint64]*Entry),
-		podMetrics:   make(map[uint64]*NetworkFlow),
+		cfg:           cfg,
+		log:           log,
+		ct:            ct,
+		tracer:        tracer,
+		dns:           dnsCollector,
+		podCache:      podCache,
+		dakrClient:    dakrClient,
+		healthManager: healthManager,
+		entriesCache:  make(map[uint64]*Entry),
+		podMetrics:    make(map[uint64]*NetworkFlow),
 	}, nil
 }
 
+func (m *Monitor) updateHealth(component string, status health.HealthStatus, message string) {
+	if m.healthManager != nil {
+		m.healthManager.UpdateStatus(component, status, message, nil)
+	}
+}
+
 // Start begins the collection loop
 func (m *Monitor) Start(ctx context.Context) {
 	readTicker := time.NewTicker(m.cfg.ReadInterval)
@@ -131,15 +140,18 @@ func (m *Monitor) Start(ctx context.Context) {
 		case <-readTicker.C:
 			if err := m.collect(); err != nil {
 				m.log.Error(err, "Failed to collect conntrack entries")
+				m.updateHealth(health.ComponentMonitor, health.HealthStatusDegraded, err.Error())
+			} else {
+				m.updateHealth(health.ComponentMonitor, health.HealthStatusHealthy, "collecting")
 			}
 		case <-cleanupTicker.C:
 			m.cleanup()
 		case <-flushTicker.C:
-			// Run flush in goroutine to avoid blocking collection (unless we need strict ordering, but collection is fast)
-			// Actually, flushing needs read lock, collect needs write lock.
-			// We should probably just call it synchronously to ensure we don't pile up goroutines.
 			if err := m.flush(ctx); err != nil {
 				m.log.Error(err, "Failed to flush metrics to control plane")
+				m.updateHealth(health.ComponentDakrTransport, health.HealthStatusDegraded, err.Error())
+			} else {
+				m.updateHealth(health.ComponentDakrTransport, health.HealthStatusHealthy, "connected")
 			}
 		}