From 3b71a918c185d22b56d1a81427a14906716ba980 Mon Sep 17 00:00:00 2001 From: Parthiba-Hazra Date: Tue, 28 Apr 2026 20:20:11 +0530 Subject: [PATCH 1/2] better oom detection --- .../collector/container_resource_collector.go | 4 +- internal/collector/interface.go | 19 ++ internal/collector/oom_reconciler.go | 247 ++++++++++++++++++ internal/collector/pod_collector.go | 97 ++++++- .../controller/collectionpolicy_controller.go | 84 ++++++ internal/server/mpa_server.go | 8 +- 6 files changed, 452 insertions(+), 7 deletions(-) create mode 100644 internal/collector/oom_reconciler.go diff --git a/internal/collector/container_resource_collector.go b/internal/collector/container_resource_collector.go index 0c9895ee..04c2ebb0 100644 --- a/internal/collector/container_resource_collector.go +++ b/internal/collector/container_resource_collector.go @@ -585,10 +585,10 @@ func (c *ContainerResourceCollector) processContainerMetrics( lastTerminationReason = containerStatus.LastTerminationState.Terminated.Reason // Detect OOM during container init: Kubernetes reports as "StartError" // with message containing "OOM-killed" when memory limit is too low - if lastTerminationReason == "StartError" { + if lastTerminationReason == ReasonStartError { msg := containerStatus.LastTerminationState.Terminated.Message if strings.Contains(strings.ToLower(msg), "oom") { - lastTerminationReason = "OOMKilled" + lastTerminationReason = ReasonOOMKilled } } } diff --git a/internal/collector/interface.go b/internal/collector/interface.go index 320eb0e2..bfc7ab6b 100644 --- a/internal/collector/interface.go +++ b/internal/collector/interface.go @@ -387,3 +387,22 @@ type ResourceCollector interface { // AddResource manually adds a resource to be processed by the collector AddResource(resource interface{}) error } + +// Kubernetes container termination reason constants. Using constants instead of +// raw strings prevents typo-induced silent failures across the OOM detection paths. 
+const ( + // ReasonOOMKilled is the termination reason kubelet sets when the OOM killer + // terminates a container that exceeded its memory limit. + ReasonOOMKilled = "OOMKilled" + + // ReasonStartError is the termination reason for containers that fail during + // init. When the message contains "oom", it indicates an OOM during startup. + ReasonStartError = "StartError" +) + +// MpaMetricsPublisher is the interface the collector package uses to publish +// metrics directly to the MPA gRPC stream (bypassing the combinedChannel pipeline). +// Implemented by server.MpaServer. +type MpaMetricsPublisher interface { + PublishMetrics(metrics *ContainerMetricsSnapshot, timestamp time.Time) +} diff --git a/internal/collector/oom_reconciler.go b/internal/collector/oom_reconciler.go new file mode 100644 index 00000000..9db89d66 --- /dev/null +++ b/internal/collector/oom_reconciler.go @@ -0,0 +1,247 @@ +package collector + +import ( + "context" + "strings" + "sync" + "time" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +const ( + // oomReconcileInterval is how often the reconciler sweeps pods for missed OOM events. + // 30s balances latency (operator gets missed OOMs within half a minute) against + // K8s API load (one List call per namespace per sweep). + oomReconcileInterval = 30 * time.Second + + // oomDeduplicationTTL is how long a seen OOM entry is kept in the dedup map. + // After this, the entry is evicted so a new OOM on the same container (after pod + // recycling) can be detected. 10 minutes covers several reconciliation cycles + // and the operator's emergency response cooldown (default 10s). + oomDeduplicationTTL = 10 * time.Minute +) + +// OOMReconcilerMarker is the interface used by PodCollector to mark OOMs as seen, +// preventing the periodic sweep from re-publishing events already sent via the +// real-time informer path. 
+type OOMReconcilerMarker interface { + MarkSeen(namespace, podName, containerName string, restartCount int32) +} + +// oomSeenKey uniquely identifies an OOM event for deduplication. +type oomSeenKey struct { + namespace string + podName string + containerName string +} + +type oomSeenEntry struct { + restartCount int32 + seenAt time.Time +} + +// OOMReconciler periodically sweeps pods for OOM termination states that the +// informer-based PodCollector may have missed (informer coalescing, rapid +// restart-then-recovery, zxporter restart). Detected OOMs are published directly +// to the MPA stream, bypassing the lossy combinedChannel pipeline. +type OOMReconciler struct { + client kubernetes.Interface + namespaces []string + mpaPublisher MpaMetricsPublisher + logger logr.Logger + + mu sync.Mutex + seen map[oomSeenKey]oomSeenEntry +} + +// NewOOMReconciler creates a new OOM reconciler. +func NewOOMReconciler( + client kubernetes.Interface, + namespaces []string, + mpaPublisher MpaMetricsPublisher, + logger logr.Logger, +) *OOMReconciler { + return &OOMReconciler{ + client: client, + namespaces: namespaces, + mpaPublisher: mpaPublisher, + logger: logger.WithName("oom-reconciler"), + seen: make(map[oomSeenKey]oomSeenEntry), + } +} + +// MarkSeen records that an OOM event has already been published (by the PodCollector's +// real-time path), so the periodic sweep will skip it. +func (r *OOMReconciler) MarkSeen(namespace, podName, containerName string, restartCount int32) { + r.mu.Lock() + defer r.mu.Unlock() + key := oomSeenKey{namespace: namespace, podName: podName, containerName: containerName} + r.seen[key] = oomSeenEntry{restartCount: restartCount, seenAt: time.Now()} +} + +// Start runs the periodic OOM reconciliation loop. Blocks until ctx is cancelled. 
+func (r *OOMReconciler) Start(ctx context.Context) { + r.logger.Info("Starting OOM reconciler", "interval", oomReconcileInterval, "namespaces", r.namespaces) + ticker := time.NewTicker(oomReconcileInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + r.logger.Info("OOM reconciler stopped") + return + case <-ticker.C: + r.sweep(ctx) + r.evictStaleEntries() + } + } +} + +// sweep lists pods in all watched namespaces and checks for OOM termination states. +func (r *OOMReconciler) sweep(ctx context.Context) { + namespaces := r.namespaces + // Empty or single empty string means all namespaces + if len(namespaces) == 0 || (len(namespaces) == 1 && namespaces[0] == "") { + namespaces = []string{""} + } + + for _, ns := range namespaces { + if err := r.sweepNamespace(ctx, ns); err != nil { + r.logger.Error(err, "Failed to sweep namespace for OOM events", "namespace", ns) + } + } +} + +// sweepNamespace checks all pods in a single namespace for OOM termination. +func (r *OOMReconciler) sweepNamespace(ctx context.Context, namespace string) error { + listOpts := metav1.ListOptions{} + var podList *corev1.PodList + var err error + + if namespace == "" { + podList, err = r.client.CoreV1().Pods("").List(ctx, listOpts) + } else { + podList, err = r.client.CoreV1().Pods(namespace).List(ctx, listOpts) + } + if err != nil { + return err + } + + for i := range podList.Items { + pod := &podList.Items[i] + r.checkPodForOOM(pod) + } + return nil +} + +// checkPodForOOM inspects container statuses for OOM termination and publishes +// to the MPA stream if the OOM hasn't been seen before. 
+func (r *OOMReconciler) checkPodForOOM(pod *corev1.Pod) { + for _, cs := range pod.Status.ContainerStatuses { + if !isOOMTermination(cs) { + continue + } + + key := oomSeenKey{ + namespace: pod.Namespace, + podName: pod.Name, + containerName: cs.Name, + } + + r.mu.Lock() + entry, exists := r.seen[key] + alreadySeen := exists && entry.restartCount >= cs.RestartCount + if !alreadySeen { + r.seen[key] = oomSeenEntry{restartCount: cs.RestartCount, seenAt: time.Now()} + } + r.mu.Unlock() + + if alreadySeen { + continue + } + + r.publishOOM(pod, cs) + } +} + +// isOOMTermination returns true if the container's last termination was OOM-related. +func isOOMTermination(cs corev1.ContainerStatus) bool { + terminated := cs.LastTerminationState.Terminated + if terminated == nil { + return false + } + if terminated.Reason == ReasonOOMKilled { + return true + } + // Kubernetes reports an OOM during container startup as StartError with "oom" in the message + if terminated.Reason == ReasonStartError && strings.Contains(strings.ToLower(terminated.Message), "oom") { + return true + } + return false +} + +// publishOOM sends a synthetic OOM metric snapshot directly to the MPA stream. 
+func (r *OOMReconciler) publishOOM(pod *corev1.Pod, cs corev1.ContainerStatus) { + if r.mpaPublisher == nil { + return + } + + workloadName, workloadKind := getWorkloadInfo(pod) + requestBytes, limitBytes := getContainerResources(pod, cs.Name) + + var cpuRequestMillis, cpuLimitMillis int64 + for _, c := range pod.Spec.Containers { + if c.Name == cs.Name { + if req := c.Resources.Requests.Cpu(); req != nil { + cpuRequestMillis = req.MilliValue() + } + if lim := c.Resources.Limits.Cpu(); lim != nil { + cpuLimitMillis = lim.MilliValue() + } + break + } + } + + snapshot := &ContainerMetricsSnapshot{ + ContainerName: cs.Name, + PodName: pod.Name, + Namespace: pod.Namespace, + NodeName: pod.Spec.NodeName, + WorkloadName: workloadName, + WorkloadKind: workloadKind, + CpuRequestMillis: cpuRequestMillis, + CpuLimitMillis: cpuLimitMillis, + MemoryUsageBytes: limitBytes, + MemoryRequestBytes: requestBytes, + MemoryLimitBytes: limitBytes, + PodLabels: pod.Labels, + ContainerRunning: cs.State.Running != nil, + ContainerRestarts: cs.RestartCount > 0, + RestartCount: int64(cs.RestartCount), + LastTerminationReason: ReasonOOMKilled, + } + + r.mpaPublisher.PublishMetrics(snapshot, time.Now()) + + r.logger.Info("OOM reconciler: published missed OOM event to MPA stream", + "namespace", pod.Namespace, + "pod", pod.Name, + "container", cs.Name, + "restartCount", cs.RestartCount) +} + +// evictStaleEntries removes dedup entries older than oomDeduplicationTTL. 
+func (r *OOMReconciler) evictStaleEntries() { + r.mu.Lock() + defer r.mu.Unlock() + cutoff := time.Now().Add(-oomDeduplicationTTL) + for key, entry := range r.seen { + if entry.seenAt.Before(cutoff) { + delete(r.seen, key) + } + } +} diff --git a/internal/collector/pod_collector.go b/internal/collector/pod_collector.go index 3ec1f5ff..d614824a 100644 --- a/internal/collector/pod_collector.go +++ b/internal/collector/pod_collector.go @@ -61,6 +61,14 @@ type PodCollector struct { // Startup lifecycle tracking startupTracker map[startupLifecycleKey]*startupLifecycleEntry startupTrackerMu sync.Mutex + + // MPA stream publisher for direct OOM event delivery (bypasses combinedChannel). + // Optional — nil when MPA server is not running. + mpaPublisher MpaMetricsPublisher + + // OOM reconciler for cross-path deduplication. When set, the PodCollector + // marks OOMs as seen so the periodic sweep doesn't re-publish them. + oomReconciler OOMReconcilerMarker } // NewPodCollector creates a new collector for pod resources @@ -276,8 +284,8 @@ func (c *PodCollector) checkForContainerEvents(oldPod, newPod *corev1.Pod) { // Check if the restart was due to OOM terminated := newStatus.LastTerminationState.Terminated - isOOM := terminated != nil && (terminated.Reason == "OOMKilled" || - (terminated.Reason == "StartError" && strings.Contains(strings.ToLower(terminated.Message), "oom"))) + isOOM := terminated != nil && (terminated.Reason == ReasonOOMKilled || + (terminated.Reason == ReasonStartError && strings.Contains(strings.ToLower(terminated.Message), "oom"))) if isOOM { // reason = "containerOOMKilled" c.logger.Info("Container OOM killed", @@ -312,8 +320,16 @@ func (c *PodCollector) checkForContainerEvents(oldPod, newPod *corev1.Pod) { ) } - // Emit structured OOM event for direct path + // Emit structured OOM event for direct path (backend) c.emitContainerOOMEvent(newPod, newStatus) + + // Publish OOM directly to MPA stream (operator) — bypasses lossy pipeline + 
c.logger.Info("OOM detected by informer, publishing to MPA stream (direct path)", + "namespace", newPod.Namespace, + "pod", newPod.Name, + "container", newStatus.Name, + "restartCount", newStatus.RestartCount) + c.publishOOMToMpaStream(newPod, newStatus) } c.sendContainerEvent(newPod, newStatus.Name, EventTypeContainerRestarted, &newStatus) @@ -515,6 +531,79 @@ func (c *PodCollector) emitContainerOOMEvent(pod *corev1.Pod, status corev1.Cont } } +// SetMpaPublisher sets the MPA stream publisher for direct OOM event delivery. +// Called by the controller after MPA server setup. +func (c *PodCollector) SetMpaPublisher(publisher MpaMetricsPublisher) { + c.mpaPublisher = publisher +} + +// SetOOMReconciler sets the OOM reconciler for cross-path deduplication. +func (c *PodCollector) SetOOMReconciler(reconciler OOMReconcilerMarker) { + c.oomReconciler = reconciler +} + +// publishOOMToMpaStream sends an OOM event directly to the MPA gRPC stream, +// bypassing the combinedChannel pipeline that can silently drop events. +// This gives the dakr operator a real-time OOM signal (~1-2s latency). 
+func (c *PodCollector) publishOOMToMpaStream(pod *corev1.Pod, status corev1.ContainerStatus) { + if c.mpaPublisher == nil { + return + } + + workloadName, workloadKind := getWorkloadInfo(pod) + requestBytes, limitBytes := getContainerResources(pod, status.Name) + + // Build CPU request/limit from pod spec + var cpuRequestMillis, cpuLimitMillis int64 + for _, cs := range pod.Spec.Containers { + if cs.Name == status.Name { + if req := cs.Resources.Requests.Cpu(); req != nil { + cpuRequestMillis = req.MilliValue() + } + if lim := cs.Resources.Limits.Cpu(); lim != nil { + cpuLimitMillis = lim.MilliValue() + } + break + } + } + + snapshot := &ContainerMetricsSnapshot{ + ContainerName: status.Name, + PodName: pod.Name, + Namespace: pod.Namespace, + NodeName: pod.Spec.NodeName, + WorkloadName: workloadName, + WorkloadKind: workloadKind, + CpuRequestMillis: cpuRequestMillis, + CpuLimitMillis: cpuLimitMillis, + MemoryUsageBytes: limitBytes, // OOM means usage >= limit + MemoryRequestBytes: requestBytes, + MemoryLimitBytes: limitBytes, + PodLabels: pod.Labels, + ContainerRunning: status.State.Running != nil, + ContainerRestarts: status.RestartCount > 0, + RestartCount: int64(status.RestartCount), + LastTerminationReason: ReasonOOMKilled, + } + + c.mpaPublisher.PublishMetrics(snapshot, time.Now()) + + // Mark as seen in the reconciler so the periodic sweep doesn't re-publish + if c.oomReconciler != nil { + c.oomReconciler.MarkSeen(pod.Namespace, pod.Name, status.Name, status.RestartCount) + } + + c.logger.Info("OOM event published to MPA stream via direct path", + "namespace", pod.Namespace, + "pod", pod.Name, + "container", status.Name, + "restartCount", status.RestartCount, + "memoryLimitBytes", snapshot.MemoryLimitBytes, + "memoryRequestBytes", snapshot.MemoryRequestBytes, + "workloadName", snapshot.WorkloadName, + "workloadKind", snapshot.WorkloadKind) +} + // emitContainerCrashLoopEvent sends a structured CrashLoopBackOff event through the batch channel. 
func (c *PodCollector) emitContainerCrashLoopEvent(pod *corev1.Pod, status corev1.ContainerStatus) { workloadName, workloadKind := getWorkloadInfo(pod) @@ -525,7 +614,7 @@ func (c *PodCollector) emitContainerCrashLoopEvent(pod *corev1.Pod, status corev if status.LastTerminationState.Terminated != nil { lastTerminationReason = status.LastTerminationState.Terminated.Reason exitCode = status.LastTerminationState.Terminated.ExitCode - isOOMRelated = lastTerminationReason == "OOMKilled" + isOOMRelated = lastTerminationReason == ReasonOOMKilled } c.logger.Info("Container CrashLoopBackOff detected", diff --git a/internal/controller/collectionpolicy_controller.go b/internal/controller/collectionpolicy_controller.go index 223ba591..e8367c8a 100644 --- a/internal/controller/collectionpolicy_controller.go +++ b/internal/controller/collectionpolicy_controller.go @@ -83,6 +83,10 @@ type CollectionPolicyReconciler struct { // and should be retried periodically pendingCollectors []pendingCollector pendingMu sync.Mutex + + // OOM reconciler for periodic sweep of missed OOM events + oomReconciler *collector.OOMReconciler + oomReconcilerCancel context.CancelFunc } // pendingCollector represents a collector that was unavailable at registration time @@ -1811,6 +1815,12 @@ func (r *CollectionPolicyReconciler) restartCollectors( }, ) logger.Info("Successfully restarted collector", "type", collectorType) + + // Re-wire MPA publisher and OOM reconciler into replaced PodCollector + if collectorType == "pod" { + r.wireMpaPublisherIntoPodCollector() + r.wireOOMReconcilerIntoPodCollector() + } } } @@ -1905,6 +1915,10 @@ func (r *CollectionPolicyReconciler) initializeCollectors( // Not fatal } + // Start OOM reconciler and wire MPA publisher into PodCollector. + // Both depend on MpaServer being set up first. 
+ r.setupOOMDetection(config) + r.TelemetryLogger.Report( gen.LogLevel_LOG_LEVEL_INFO, "CollectionPolicyReconciler_initializeCollectors", @@ -2056,6 +2070,69 @@ func (r *CollectionPolicyReconciler) setupMpaServer() error { return r.MpaServer.Start(r.MpaServerPort) } +// setupOOMDetection starts the OOM reconciler and wires the MPA publisher +// into the PodCollector so OOM events reach the operator via both the real-time +// informer path and the periodic sweep fallback. +func (r *CollectionPolicyReconciler) setupOOMDetection(config *PolicyConfig) { + if r.MpaServer == nil { + return + } + + // Wire MPA publisher into PodCollector (if it's running) so the real-time + // informer path can publish OOM events directly to the operator. + if r.CollectionManager != nil { + r.wireMpaPublisherIntoPodCollector() + } + + // Start periodic OOM reconciler + if r.oomReconciler == nil { + namespaces := config.TargetNamespaces + r.oomReconciler = collector.NewOOMReconciler( + r.K8sClient, + namespaces, + r.MpaServer, + r.Log, + ) + + // Also wire the reconciler into PodCollector for dedup + r.wireOOMReconcilerIntoPodCollector() + + ctx, cancel := context.WithCancel(context.Background()) + r.oomReconcilerCancel = cancel + go r.oomReconciler.Start(ctx) + r.Log.Info("OOM reconciler started") + } +} + +// wireMpaPublisherIntoPodCollector sets the MPA publisher on the PodCollector +// so real-time OOM events are published directly to the operator's gRPC stream. +func (r *CollectionPolicyReconciler) wireMpaPublisherIntoPodCollector() { + rc := r.CollectionManager.GetCollector("pod") + if rc == nil { + return + } + if pc, ok := rc.(*collector.PodCollector); ok { + pc.SetMpaPublisher(r.MpaServer) + r.Log.Info("Wired MPA publisher into PodCollector for direct OOM delivery") + } +} + +// wireOOMReconcilerIntoPodCollector sets the OOM reconciler on the PodCollector +// for cross-path deduplication (informer path marks OOMs as seen so sweep skips them). 
+func (r *CollectionPolicyReconciler) wireOOMReconcilerIntoPodCollector() { + if r.CollectionManager == nil || r.oomReconciler == nil { + return + } + rc := r.CollectionManager.GetCollector("pod") + if rc == nil { + return + } + if pc, ok := rc.(*collector.PodCollector); ok { + pc.SetOOMReconciler(r.oomReconciler) + r.Log.Info("Wired OOM reconciler into PodCollector for deduplication") + } +} + // setupClusterCollector creates and starts just the cluster collector func (r *CollectionPolicyReconciler) setupClusterCollector( ctx context.Context, @@ -2294,6 +2371,13 @@ func (r *CollectionPolicyReconciler) cleanupOnFailure(logger logr.Logger) { r.MpaServer.Stop() } + // Stop OOM reconciler + if r.oomReconcilerCancel != nil { + r.oomReconcilerCancel() + r.oomReconcilerCancel = nil + r.oomReconciler = nil + } + // Reset state r.IsRunning = false r.CollectionManager = nil diff --git a/internal/server/mpa_server.go b/internal/server/mpa_server.go index 1b700b3a..50d1a957 100644 --- a/internal/server/mpa_server.go +++ b/internal/server/mpa_server.go @@ -320,6 +320,12 @@ func (sm *SubscriptionManager) Broadcast( } if matched { + // Populate the OomKillCount proto field: 1 when the last termination reason is OOMKilled, 0 otherwise + var oomKillCount int64 + if lastReason == collector.ReasonOOMKilled { + oomKillCount = 1 + } + item := &gen.ContainerMetricItem{ Workload: &gen.MpaWorkloadIdentifier{ Namespace: matchedWorkload.Namespace, @@ -332,7 +338,7 @@ func (sm *SubscriptionManager) Broadcast( Timestamp: timestamppb.New(timestamp), CpuUsageMillis: cpuMillis, MemoryUsageBytes: memBytes, - OomKillCount: 0, // Not explicitly tracked in snapshot yet + OomKillCount: oomKillCount, RestartCount: int32(restartCount), LastTerminationReason: lastReason, // Resource requests and limits for utilization calculation From 4257767f49df0a70846b1e17677a30780a8e644b Mon Sep 17 00:00:00 2001 From: Parthiba-Hazra Date: Tue, 28 Apr 2026 20:33:58 +0530 Subject: [PATCH 2/2] fix data race on mpaPublisher/oomReconciler --- 
internal/collector/container_metrics.go | 42 +++++++++++++ internal/collector/oom_reconciler.go | 83 ++++++++++--------------- internal/collector/pod_collector.go | 59 ++++++------------ 3 files changed, 93 insertions(+), 91 deletions(-) diff --git a/internal/collector/container_metrics.go b/internal/collector/container_metrics.go index 9b18fb1c..7a35e0df 100644 --- a/internal/collector/container_metrics.go +++ b/internal/collector/container_metrics.go @@ -1,5 +1,7 @@ package collector +import corev1 "k8s.io/api/core/v1" + // ContainerMetricsSnapshot represents a strongly-typed snapshot of container resource metrics type ContainerMetricsSnapshot struct { // Container identification @@ -75,3 +77,43 @@ type ContainerMetricsSnapshot struct { GpuTotalMemoryMb interface{} `json:"gpuTotalMemoryMb,omitempty"` IndividualGPUMetrics string `json:"individualGPUMetrics,omitempty"` // JSON string } + +// BuildOOMSnapshot constructs a ContainerMetricsSnapshot for an OOM event. +// Used by both the PodCollector (informer fast path) and OOMReconciler (sweep path) +// to ensure consistent snapshot construction. 
+func BuildOOMSnapshot(pod *corev1.Pod, cs corev1.ContainerStatus) *ContainerMetricsSnapshot { + workloadName, workloadKind := getWorkloadInfo(pod) + requestBytes, limitBytes := getContainerResources(pod, cs.Name) + + var cpuRequestMillis, cpuLimitMillis int64 + for _, c := range pod.Spec.Containers { + if c.Name == cs.Name { + if req := c.Resources.Requests.Cpu(); req != nil { + cpuRequestMillis = req.MilliValue() + } + if lim := c.Resources.Limits.Cpu(); lim != nil { + cpuLimitMillis = lim.MilliValue() + } + break + } + } + + return &ContainerMetricsSnapshot{ + ContainerName: cs.Name, + PodName: pod.Name, + Namespace: pod.Namespace, + NodeName: pod.Spec.NodeName, + WorkloadName: workloadName, + WorkloadKind: workloadKind, + CpuRequestMillis: cpuRequestMillis, + CpuLimitMillis: cpuLimitMillis, + MemoryUsageBytes: limitBytes, // OOM means usage >= limit + MemoryRequestBytes: requestBytes, + MemoryLimitBytes: limitBytes, + PodLabels: pod.Labels, + ContainerRunning: cs.State.Running != nil, + ContainerRestarts: cs.RestartCount > 0, + RestartCount: int64(cs.RestartCount), + LastTerminationReason: ReasonOOMKilled, + } +} diff --git a/internal/collector/oom_reconciler.go b/internal/collector/oom_reconciler.go index 9db89d66..9a3e80f0 100644 --- a/internal/collector/oom_reconciler.go +++ b/internal/collector/oom_reconciler.go @@ -117,23 +117,34 @@ func (r *OOMReconciler) sweep(ctx context.Context) { } // sweepNamespace checks all pods in a single namespace for OOM termination. +// Uses chunked listing (500 pods per page) to bound memory usage on large clusters. 
func (r *OOMReconciler) sweepNamespace(ctx context.Context, namespace string) error { - listOpts := metav1.ListOptions{} - var podList *corev1.PodList - var err error - - if namespace == "" { - podList, err = r.client.CoreV1().Pods("").List(ctx, listOpts) - } else { - podList, err = r.client.CoreV1().Pods(namespace).List(ctx, listOpts) - } - if err != nil { - return err - } + continueToken := "" + for { + listOpts := metav1.ListOptions{ + Limit: 500, + Continue: continueToken, + } + + var podList *corev1.PodList + var err error + if namespace == "" { + podList, err = r.client.CoreV1().Pods("").List(ctx, listOpts) + } else { + podList, err = r.client.CoreV1().Pods(namespace).List(ctx, listOpts) + } + if err != nil { + return err + } + + for i := range podList.Items { + r.checkPodForOOM(&podList.Items[i]) + } - for i := range podList.Items { - pod := &podList.Items[i] - r.checkPodForOOM(pod) + continueToken = podList.Continue + if continueToken == "" { + break + } } return nil } @@ -190,48 +201,18 @@ func (r *OOMReconciler) publishOOM(pod *corev1.Pod, cs corev1.ContainerStatus) { return } - workloadName, workloadKind := getWorkloadInfo(pod) - requestBytes, limitBytes := getContainerResources(pod, cs.Name) - - var cpuRequestMillis, cpuLimitMillis int64 - for _, c := range pod.Spec.Containers { - if c.Name == cs.Name { - if req := c.Resources.Requests.Cpu(); req != nil { - cpuRequestMillis = req.MilliValue() - } - if lim := c.Resources.Limits.Cpu(); lim != nil { - cpuLimitMillis = lim.MilliValue() - } - break - } - } - - snapshot := &ContainerMetricsSnapshot{ - ContainerName: cs.Name, - PodName: pod.Name, - Namespace: pod.Namespace, - NodeName: pod.Spec.NodeName, - WorkloadName: workloadName, - WorkloadKind: workloadKind, - CpuRequestMillis: cpuRequestMillis, - CpuLimitMillis: cpuLimitMillis, - MemoryUsageBytes: limitBytes, - MemoryRequestBytes: requestBytes, - MemoryLimitBytes: limitBytes, - PodLabels: pod.Labels, - ContainerRunning: cs.State.Running != nil, - 
ContainerRestarts: cs.RestartCount > 0, - RestartCount: int64(cs.RestartCount), - LastTerminationReason: ReasonOOMKilled, - } - + snapshot := BuildOOMSnapshot(pod, cs) r.mpaPublisher.PublishMetrics(snapshot, time.Now()) r.logger.Info("OOM reconciler: published missed OOM event to MPA stream", "namespace", pod.Namespace, "pod", pod.Name, "container", cs.Name, - "restartCount", cs.RestartCount) + "restartCount", cs.RestartCount, + "memoryLimitBytes", snapshot.MemoryLimitBytes, + "memoryRequestBytes", snapshot.MemoryRequestBytes, + "workloadName", snapshot.WorkloadName, + "workloadKind", snapshot.WorkloadKind) } // evictStaleEntries removes dedup entries older than oomDeduplicationTTL. diff --git a/internal/collector/pod_collector.go b/internal/collector/pod_collector.go index d614824a..95605ab1 100644 --- a/internal/collector/pod_collector.go +++ b/internal/collector/pod_collector.go @@ -532,13 +532,19 @@ func (c *PodCollector) emitContainerOOMEvent(pod *corev1.Pod, status corev1.Cont } // SetMpaPublisher sets the MPA stream publisher for direct OOM event delivery. -// Called by the controller after MPA server setup. +// Called by the controller after MPA server setup. Guarded by mu because the +// informer goroutine reads mpaPublisher concurrently in publishOOMToMpaStream. func (c *PodCollector) SetMpaPublisher(publisher MpaMetricsPublisher) { + c.mu.Lock() + defer c.mu.Unlock() c.mpaPublisher = publisher } // SetOOMReconciler sets the OOM reconciler for cross-path deduplication. +// Guarded by mu for the same reason as SetMpaPublisher. func (c *PodCollector) SetOOMReconciler(reconciler OOMReconcilerMarker) { + c.mu.Lock() + defer c.mu.Unlock() c.oomReconciler = reconciler } @@ -546,51 +552,24 @@ func (c *PodCollector) SetOOMReconciler(reconciler OOMReconcilerMarker) { // bypassing the combinedChannel pipeline that can silently drop events. // This gives the dakr operator a real-time OOM signal (~1-2s latency). 
func (c *PodCollector) publishOOMToMpaStream(pod *corev1.Pod, status corev1.ContainerStatus) { - if c.mpaPublisher == nil { - return - } - - workloadName, workloadKind := getWorkloadInfo(pod) - requestBytes, limitBytes := getContainerResources(pod, status.Name) + // Snapshot the publisher and reconciler under the read lock so the + // actual publish (which may block on the client channel) runs unlocked. + c.mu.RLock() + publisher := c.mpaPublisher + reconciler := c.oomReconciler + c.mu.RUnlock() - // Build CPU request/limit from pod spec - var cpuRequestMillis, cpuLimitMillis int64 - for _, cs := range pod.Spec.Containers { - if cs.Name == status.Name { - if req := cs.Resources.Requests.Cpu(); req != nil { - cpuRequestMillis = req.MilliValue() - } - if lim := cs.Resources.Limits.Cpu(); lim != nil { - cpuLimitMillis = lim.MilliValue() - } - break - } + if publisher == nil { + return } - snapshot := &ContainerMetricsSnapshot{ - ContainerName: status.Name, - PodName: pod.Name, - Namespace: pod.Namespace, - NodeName: pod.Spec.NodeName, - WorkloadName: workloadName, - WorkloadKind: workloadKind, - CpuRequestMillis: cpuRequestMillis, - CpuLimitMillis: cpuLimitMillis, - MemoryUsageBytes: limitBytes, // OOM means usage >= limit - MemoryRequestBytes: requestBytes, - MemoryLimitBytes: limitBytes, - PodLabels: pod.Labels, - ContainerRunning: status.State.Running != nil, - ContainerRestarts: status.RestartCount > 0, - RestartCount: int64(status.RestartCount), - LastTerminationReason: ReasonOOMKilled, - } + snapshot := BuildOOMSnapshot(pod, status) - c.mpaPublisher.PublishMetrics(snapshot, time.Now()) + publisher.PublishMetrics(snapshot, time.Now()) // Mark as seen in the reconciler so the periodic sweep doesn't re-publish - if c.oomReconciler != nil { - c.oomReconciler.MarkSeen(pod.Namespace, pod.Name, status.Name, status.RestartCount) + if reconciler != nil { + reconciler.MarkSeen(pod.Namespace, pod.Name, status.Name, status.RestartCount) } c.logger.Info("OOM event published 
to MPA stream via direct path",