57 changes: 49 additions & 8 deletions internal/server/mpa_server.go
@@ -178,6 +178,19 @@ type SubscriptionManager struct {
mu sync.RWMutex
clients map[string]*ClientSubscription
logger logr.Logger

// OOM state tracking so timeseries can reflect OOM kills cumulatively.
//
// We intentionally track this in the streaming layer so that even if a single
// OOM-bearing sample is dropped due to backpressure, subsequent utilization
// samples can still carry a cumulative count.
oomMu sync.Mutex
oomByKey map[string]oomState
}

type oomState struct {
count int64
lastRestart int64
}

type ClientSubscription struct {
@@ -195,9 +208,40 @@ type WorkloadInterest struct {

func NewSubscriptionManager(logger logr.Logger) *SubscriptionManager {
return &SubscriptionManager{
clients: make(map[string]*ClientSubscription),
logger: logger.WithName("sub-manager"),
clients: make(map[string]*ClientSubscription),
logger: logger.WithName("sub-manager"),
oomByKey: make(map[string]oomState),
}
}

func oomKey(ns, pod, container string) string {
return ns + "/" + pod + "/" + container
}

// getAndUpdateOomCount updates (if needed) and returns the cumulative OOM kill count.
//
// Increment rule: when we observe OOMKilled AND RestartCount increases beyond the
// last processed restartCount for this key.
func (sm *SubscriptionManager) getAndUpdateOomCount(ns, pod, container string, restartCount int64, lastReason string) int64 {
key := oomKey(ns, pod, container)

sm.oomMu.Lock()
defer sm.oomMu.Unlock()

st := sm.oomByKey[key]

if lastReason == collector.ReasonOOMKilled {
if restartCount > st.lastRestart {
st.count++
st.lastRestart = restartCount
}
} else if restartCount > st.lastRestart {
// Keep lastRestart moving forward even when not OOMKilled.
st.lastRestart = restartCount
}

sm.oomByKey[key] = st
return st.count
}

func (sm *SubscriptionManager) Register(ch chan *gen.MpaStreamResponse) string {
@@ -287,6 +331,9 @@ func (sm *SubscriptionManager) Broadcast(
restartCount := data.RestartCount
lastReason := data.LastTerminationReason

// Cumulative OOM counter so timeseries reflects OOMs beyond the single event.
oomKillCount := sm.getAndUpdateOomCount(ns, pod, container, restartCount, lastReason)

wKind := data.WorkloadKind
wName := data.WorkloadName

@@ -320,12 +367,6 @@
}

if matched {
// Populate OomKillCount from termination reason — the proto field
var oomKillCount int64
if lastReason == collector.ReasonOOMKilled {
oomKillCount = 1
}

item := &gen.ContainerMetricItem{
Workload: &gen.MpaWorkloadIdentifier{
Namespace: matchedWorkload.Namespace,
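
The increment rule in getAndUpdateOomCount is an edge detector keyed on the container's restart count. A minimal standalone sketch of the same semantics, with hypothetical names and no dependency on the zxporter types:

package main

import "fmt"

// oomTracker mirrors the edge-detection rule above: count an OOM only when
// the sample reports OOMKilled AND the restart count has moved past the last
// restart we processed for this key.
type oomTracker struct {
	count       int64
	lastRestart int64
}

func (t *oomTracker) observe(restartCount int64, oomKilled bool) int64 {
	if oomKilled && restartCount > t.lastRestart {
		t.count++
		t.lastRestart = restartCount
	} else if restartCount > t.lastRestart {
		// Non-OOM restarts still advance the watermark, so a later
		// duplicate sample at the same restart count is never counted.
		t.lastRestart = restartCount
	}
	return t.count
}

func main() {
	var t oomTracker
	fmt.Println(t.observe(0, false)) // 0: no restarts yet
	fmt.Println(t.observe(1, true))  // 1: first OOM edge
	fmt.Println(t.observe(1, true))  // 1: duplicate sample, no double count
	fmt.Println(t.observe(2, false)) // 1: non-OOM restart advances the watermark
	fmt.Println(t.observe(3, true))  // 2: second OOM edge
}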
161 changes: 161 additions & 0 deletions internal/server/mpa_server_test.go
@@ -0,0 +1,161 @@
package server

import (
"testing"
"time"

gen "github.com/devzero-inc/zxporter/gen/api/v1"
"github.com/devzero-inc/zxporter/internal/collector"
"github.com/go-logr/logr"
)

func mustRecv(t *testing.T, ch <-chan *gen.MpaStreamResponse) *gen.MpaStreamResponse {
t.Helper()
select {
case msg := <-ch:
return msg
case <-time.After(1 * time.Second):
t.Fatalf("timed out waiting for stream message")
return nil
}
}

func getFirstItem(t *testing.T, msg *gen.MpaStreamResponse) *gen.ContainerMetricItem {
t.Helper()
rt := msg.GetRealtimeMetrics()
if rt == nil || len(rt.Items) != 1 {
t.Fatalf("expected 1 realtime item, got %#v", msg)
}
return rt.Items[0]
}

func TestSubscriptionManager_OomKillCountIsCumulativeAndSticky(t *testing.T) {
sm := NewSubscriptionManager(logr.Discard())
updates := make(chan *gen.MpaStreamResponse, 10)
clientID := sm.Register(updates)
defer sm.Unregister(clientID)

// Subscribe to Deployment/foo in ns.
sm.UpdateSubscription(clientID, &gen.MpaWorkloadSubscription{Workloads: []*gen.MpaWorkloadIdentifier{{
Namespace: "ns",
Name: "foo",
Kind: "Deployment",
}}})

base := &collector.ContainerMetricsSnapshot{
Namespace: "ns",
PodName: "foo-abc",
ContainerName: "c1",
WorkloadKind: "Deployment",
WorkloadName: "foo",
}

// Initial sample.
{
s := *base
s.RestartCount = 0
s.LastTerminationReason = ""
sm.Broadcast(&s, time.Now())
item := getFirstItem(t, mustRecv(t, updates))
if item.OomKillCount != 0 {
t.Fatalf("expected oom_kill_count=0, got %d", item.OomKillCount)
}
}

// First OOM edge.
{
s := *base
s.RestartCount = 1
s.LastTerminationReason = collector.ReasonOOMKilled
sm.Broadcast(&s, time.Now())
item := getFirstItem(t, mustRecv(t, updates))
if item.OomKillCount != 1 {
t.Fatalf("expected oom_kill_count=1, got %d", item.OomKillCount)
}
}

// Subsequent non-OOM sample should retain cumulative count.
{
s := *base
s.RestartCount = 1
s.LastTerminationReason = ""
sm.Broadcast(&s, time.Now())
item := getFirstItem(t, mustRecv(t, updates))
if item.OomKillCount != 1 {
t.Fatalf("expected oom_kill_count=1 (sticky), got %d", item.OomKillCount)
}
}

// Second OOM edge increments.
{
s := *base
s.RestartCount = 2
s.LastTerminationReason = collector.ReasonOOMKilled
sm.Broadcast(&s, time.Now())
item := getFirstItem(t, mustRecv(t, updates))
if item.OomKillCount != 2 {
t.Fatalf("expected oom_kill_count=2, got %d", item.OomKillCount)
}
}

// Duplicate OOM with same restartCount must not double-count.
{
s := *base
s.RestartCount = 2
s.LastTerminationReason = collector.ReasonOOMKilled
sm.Broadcast(&s, time.Now())
item := getFirstItem(t, mustRecv(t, updates))
if item.OomKillCount != 2 {
t.Fatalf("expected oom_kill_count=2 (no double count), got %d", item.OomKillCount)
}
}
}

func TestSubscriptionManager_OomCountSurvivesDroppedSample(t *testing.T) {
sm := NewSubscriptionManager(logr.Discard())
updates := make(chan *gen.MpaStreamResponse, 1) // small buffer
clientID := sm.Register(updates)
defer sm.Unregister(clientID)

sm.UpdateSubscription(clientID, &gen.MpaWorkloadSubscription{Workloads: []*gen.MpaWorkloadIdentifier{{
Namespace: "ns",
Name: "foo",
Kind: "Deployment",
}}})

base := &collector.ContainerMetricsSnapshot{
Namespace: "ns",
PodName: "foo-abc",
ContainerName: "c1",
WorkloadKind: "Deployment",
WorkloadName: "foo",
}

// Fill channel so next send is dropped.
updates <- &gen.MpaStreamResponse{Payload: &gen.MpaStreamResponse_RealtimeMetrics{RealtimeMetrics: &gen.ContainerMetricsBatch{Items: []*gen.ContainerMetricItem{{
Workload: &gen.MpaWorkloadIdentifier{Namespace: "ns", Name: "foo", Kind: "Deployment"},
}}}}}

// OOM sample is processed (state updated) but dropped due to full channel.
{
s := *base
s.RestartCount = 1
s.LastTerminationReason = collector.ReasonOOMKilled
sm.Broadcast(&s, time.Now())
}

// Drain the pre-filled message.
_ = mustRecv(t, updates)

// Next normal sample should reflect the cumulative OOM count=1.
{
s := *base
s.RestartCount = 1
s.LastTerminationReason = ""
sm.Broadcast(&s, time.Now())
item := getFirstItem(t, mustRecv(t, updates))
if item.OomKillCount != 1 {
t.Fatalf("expected oom_kill_count=1 after dropped OOM sample, got %d", item.OomKillCount)
}
}
}
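
The dropped-sample test above depends on Broadcast updating the shared oomByKey state before attempting the per-client send, and on that send being non-blocking. The send path itself is not visible in this diff; a hedged sketch of the usual pattern, with hypothetical names:

package main

import "fmt"

// trySend sketches a non-blocking fan-out send: if the client's buffer is
// full, the message is dropped rather than blocking the broadcaster. Because
// the cumulative OOM counter is updated before this point, a dropped sample
// loses only the message, never the count.
func trySend[T any](ch chan T, msg T) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false // buffer full: drop; the next sample still carries the count
	}
}

func main() {
	ch := make(chan string, 1)
	fmt.Println(trySend(ch, "a")) // true: buffered
	fmt.Println(trySend(ch, "b")) // false: dropped without blocking
}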