Merged
116 changes: 111 additions & 5 deletions cmd/zxporter-netmon/main.go
@@ -2,6 +2,7 @@ package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
@@ -11,6 +12,8 @@ import (
"time"

"github.com/cilium/ebpf/rlimit"
gen "github.com/devzero-inc/zxporter/gen/api/v1"
"github.com/devzero-inc/zxporter/internal/health"
"github.com/devzero-inc/zxporter/internal/networkmonitor"
"github.com/devzero-inc/zxporter/internal/networkmonitor/dns"
"github.com/devzero-inc/zxporter/internal/networkmonitor/ebpf"
@@ -52,6 +55,20 @@ func main() {
zapLog, _ := zap.NewProduction()
logger := zapr.NewLogger(zapLog)

// Create HealthManager and register components early so probes are
// answered immediately, before component initialization.
healthManager := health.NewHealthManager()
healthManager.Register(health.ComponentMonitor)
healthManager.Register(health.ComponentDakrTransport)
healthManager.Register(health.ComponentEBPFTracer)
healthManager.Register(health.ComponentPodCache)

// Allow 30 seconds for monitor, eBPF, and informer to initialize
// before readiness checks start failing on Unspecified statuses.
healthManager.SuppressReadiness(30 * time.Second)

startTime := time.Now()

nodeName := os.Getenv("NODE_NAME")
if nodeName == "" {
logger.Info("NODE_NAME environment variable not set, defaulting to localhost (dev mode)")
@@ -92,6 +109,7 @@ func main() {
logger.Info("Running in STANDALONE mode. K8s connection disabled.")
// Initialize PodCache with nil informer for standalone mode
podCache = networkmonitor.NewPodCache(nil)
healthManager.UpdateStatus(health.ComponentPodCache, health.HealthStatusHealthy, "standalone mode", nil)
}

// 4. Setup Client (Cilium or Netfilter)
@@ -134,12 +152,15 @@ func main() {
tracer = ebpf.NewTracer(logger, tracerCfg)
// DNS Collector uses the SAME tracer for DNS events
dnsCollector = dns.NewIP2DNS(tracer, logger)
healthManager.UpdateStatus(health.ComponentEBPFTracer, health.HealthStatusHealthy, "initialized", nil)
} else {
logger.Info("Kernel BTF not available, eBPF features disabled")
if *collectorMode == CollectorModeEBPF {
logger.Error(nil, "EBPF mode requested but BTF not available")
os.Exit(1)
}
// eBPF not available β€” deregister so it doesn't affect health checks
healthManager.Deregister(health.ComponentEBPFTracer)
}

// 6. Setup Dakr Client (Control Plane)
@@ -149,8 +170,11 @@
if dakrURL != "" && clusterToken != "" {
logger.Info("Initializing Dakr Client", "url", dakrURL)
dakrClient = transport.NewDakrClient(dakrURL, clusterToken, logger)
healthManager.UpdateStatus(health.ComponentDakrTransport, health.HealthStatusHealthy, "initialized", nil)
} else {
logger.Info("Control Plane credentials not found. Metrics will solely be exposed locally.")
// No dakr connection β€” deregister so it doesn't block readiness
healthManager.Deregister(health.ComponentDakrTransport)
}

// 7. Setup Monitor
@@ -172,7 +196,9 @@
OperatorVersion: versionInfo.String(),
OperatorCommit: versionInfo.GitCommit,
}
- monitor, err := networkmonitor.NewMonitor(monitorCfg, logger, podCache, client, tracer, dnsCollector, dakrClient)
+ monitor, err := networkmonitor.NewMonitor(
+ monitorCfg, logger, podCache, client, tracer, dnsCollector, dakrClient, healthManager,
+ )
if err != nil {
logger.Error(err, "Failed to initialize monitor")
os.Exit(1)
@@ -185,29 +211,109 @@
if !*standalone && informerFactory != nil {
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
healthManager.UpdateStatus(health.ComponentPodCache, health.HealthStatusHealthy, "informer synced", nil)
}

// Start eBPF tracer if available (for DNS events and/or network flows)
if tracer != nil {
go func() {
if err := tracer.Run(ctx); err != nil {
logger.Error(err, "eBPF tracer failed")
healthManager.UpdateStatus(health.ComponentEBPFTracer, health.HealthStatusUnhealthy, err.Error(), nil)
}
}()
}

// Start Monitor in background
go monitor.Start(ctx)

// Start heartbeat sender if dakr is configured
if dakrClient != nil {
go func() {
clusterID := os.Getenv("CLUSTER_ID")
if clusterID == "" {
clusterID = "unknown"
}
// Send initial heartbeat immediately
report := healthManager.BuildReport()
req := health.BuildHeartbeatRequestFromReport(
report, clusterID, gen.OperatorType_OPERATOR_TYPE_NETWORK,
versionInfo.String(), versionInfo.GitCommit, startTime,
)
if err := dakrClient.ReportHealth(ctx, req); err != nil {
logger.Error(err, "Failed to send initial health heartbeat to dakr")
}

ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
report := healthManager.BuildReport()
for name, status := range report {
logger.Info("Health status report", "component", name, "status", status.Status, "message", status.Message)
}
req := health.BuildHeartbeatRequestFromReport(
report, clusterID, gen.OperatorType_OPERATOR_TYPE_NETWORK,
versionInfo.String(), versionInfo.GitCommit, startTime,
)
if err := dakrClient.ReportHealth(ctx, req); err != nil {
logger.Error(err, "Failed to send health heartbeat to dakr")
}
}
}
}()
}

// HTTP Server
http.HandleFunc("/metrics", monitor.GetMetricsHandler)
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
mux := http.NewServeMux()
mux.HandleFunc("/metrics", monitor.GetMetricsHandler)
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
report := healthManager.BuildReport()
resp := map[string]any{"components": report}
if cs, exists := report[health.ComponentMonitor]; exists && cs.Status == health.HealthStatusUnhealthy {
resp["status"] = "unhealthy"
resp["error"] = cs.Message
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusServiceUnavailable)
_ = json.NewEncoder(w).Encode(resp)
return
}
resp["status"] = "healthy"
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(resp)
})
mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
report := healthManager.BuildReport()
resp := map[string]any{"components": report}
// Check required components for readiness
for _, name := range []string{health.ComponentMonitor, health.ComponentDakrTransport} {
cs, exists := report[name]
if !exists {
// Component was deregistered (e.g., no dakr configured) β€” skip
continue
}
if cs.Status != health.HealthStatusHealthy && cs.Status != health.HealthStatusDegraded {
resp["status"] = "not ready"
resp["error"] = fmt.Sprintf("%s is not ready (status: %s)", name, cs.Status)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusServiceUnavailable)
_ = json.NewEncoder(w).Encode(resp)
return
}
}
resp["status"] = "ready"
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
- _, _ = w.Write([]byte("OK"))
+ _ = json.NewEncoder(w).Encode(resp)
})

server := &http.Server{
- Addr: *metricsAddr,
+ Addr:    *metricsAddr,
+ Handler: mux,
}

go func() {
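For reference, both new endpoints serve a JSON body with the shape assembled above: a top-level "status", a "components" map, and an "error" field when a check fails. Below is a minimal probe-client sketch against /readyz; the localhost:8080 address is an assumption (match whatever metrics address the server above binds to), and the per-component payload is left opaque since its field layout is not shown in this diff.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Address is an assumption; use the address the netmon server listens on.
	resp, err := http.Get("http://localhost:8080/readyz")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var body struct {
		Status     string                     `json:"status"`
		Error      string                     `json:"error"`
		Components map[string]json.RawMessage `json:"components"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
		panic(err)
	}

	// Per the handler above: 200 means ready, 503 names the failing component in "error".
	fmt.Printf("ready=%v status=%q components=%d error=%q\n",
		resp.StatusCode == http.StatusOK, body.Status, len(body.Components), body.Error)
}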
2 changes: 2 additions & 0 deletions internal/controller/custom.go
@@ -37,6 +37,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

gen "github.com/devzero-inc/zxporter/gen/api/v1"
"github.com/devzero-inc/zxporter/internal/collector"
"github.com/devzero-inc/zxporter/internal/health"
telemetry_logger "github.com/devzero-inc/zxporter/internal/logger"
@@ -222,6 +223,7 @@ func (c *EnvBasedController) sendHealthReport(ctx context.Context) {
req := health.BuildHeartbeatRequestFromReport(
report,
c.getClusterID(),
gen.OperatorType_OPERATOR_TYPE_READ,
versionInfo.String(),
versionInfo.GitCommit,
c.startTime,
3 changes: 3 additions & 0 deletions internal/health/component_names.go
@@ -7,4 +7,7 @@ const (
ComponentDakrTransport = "dakr_transport"
ComponentMpaServer = "mpa_server"
ComponentPrometheus = "prometheus"
ComponentMonitor = "monitor"
ComponentEBPFTracer = "ebpf_tracer"
ComponentPodCache = "pod_cache"
)
10 changes: 5 additions & 5 deletions internal/health/heartbeat.go
@@ -32,15 +32,15 @@ func worstStatus(a, b gen.HealthStatus) gen.HealthStatus {
}

// BuildHeartbeatRequest constructs a ReportHealthRequest from the current
- // HealthManager state. The zxporter operator type is OPERATOR_TYPE_READ.
- func BuildHeartbeatRequest(hm *HealthManager, clusterID, version, commit string, startTime time.Time) *gen.ReportHealthRequest {
- return BuildHeartbeatRequestFromReport(hm.BuildReport(), clusterID, version, commit, startTime)
+ // HealthManager state.
+ func BuildHeartbeatRequest(hm *HealthManager, clusterID string, operatorType gen.OperatorType, version, commit string, startTime time.Time) *gen.ReportHealthRequest {
+ return BuildHeartbeatRequestFromReport(hm.BuildReport(), clusterID, operatorType, version, commit, startTime)
}

// BuildHeartbeatRequestFromReport constructs a ReportHealthRequest from an
// already-built report map. Use this when you need to log and send the same
// snapshot to avoid a double lock acquisition on HealthManager.
- func BuildHeartbeatRequestFromReport(report map[string]ComponentStatus, clusterID, version, commit string, startTime time.Time) *gen.ReportHealthRequest {
+ func BuildHeartbeatRequestFromReport(report map[string]ComponentStatus, clusterID string, operatorType gen.OperatorType, version, commit string, startTime time.Time) *gen.ReportHealthRequest {
overall := gen.HealthStatus_HEALTH_STATUS_UNSPECIFIED
components := make([]*gen.ComponentHealth, 0, len(report))

@@ -58,7 +58,7 @@ func BuildHeartbeatRequestFromReport(report map[string]ComponentStatus, clusterI

return &gen.ReportHealthRequest{
ClusterId: clusterID,
- OperatorType: gen.OperatorType_OPERATOR_TYPE_READ,
+ OperatorType: operatorType,
Version: version,
Commit: commit,
OverallStatus: overall,
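As the doc comment notes, the FromReport variant lets one snapshot feed both logging and the heartbeat without locking HealthManager twice. A sketch of that pattern as a helper, using only identifiers from this diff (assumed imports: context, time, logr, and the gen, health, and transport packages):

// sendHeartbeat reuses a single BuildReport snapshot for both the log lines
// and the dakr request, avoiding a second lock acquisition on HealthManager.
func sendHeartbeat(
	ctx context.Context,
	logger logr.Logger,
	hm *health.HealthManager,
	dakrClient transport.DakrClient,
	clusterID, version, commit string,
	startTime time.Time,
) {
	report := hm.BuildReport()
	for name, cs := range report {
		logger.Info("Health status report", "component", name, "status", cs.Status)
	}
	req := health.BuildHeartbeatRequestFromReport(
		report, clusterID, gen.OperatorType_OPERATOR_TYPE_NETWORK,
		version, commit, startTime,
	)
	if err := dakrClient.ReportHealth(ctx, req); err != nil {
		logger.Error(err, "Failed to send health heartbeat to dakr")
	}
}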
23 changes: 21 additions & 2 deletions internal/health/heartbeat_test.go
@@ -18,7 +18,7 @@ func TestBuildHeartbeatRequest(t *testing.T) {

startTime := time.Now().Add(-10 * time.Minute)

req := BuildHeartbeatRequest(hm, "cluster-123", "1.2.3", "abc123", startTime)
req := BuildHeartbeatRequest(hm, "cluster-123", gen.OperatorType_OPERATOR_TYPE_READ, "1.2.3", "abc123", startTime)

assert.Equal(t, "cluster-123", req.ClusterId)
assert.Equal(t, "1.2.3", req.Version)
@@ -74,8 +74,27 @@ func TestBuildHeartbeatRequest_OverallStatus(t *testing.T) {
hm.UpdateStatus(name, s, "", nil)
}

req := BuildHeartbeatRequest(hm, "c1", "1.0.0", "def456", time.Now())
req := BuildHeartbeatRequest(hm, "c1", gen.OperatorType_OPERATOR_TYPE_READ, "1.0.0", "def456", time.Now())
assert.Equal(t, tt.expected, req.OverallStatus)
})
}
}

func TestBuildHeartbeatRequest_OperatorType(t *testing.T) {
hm := NewHealthManager()
hm.Register(ComponentMonitor)
hm.UpdateStatus(ComponentMonitor, HealthStatusHealthy, "running", nil)

startTime := time.Now()

t.Run("network operator type", func(t *testing.T) {
req := BuildHeartbeatRequest(hm, "cluster-456", gen.OperatorType_OPERATOR_TYPE_NETWORK, "0.1.0", "fff000", startTime)
assert.Equal(t, gen.OperatorType_OPERATOR_TYPE_NETWORK, req.OperatorType)
assert.Equal(t, "cluster-456", req.ClusterId)
})

t.Run("read operator type", func(t *testing.T) {
req := BuildHeartbeatRequest(hm, "cluster-789", gen.OperatorType_OPERATOR_TYPE_READ, "1.0.0", "aaa111", startTime)
assert.Equal(t, gen.OperatorType_OPERATOR_TYPE_READ, req.OperatorType)
})
}
52 changes: 32 additions & 20 deletions internal/networkmonitor/monitor.go
@@ -13,6 +13,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
"inet.af/netaddr"

"github.com/devzero-inc/zxporter/internal/health"
"github.com/devzero-inc/zxporter/internal/networkmonitor/dns"
"github.com/devzero-inc/zxporter/internal/networkmonitor/ebpf"
"github.com/devzero-inc/zxporter/internal/transport"
@@ -71,13 +72,14 @@ type MetricsResponse struct {

// Monitor collects network flows using conntrack and aggregates them
type Monitor struct {
- cfg Config
- log logr.Logger
- ct Client
- dns dns.DNSCollector
- tracer *ebpf.Tracer
- podCache *PodCache
- dakrClient transport.DakrClient
+ cfg           Config
+ log           logr.Logger
+ ct            Client
+ dns           dns.DNSCollector
+ tracer        *ebpf.Tracer
+ podCache      *PodCache
+ dakrClient    transport.DakrClient
+ healthManager *health.HealthManager

mu sync.RWMutex

@@ -91,20 +93,27 @@
}

// NewMonitor creates a new Monitor instance
- func NewMonitor(cfg Config, log logr.Logger, podCache *PodCache, ct Client, tracer *ebpf.Tracer, dnsCollector dns.DNSCollector, dakrClient transport.DakrClient) (*Monitor, error) {
+ func NewMonitor(cfg Config, log logr.Logger, podCache *PodCache, ct Client, tracer *ebpf.Tracer, dnsCollector dns.DNSCollector, dakrClient transport.DakrClient, healthManager *health.HealthManager) (*Monitor, error) {
return &Monitor{
- cfg: cfg,
- log: log,
- ct: ct,
- tracer: tracer,
- dns: dnsCollector,
- podCache: podCache,
- dakrClient: dakrClient,
- entriesCache: make(map[uint64]*Entry),
- podMetrics: make(map[uint64]*NetworkFlow),
+ cfg:           cfg,
+ log:           log,
+ ct:            ct,
+ tracer:        tracer,
+ dns:           dnsCollector,
+ podCache:      podCache,
+ dakrClient:    dakrClient,
+ healthManager: healthManager,
+ entriesCache:  make(map[uint64]*Entry),
+ podMetrics:    make(map[uint64]*NetworkFlow),
}, nil
}

func (m *Monitor) updateHealth(component string, status health.HealthStatus, message string) {
if m.healthManager != nil {
m.healthManager.UpdateStatus(component, status, message, nil)
}
}

// Start begins the collection loop
func (m *Monitor) Start(ctx context.Context) {
readTicker := time.NewTicker(m.cfg.ReadInterval)
@@ -131,15 +140,18 @@ func (m *Monitor) Start(ctx context.Context) {
case <-readTicker.C:
if err := m.collect(); err != nil {
m.log.Error(err, "Failed to collect conntrack entries")
m.updateHealth(health.ComponentMonitor, health.HealthStatusDegraded, err.Error())
} else {
m.updateHealth(health.ComponentMonitor, health.HealthStatusHealthy, "collecting")
}
case <-cleanupTicker.C:
m.cleanup()
case <-flushTicker.C:
// Flush synchronously rather than in a goroutine: flush takes the read
// lock while collect takes the write lock, and a synchronous call avoids
// piling up goroutines when the control plane is slow.
if err := m.flush(ctx); err != nil {
m.log.Error(err, "Failed to flush metrics to control plane")
m.updateHealth(health.ComponentDakrTransport, health.HealthStatusDegraded, err.Error())
} else {
m.updateHealth(health.ComponentDakrTransport, health.HealthStatusHealthy, "connected")
}
}
}
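Because updateHealth guards against a nil healthManager, a Monitor built without one simply skips health reporting instead of panicking. An in-package test sketch of that behavior (imports: testing, logr, and the health package; the all-nil collaborators are an assumption for illustration, relying on NewMonitor above only storing them at construction time):

func TestMonitor_NilHealthManager(t *testing.T) {
	m, err := NewMonitor(Config{}, logr.Discard(), nil, nil, nil, nil, nil, nil)
	if err != nil {
		t.Fatalf("NewMonitor: %v", err)
	}
	// Must not panic even though no HealthManager was supplied.
	m.updateHealth(health.ComponentMonitor, health.HealthStatusHealthy, "ok")
}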