4 changes: 3 additions & 1 deletion .gitignore
@@ -40,4 +40,6 @@ debug_metrics.json
debug_rendered.yaml

#ignore dev helm deployment
helm-chart/zxporter/local_values.yaml
helm-chart/zxporter/local_values.yaml

docs/plans
19 changes: 19 additions & 0 deletions Makefile
@@ -829,3 +829,22 @@ verify-e2e-gke-lifecycle: provision-gke ## Full GKE E2E (Provision -> Verify ->
.PHONY: clean-metrics
clean-metrics:
@rm -f verification/metrics-*.json

# Load simulation replicas (override to increase pressure)
LOAD_SIM_REPLICAS ?= 10

.PHONY: load-test
load-test: ## Apply load simulation manifests to stress-test zxporter collectors
@echo "[INFO] Applying load simulation manifests..."
@$(KUBECTL) apply -f verification/load-simulation.yaml
@echo "[INFO] Scaling deployments to $(LOAD_SIM_REPLICAS) replicas..."
@$(KUBECTL) scale deploy -n load-test load-web --replicas=$(LOAD_SIM_REPLICAS)
@$(KUBECTL) scale deploy -n load-test load-api --replicas=$(LOAD_SIM_REPLICAS)
@$(KUBECTL) scale deploy -n load-test load-worker --replicas=$(LOAD_SIM_REPLICAS)
@echo "[INFO] Load simulation applied. Monitor with: kubectl get all -n load-test"

.PHONY: load-test-cleanup
load-test-cleanup: ## Remove all load simulation resources
@echo "[INFO] Deleting load-test namespace..."
@$(KUBECTL) delete ns load-test --ignore-not-found
@echo "[INFO] Load simulation cleaned up."
16 changes: 15 additions & 1 deletion internal/collector/historical_metrics_collector.go
@@ -12,6 +12,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"

gen "github.com/devzero-inc/zxporter/gen/api/v1"
"github.com/devzero-inc/zxporter/internal/health"
)

const (
@@ -34,14 +35,16 @@ type HistoricalMetricsCollector struct {
logger logr.Logger
prometheusAPI v1.API
semaphore chan struct{} // limits concurrent Prometheus queries
healthManager *health.HealthManager
}

// NewHistoricalMetricsCollector creates a new collector.
func NewHistoricalMetricsCollector(logger logr.Logger, prometheusAPI v1.API) *HistoricalMetricsCollector {
func NewHistoricalMetricsCollector(logger logr.Logger, prometheusAPI v1.API, healthManager *health.HealthManager) *HistoricalMetricsCollector {
return &HistoricalMetricsCollector{
logger: logger.WithName("historical-metrics"),
prometheusAPI: prometheusAPI,
semaphore: make(chan struct{}, maxConcurrentQueries),
healthManager: healthManager,
}
}

@@ -61,6 +64,8 @@ func (c *HistoricalMetricsCollector) FetchPercentiles(ctx context.Context, workl
}
}

c.updateHealthStatus(health.HealthStatusHealthy, "Prometheus queries succeeding", map[string]string{"workload": workload.WorkloadName})

return &gen.HistoricalMetricsSummary{
Workload: &gen.MpaWorkloadIdentifier{
Namespace: workload.Namespace,
@@ -115,6 +120,7 @@ func (c *HistoricalMetricsCollector) DiscoverContainers(ctx context.Context, nam
)
result, _, err := c.prometheusAPI.Query(ctx, query, time.Now())
if err != nil {
c.updateHealthStatus(health.HealthStatusDegraded, "Prometheus query failed", map[string]string{"error": err.Error()})
return nil, err
}

@@ -199,6 +205,7 @@ func (c *HistoricalMetricsCollector) fetchContainerPercentiles(ctx context.Conte
func (c *HistoricalMetricsCollector) queryScalar(ctx context.Context, query string, ts time.Time) (float64, error) {
result, warnings, err := c.prometheusAPI.Query(ctx, query, ts)
if err != nil {
c.updateHealthStatus(health.HealthStatusDegraded, "Prometheus query failed", map[string]string{"error": err.Error()})
return 0, fmt.Errorf("prometheus query failed: %w", err)
}
if len(warnings) > 0 {
@@ -217,3 +224,10 @@ func (c *HistoricalMetricsCollector) queryScalar(ctx context.Context, query stri
return 0, fmt.Errorf("unexpected result type: %T", result)
}
}

// updateHealthStatus reports Prometheus component health if a HealthManager is configured.
func (c *HistoricalMetricsCollector) updateHealthStatus(status health.HealthStatus, message string, metadata map[string]string) {
if c.healthManager != nil {
c.healthManager.UpdateStatus(health.ComponentPrometheus, status, message, metadata)
}
}
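
The internal/health package referenced above is not part of this diff. As a rough sketch of the API these call sites imply (the function names and shapes come from the diff; the concrete types, constant values, initial status, and mutex-backed map are assumptions):

package health

import "sync"

// HealthStatus is the reported state of a component (string values are assumed).
type HealthStatus string

const (
	HealthStatusHealthy   HealthStatus = "healthy"
	HealthStatusDegraded  HealthStatus = "degraded"
	HealthStatusUnhealthy HealthStatus = "unhealthy"
)

// Component names used by the collectors (values are assumed).
const (
	ComponentPrometheus       = "prometheus"
	ComponentCollectorManager = "collector_manager"
	ComponentBufferQueue      = "buffer_queue"
)

// ComponentHealth is a point-in-time snapshot for one component.
type ComponentHealth struct {
	Status   HealthStatus
	Message  string
	Metadata map[string]string
}

// HealthManager tracks per-component health behind a read-write mutex.
type HealthManager struct {
	mu         sync.RWMutex
	components map[string]ComponentHealth
}

func NewHealthManager() *HealthManager {
	return &HealthManager{components: make(map[string]ComponentHealth)}
}

// Register adds a component; the initial status chosen here is an assumption.
func (h *HealthManager) Register(component string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.components[component] = ComponentHealth{Status: HealthStatusUnhealthy, Message: "not yet reported"}
}

// UpdateStatus overwrites the component's latest status, message, and metadata.
func (h *HealthManager) UpdateStatus(component string, status HealthStatus, message string, metadata map[string]string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.components[component] = ComponentHealth{Status: status, Message: message, Metadata: metadata}
}

// GetStatus returns the latest snapshot and whether the component is known.
func (h *HealthManager) GetStatus(component string) (ComponentHealth, bool) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	c, ok := h.components[component]
	return c, ok
}

Note that the nil guard in updateHealthStatus keeps the collector usable without a HealthManager, which is what the nil-constructor test below exercises.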
2 changes: 1 addition & 1 deletion internal/collector/historical_metrics_collector_test.go
@@ -103,7 +103,7 @@ func TestHistoricalCollector_FetchPercentiles(t *testing.T) {
queryResults: make(map[string]model.Value),
}

hc := NewHistoricalMetricsCollector(logr.Discard(), mock)
hc := NewHistoricalMetricsCollector(logr.Discard(), mock, nil)

workload := HistoricalWorkloadQuery{
Namespace: "default",
64 changes: 64 additions & 0 deletions internal/collector/historical_metrics_health_test.go
@@ -0,0 +1,64 @@
package collector

import (
"context"
"fmt"
"testing"
"time"

"github.com/devzero-inc/zxporter/internal/health"
"github.com/go-logr/logr"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
)

type errorPrometheusAPI struct {
mockPrometheusAPI
}

func (m *errorPrometheusAPI) Query(ctx context.Context, query string, ts time.Time, opts ...v1.Option) (model.Value, v1.Warnings, error) {
return nil, nil, fmt.Errorf("prometheus unavailable")
}

func TestHistoricalCollector_HealthyOnSuccess(t *testing.T) {
hm := health.NewHealthManager()
hm.Register(health.ComponentPrometheus)

mock := &mockPrometheusAPI{queryResults: make(map[string]model.Value)}
hc := NewHistoricalMetricsCollector(logr.Discard(), mock, hm)

workload := HistoricalWorkloadQuery{
Namespace: "default",
WorkloadName: "web-app",
WorkloadKind: "Deployment",
PodRegex: "web-app-.*",
Containers: []string{"app"},
}

_, err := hc.FetchPercentiles(context.Background(), workload)
assert.NoError(t, err)

status, _ := hm.GetStatus(health.ComponentPrometheus)
assert.Equal(t, health.HealthStatusHealthy, status.Status)
}

func TestHistoricalCollector_DegradedOnQueryError(t *testing.T) {
hm := health.NewHealthManager()
hm.Register(health.ComponentPrometheus)

mock := &errorPrometheusAPI{}
hc := NewHistoricalMetricsCollector(logr.Discard(), mock, hm)

_, err := hc.DiscoverContainers(context.Background(), "default", "web-app-.*")
assert.Error(t, err)

status, _ := hm.GetStatus(health.ComponentPrometheus)
assert.Equal(t, health.HealthStatusDegraded, status.Status)
}

func TestHistoricalCollector_NilHealthManager(t *testing.T) {
mock := &mockPrometheusAPI{queryResults: make(map[string]model.Value)}
hc := NewHistoricalMetricsCollector(logr.Discard(), mock, nil)
assert.NotNil(t, hc)
}
18 changes: 18 additions & 0 deletions internal/collector/manager.go
@@ -10,6 +10,7 @@ import (
"time"

gen "github.com/devzero-inc/zxporter/gen/api/v1"
"github.com/devzero-inc/zxporter/internal/health"
telemetry_logger "github.com/devzero-inc/zxporter/internal/logger"
"github.com/devzero-inc/zxporter/internal/version"
"github.com/go-logr/logr"
@@ -55,6 +56,7 @@ type CollectionManager struct {
config *CollectionConfig
logger logr.Logger
telemetryLogger telemetry_logger.Logger
healthManager *health.HealthManager
}

// NewCollectionManager creates a new collection manager
@@ -63,6 +65,7 @@ func NewCollectionManager(config *CollectionConfig,
telemetryMetrics *TelemetryMetrics,
logger logr.Logger,
telemetryLogger telemetry_logger.Logger,
healthManager *health.HealthManager,
) *CollectionManager {
if config != nil && config.BufferSize > 0 {
bufferSize = config.BufferSize
@@ -79,6 +82,7 @@
config: config,
logger: logger,
telemetryLogger: telemetryLogger,
healthManager: healthManager,
}
}

@@ -246,6 +250,9 @@ func (m *CollectionManager) StartAll(ctx context.Context) error {
}

m.started = true
m.updateHealthStatus(health.ComponentCollectorManager, health.HealthStatusHealthy, fmt.Sprintf("%d collectors started", len(m.collectors)), map[string]string{"collector_count": fmt.Sprintf("%d", len(m.collectors))})
m.updateHealthStatus(health.ComponentBufferQueue, health.HealthStatusHealthy, "Buffer is operational", map[string]string{"capacity": fmt.Sprintf("%d", m.bufferSize)})

return nil
}

@@ -288,6 +295,7 @@ func (m *CollectionManager) startCollectorInternal(collectorType string, collect
m.mu.Lock()
delete(m.collectorCtxs, collectorType)
m.mu.Unlock()
m.updateHealthStatus(health.ComponentCollectorManager, health.HealthStatusDegraded, fmt.Sprintf("collector %s failed to start", collectorType), map[string]string{"failed_collector": collectorType})
return fmt.Errorf("failed to start collector %s: %w", collectorType, err)
}

@@ -369,6 +377,8 @@ func (m *CollectionManager) StopAll() error {
}

m.started = false
m.updateHealthStatus(health.ComponentCollectorManager, health.HealthStatusUnhealthy, "All collectors stopped", nil)
m.updateHealthStatus(health.ComponentBufferQueue, health.HealthStatusUnhealthy, "Buffer stopped", nil)
return nil
}

@@ -402,6 +412,7 @@ func (m *CollectionManager) processCollectorChannel(collectorType string, collec
m.logger.Error(nil, "Combined channel buffer full, dropping resources after timeout",
"count", len(resources),
"type", resources[0].ResourceType.String())
m.updateHealthStatus(health.ComponentBufferQueue, health.HealthStatusDegraded, "Buffer full, dropping resources", map[string]string{"capacity": fmt.Sprintf("%d", m.bufferSize), "resource_type": resources[0].ResourceType.String()})
m.telemetryMetrics.MessagesDropped.WithLabelValues(resources[0].ResourceType.String()).Add(float64(len(resources)))
}
}
@@ -452,3 +463,10 @@ func (m *CollectionManager) GetCollector(collectorType string) ResourceCollector
}
return collector
}

// updateHealthStatus updates the health status of the collection manager
func (m *CollectionManager) updateHealthStatus(component string, status health.HealthStatus, message string, metadata map[string]string) {
if m.healthManager != nil {
m.healthManager.UpdateStatus(component, status, message, metadata)
}
}
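
For context, startup wiring might look roughly like this (a hypothetical call site; the real one is outside this diff, the second constructor argument is passed as nil just as in the tests below, and telemetryMetrics, logger, telemetryLogger, and prometheusAPI are assumed to be in scope):

hm := health.NewHealthManager()
hm.Register(health.ComponentCollectorManager)
hm.Register(health.ComponentBufferQueue)
hm.Register(health.ComponentPrometheus)

mgr := NewCollectionManager(&CollectionConfig{BufferSize: 1000}, nil, telemetryMetrics, logger, telemetryLogger, hm)
hist := NewHistoricalMetricsCollector(logger, prometheusAPI, hm)

Because both constructors accept a nil HealthManager, existing callers can adopt health reporting incrementally.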
124 changes: 124 additions & 0 deletions internal/collector/manager_health_test.go
@@ -0,0 +1,124 @@
package collector

import (
"context"
"sync"
"testing"
"time"

gen "github.com/devzero-inc/zxporter/gen/api/v1"
"github.com/devzero-inc/zxporter/internal/health"
"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
)

// noopTelemetryLogger is a no-op implementation of telemetry_logger.Logger
type noopTelemetryLogger struct{}

func (n *noopTelemetryLogger) Report(level gen.LogLevel, source string, msg string, err error, fields map[string]string) {
}
func (n *noopTelemetryLogger) Stop() {}

var (
sharedTelemetryMetrics *TelemetryMetrics
sharedTelemetryMetricsOnce sync.Once
)

func getTestTelemetryMetrics() *TelemetryMetrics {
sharedTelemetryMetricsOnce.Do(func() {
sharedTelemetryMetrics = NewTelemetryMetrics()
})
return sharedTelemetryMetrics
}

func TestCollectionManager_HealthyAfterStartAll(t *testing.T) {
hm := health.NewHealthManager()
hm.Register(health.ComponentCollectorManager)
hm.Register(health.ComponentBufferQueue)

mgr := NewCollectionManager(
&CollectionConfig{BufferSize: 10},
nil,
getTestTelemetryMetrics(),
logr.Discard(),
&noopTelemetryLogger{},
hm,
)

mock := &mockCollector{
collectorType: "test_resource",
resourceCh: make(chan []CollectedResource, 10),
}
err := mgr.RegisterCollector(mock)
assert.NoError(t, err)

err = mgr.StartAll(context.Background())
assert.NoError(t, err)

// Give goroutines time to start
time.Sleep(100 * time.Millisecond)

status, exists := hm.GetStatus(health.ComponentCollectorManager)
assert.True(t, exists)
assert.Equal(t, health.HealthStatusHealthy, status.Status)

bufStatus, exists := hm.GetStatus(health.ComponentBufferQueue)
assert.True(t, exists)
assert.Equal(t, health.HealthStatusHealthy, bufStatus.Status)

_ = mgr.StopAll()
}

func TestCollectionManager_UnhealthyAfterStopAll(t *testing.T) {
hm := health.NewHealthManager()
hm.Register(health.ComponentCollectorManager)
hm.Register(health.ComponentBufferQueue)

mgr := NewCollectionManager(
&CollectionConfig{BufferSize: 10},
nil,
getTestTelemetryMetrics(),
logr.Discard(),
&noopTelemetryLogger{},
hm,
)

mock := &mockCollector{
collectorType: "test_resource",
resourceCh: make(chan []CollectedResource, 10),
}
_ = mgr.RegisterCollector(mock)
_ = mgr.StartAll(context.Background())
time.Sleep(100 * time.Millisecond)

_ = mgr.StopAll()

status, _ := hm.GetStatus(health.ComponentCollectorManager)
assert.Equal(t, health.HealthStatusUnhealthy, status.Status)
}

func TestCollectionManager_NilHealthManager(t *testing.T) {
mgr := NewCollectionManager(
&CollectionConfig{BufferSize: 10},
nil,
getTestTelemetryMetrics(),
logr.Discard(),
&noopTelemetryLogger{},
nil,
)
assert.NotNil(t, mgr)
}

// mockCollector implements ResourceCollector for testing
type mockCollector struct {
collectorType string
resourceCh chan []CollectedResource
started bool
}

func (m *mockCollector) Start(ctx context.Context) error { m.started = true; return nil }
func (m *mockCollector) Stop() error { close(m.resourceCh); return nil }
func (m *mockCollector) GetType() string { return m.collectorType }
func (m *mockCollector) GetResourceChannel() <-chan []CollectedResource { return m.resourceCh }
func (m *mockCollector) IsAvailable(ctx context.Context) bool { return true }
func (m *mockCollector) AddResource(resource interface{}) error { return nil }