From 9b72ad1b60675a41cd8a0d48a8ef62f27ac7d710 Mon Sep 17 00:00:00 2001
From: Debot MacMini1
Date: Thu, 7 May 2026 00:43:28 -0700
Subject: [PATCH] fix: make oom_kill_count cumulative in MPA timeseries

---
 internal/server/mpa_server.go      |  57 ++++++++--
 internal/server/mpa_server_test.go | 161 +++++++++++++++++++++++++++++
 2 files changed, 210 insertions(+), 8 deletions(-)
 create mode 100644 internal/server/mpa_server_test.go

diff --git a/internal/server/mpa_server.go b/internal/server/mpa_server.go
index 50d1a957..24a2f0cc 100644
--- a/internal/server/mpa_server.go
+++ b/internal/server/mpa_server.go
@@ -178,6 +178,19 @@ type SubscriptionManager struct {
 	mu      sync.RWMutex
 	clients map[string]*ClientSubscription
 	logger  logr.Logger
+
+	// OOM state tracking so timeseries can reflect OOM kills cumulatively.
+	//
+	// We intentionally track this in the streaming layer so that even if a single
+	// OOM-bearing sample is dropped due to backpressure, subsequent utilization
+	// samples can still carry a cumulative count.
+	oomMu    sync.Mutex
+	oomByKey map[string]oomState
+}
+
+type oomState struct {
+	count       int64
+	lastRestart int64
 }
 
 type ClientSubscription struct {
@@ -195,9 +208,40 @@ type WorkloadInterest struct {
 
 func NewSubscriptionManager(logger logr.Logger) *SubscriptionManager {
 	return &SubscriptionManager{
-		clients: make(map[string]*ClientSubscription),
-		logger:  logger.WithName("sub-manager"),
+		clients:  make(map[string]*ClientSubscription),
+		logger:   logger.WithName("sub-manager"),
+		oomByKey: make(map[string]oomState),
+	}
+}
+
+func oomKey(ns, pod, container string) string {
+	return ns + "/" + pod + "/" + container
+}
+
+// getAndUpdateOomCount updates (if needed) and returns the cumulative OOM kill count.
+//
+// Increment rule: when we observe OOMKilled AND RestartCount increases beyond the
+// last processed restartCount for this key.
+func (sm *SubscriptionManager) getAndUpdateOomCount(ns, pod, container string, restartCount int64, lastReason string) int64 {
+	key := oomKey(ns, pod, container)
+
+	sm.oomMu.Lock()
+	defer sm.oomMu.Unlock()
+
+	st := sm.oomByKey[key]
+
+	if lastReason == collector.ReasonOOMKilled {
+		if restartCount > st.lastRestart {
+			st.count++
+			st.lastRestart = restartCount
+		}
+	} else if restartCount > st.lastRestart {
+		// Keep lastRestart moving forward even when not OOMKilled.
+		st.lastRestart = restartCount
 	}
+
+	sm.oomByKey[key] = st
+	return st.count
 }
 
 func (sm *SubscriptionManager) Register(ch chan *gen.MpaStreamResponse) string {
@@ -287,6 +331,9 @@ func (sm *SubscriptionManager) Broadcast(
 
 	restartCount := data.RestartCount
 	lastReason := data.LastTerminationReason
+	// Cumulative OOM counter so timeseries reflects OOMs beyond the single event.
+	oomKillCount := sm.getAndUpdateOomCount(ns, pod, container, restartCount, lastReason)
+
 	wKind := data.WorkloadKind
 	wName := data.WorkloadName
 
@@ -320,12 +367,6 @@ func (sm *SubscriptionManager) Broadcast(
 		}
 
 		if matched {
-			// Populate OomKillCount from termination reason — the proto field
-			var oomKillCount int64
-			if lastReason == collector.ReasonOOMKilled {
-				oomKillCount = 1
-			}
-
 			item := &gen.ContainerMetricItem{
 				Workload: &gen.MpaWorkloadIdentifier{
 					Namespace: matchedWorkload.Namespace,
diff --git a/internal/server/mpa_server_test.go b/internal/server/mpa_server_test.go
new file mode 100644
index 00000000..72b0d1c8
--- /dev/null
+++ b/internal/server/mpa_server_test.go
@@ -0,0 +1,161 @@
+package server
+
+import (
+	"testing"
+	"time"
+
+	gen "github.com/devzero-inc/zxporter/gen/api/v1"
+	"github.com/devzero-inc/zxporter/internal/collector"
+	"github.com/go-logr/logr"
+)
+
+func mustRecv(t *testing.T, ch <-chan *gen.MpaStreamResponse) *gen.MpaStreamResponse {
+	t.Helper()
+	select {
+	case msg := <-ch:
+		return msg
+	case <-time.After(1 * time.Second):
+		t.Fatalf("timed out waiting for stream message")
+		return nil
+	}
+}
+
+func getFirstItem(t *testing.T, msg *gen.MpaStreamResponse) *gen.ContainerMetricItem {
+	t.Helper()
+	rt := msg.GetRealtimeMetrics()
+	if rt == nil || len(rt.Items) != 1 {
+		t.Fatalf("expected 1 realtime item, got %#v", msg)
+	}
+	return rt.Items[0]
+}
+
+func TestSubscriptionManager_OomKillCountIsCumulativeAndSticky(t *testing.T) {
+	sm := NewSubscriptionManager(logr.Discard())
+	updates := make(chan *gen.MpaStreamResponse, 10)
+	clientID := sm.Register(updates)
+	defer sm.Unregister(clientID)
+
+	// Subscribe to Deployment/foo in ns.
+	sm.UpdateSubscription(clientID, &gen.MpaWorkloadSubscription{Workloads: []*gen.MpaWorkloadIdentifier{{
+		Namespace: "ns",
+		Name:      "foo",
+		Kind:      "Deployment",
+	}}})
+
+	base := &collector.ContainerMetricsSnapshot{
+		Namespace:     "ns",
+		PodName:       "foo-abc",
+		ContainerName: "c1",
+		WorkloadKind:  "Deployment",
+		WorkloadName:  "foo",
+	}
+
+	// Initial sample.
+	{
+		s := *base
+		s.RestartCount = 0
+		s.LastTerminationReason = ""
+		sm.Broadcast(&s, time.Now())
+		item := getFirstItem(t, mustRecv(t, updates))
+		if item.OomKillCount != 0 {
+			t.Fatalf("expected oom_kill_count=0, got %d", item.OomKillCount)
+		}
+	}
+
+	// First OOM edge.
+	{
+		s := *base
+		s.RestartCount = 1
+		s.LastTerminationReason = collector.ReasonOOMKilled
+		sm.Broadcast(&s, time.Now())
+		item := getFirstItem(t, mustRecv(t, updates))
+		if item.OomKillCount != 1 {
+			t.Fatalf("expected oom_kill_count=1, got %d", item.OomKillCount)
+		}
+	}
+
+	// Subsequent non-OOM sample should retain cumulative count.
+	{
+		s := *base
+		s.RestartCount = 1
+		s.LastTerminationReason = ""
+		sm.Broadcast(&s, time.Now())
+		item := getFirstItem(t, mustRecv(t, updates))
+		if item.OomKillCount != 1 {
+			t.Fatalf("expected oom_kill_count=1 (sticky), got %d", item.OomKillCount)
+		}
+	}
+
+	// Second OOM edge increments.
+	{
+		s := *base
+		s.RestartCount = 2
+		s.LastTerminationReason = collector.ReasonOOMKilled
+		sm.Broadcast(&s, time.Now())
+		item := getFirstItem(t, mustRecv(t, updates))
+		if item.OomKillCount != 2 {
+			t.Fatalf("expected oom_kill_count=2, got %d", item.OomKillCount)
+		}
+	}
+
+	// Duplicate OOM with same restartCount must not double-count.
+	{
+		s := *base
+		s.RestartCount = 2
+		s.LastTerminationReason = collector.ReasonOOMKilled
+		sm.Broadcast(&s, time.Now())
+		item := getFirstItem(t, mustRecv(t, updates))
+		if item.OomKillCount != 2 {
+			t.Fatalf("expected oom_kill_count=2 (no double count), got %d", item.OomKillCount)
+		}
+	}
+}
+
+func TestSubscriptionManager_OomCountSurvivesDroppedSample(t *testing.T) {
+	sm := NewSubscriptionManager(logr.Discard())
+	updates := make(chan *gen.MpaStreamResponse, 1) // small buffer
+	clientID := sm.Register(updates)
+	defer sm.Unregister(clientID)
+
+	sm.UpdateSubscription(clientID, &gen.MpaWorkloadSubscription{Workloads: []*gen.MpaWorkloadIdentifier{{
+		Namespace: "ns",
+		Name:      "foo",
+		Kind:      "Deployment",
+	}}})
+
+	base := &collector.ContainerMetricsSnapshot{
+		Namespace:     "ns",
+		PodName:       "foo-abc",
+		ContainerName: "c1",
+		WorkloadKind:  "Deployment",
+		WorkloadName:  "foo",
+	}
+
+	// Fill channel so next send is dropped.
+	updates <- &gen.MpaStreamResponse{Payload: &gen.MpaStreamResponse_RealtimeMetrics{RealtimeMetrics: &gen.ContainerMetricsBatch{Items: []*gen.ContainerMetricItem{{
+		Workload: &gen.MpaWorkloadIdentifier{Namespace: "ns", Name: "foo", Kind: "Deployment"},
+	}}}}}
+
+	// OOM sample is processed (state updated) but dropped due to full channel.
+	{
+		s := *base
+		s.RestartCount = 1
+		s.LastTerminationReason = collector.ReasonOOMKilled
+		sm.Broadcast(&s, time.Now())
+	}
+
+	// Drain the pre-filled message.
+	_ = mustRecv(t, updates)
+
+	// Next normal sample should reflect the cumulative OOM count=1.
+	{
+		s := *base
+		s.RestartCount = 1
+		s.LastTerminationReason = ""
+		sm.Broadcast(&s, time.Now())
+		item := getFirstItem(t, mustRecv(t, updates))
+		if item.OomKillCount != 1 {
+			t.Fatalf("expected oom_kill_count=1 after dropped OOM sample, got %d", item.OomKillCount)
+		}
+	}
+}
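
Reviewer note, not part of the patch: below is a minimal, self-contained sketch of the increment rule that getAndUpdateOomCount applies, useful for sanity-checking the expected counter values in the tests above. The sample sequence, the helper names (state, observe), and the bare "OOMKilled" literal standing in for collector.ReasonOOMKilled are illustrative assumptions, not code from the repository.

package main

import "fmt"

// state mirrors the per-container bookkeeping added in the patch: a cumulative
// OOM count plus the last restart count already attributed to an OOM.
type state struct {
	count       int64
	lastRestart int64
}

// observe restates the increment rule: bump the counter only when the sample
// reports OOMKilled AND its restart count moves past the last processed value;
// otherwise just advance the restart watermark so later OOMs are still detected.
func observe(st *state, restartCount int64, lastReason string) int64 {
	if lastReason == "OOMKilled" { // stand-in for collector.ReasonOOMKilled
		if restartCount > st.lastRestart {
			st.count++
			st.lastRestart = restartCount
		}
	} else if restartCount > st.lastRestart {
		st.lastRestart = restartCount
	}
	return st.count
}

func main() {
	st := &state{}
	samples := []struct {
		restarts int64
		reason   string
	}{
		{0, ""},          // healthy          -> 0
		{1, "OOMKilled"}, // first OOM edge   -> 1
		{1, ""},          // steady state     -> 1 (sticky)
		{2, "OOMKilled"}, // second OOM edge  -> 2
		{2, "OOMKilled"}, // duplicate report -> 2 (no double count)
	}
	for _, s := range samples {
		fmt.Printf("restarts=%d reason=%q -> oom_kill_count=%d\n",
			s.restarts, s.reason, observe(st, s.restarts, s.reason))
	}
}

The sample sequence intentionally mirrors TestSubscriptionManager_OomKillCountIsCumulativeAndSticky, so the printed counts match the assertions made there.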