MPA v2 - rules engine and in cluster autoscaling #268
Conversation
📝 Walkthrough

Adds WorkloadRecommendation CRD support and collector, a Prometheus-based historical metrics collector, proto/service changes to envelope realtime and historical payloads, pod-level network metric flags, RBAC updates for dakr.devzero.io, controller retry for CRD-dependent collectors, and telemetry for dropped messages.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant CRD as Cluster CRD
    participant WRC as WorkloadRecommendationCollector
    participant BATCH as Batcher
    participant CTRL as CollectionPolicyReconciler
    CRD->>WRC: Add/Update/Delete WorkloadRecommendation
    WRC->>WRC: Filter terminal phases & detect meaningful change
    WRC->>BATCH: Enqueue CollectedResource (meta + event)
    BATCH->>CTRL: Deliver batch for processing
    CTRL->>CTRL: Reconcile / register collectors (may defer)
```

```mermaid
sequenceDiagram
    participant Client as Client (subscription)
    participant MPA as MpaServer
    participant SUB as SubscriptionManager
    participant HIST as HistoricalMetricsCollector
    participant PROM as Prometheus
    participant BATCH as Metrics Batcher
    Client->>MPA: Subscribe workloads
    MPA->>SUB: Register subscription channel
    BATCH->>MPA: Realtime metrics batch
    MPA->>SUB: Broadcast MpaStreamResponse(realtime_metrics)
    SUB->>Client: Deliver realtime metrics
    Note over MPA: Periodic / on-subscription historical refresh
    MPA->>HIST: FetchPercentilesForAll(interests)
    HIST->>PROM: Run percentile queries (24h)
    PROM-->>HIST: Return metric values
    HIST-->>MPA: HistoricalMetricsSummary
    MPA->>SUB: Broadcast MpaStreamResponse(historical_summary)
    SUB->>Client: Deliver historical summary
```
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/controller/collectionpolicy_controller.go (1)
3596-3601: ⚠️ Potential issue | 🟡 Minor

Missing `workload_recommendation` case in `handleDisabledCollectorsChange`.

The `workload_recommendation` collector is added to `registerResourceCollectors` and `restartCollectors`, but it's missing from `handleDisabledCollectorsChange`. If a user enables or disables this collector via config, the change won't be properly handled.

🐛 Proposed fix

Add the case before the `default` clause:

```diff
+	case "workload_recommendation":
+		replacedCollector = collector.NewWorkloadRecommendationCollector(
+			r.DynamicClient,
+			newConfig.TargetNamespaces,
+			collector.DefaultMaxBatchSize,
+			collector.DefaultMaxBatchTime,
+			logger,
+			r.TelemetryLogger,
+		)
 	default:
 		logger.Info("Unknown collector type, skipping", "type", collectorType)
 		continue
```
🤖 Fix all issues with AI agents
In `@internal/collector/historical_metrics_collector.go`:
- Around line 119-122: PromQL label values (namespace, containerName) are being
interpolated directly into queries (see variables namespace, podRegex,
containerName in historical_metrics_collector.go) creating an injection risk;
add an escapePromQLLabel helper (escape backslashes and double quotes using
strings.ReplaceAll) and use it when formatting queries that insert namespace and
containerName, while leaving podRegex unescaped if it is a deliberate regex, and
update all query constructions (the instances at the shown ranges) to call
escapePromQLLabel(workload.Namespace) and escapePromQLLabel(containerName)
before interpolation.
- Around line 143-179: The percentiles list currently omits 0.95 so CpuP95 and
MemP95 in ContainerHistoricalMetrics stay zero; update the percentiles slice
(variable percentiles) to include 0.95 (e.g., []float64{0.50, 0.75, 0.80, 0.90,
0.95, 0.99}) and ensure when you populate the proto from cpuValues and memValues
you read the 0.95 entries (cpuValues[0.95], memValues[0.95]) to set CpuP95 and
MemP95 respectively (verify code that builds the ContainerHistoricalMetrics
struct uses those keys).
In `@internal/collector/workload_recommendation_collector.go`:
- Around line 127-130: The DeleteFunc uses an unchecked type assertion on obj to
*unstructured.Unstructured which can panic for tombstone objects; update the
DeleteFunc to detect and unwrap a cache.DeletedFinalStateUnknown (or similar
tombstone) before asserting, retrieving the underlying object when present, and
only then call handleWorkloadRecommendationEvent(wr, EventTypeDelete); ensure
you handle cases where the tombstone contains a non-Unstructured object by
logging/ignoring rather than panicking so that handleWorkloadRecommendationEvent
is only invoked with a valid *unstructured.Unstructured.
- Around line 104-107: AddFunc currently does an unchecked type assertion on obj
to *unstructured.Unstructured which can panic; update AddFunc to use the safe
assertion pattern (wr, ok := obj.(*unstructured.Unstructured)) and if ok is
false, log a warning or error and return early instead of calling
handleWorkloadRecommendationEvent. Mirror the defensive check used in DeleteFunc
so handleWorkloadRecommendationEvent is only called with a valid
*unstructured.Unstructured.
- Around line 278-283: There is a race where Stop() closes c.batchChan while
handleWorkloadRecommendationEvent may still write to it; add a stop/closed
indicator (e.g., an atomic.Bool or mutex-protected bool) on the collector
struct, set it in Stop() before closing the channel, and have
handleWorkloadRecommendationEvent check that flag (return early if stopped)
before attempting to send to c.batchChan; ensure Stop() sets the flag, waits for
any goroutines to finish (or use a WaitGroup) then closes c.batchChan and nils
it to avoid races when writing to batchChan.
🧹 Nitpick comments (7)
proto/api/v1/mpa.proto (1)
76-88: Inconsistent field ordering and numbering for percentiles.

The `cpu_p95` (field 12) and `mem_p95` (field 13) are placed after their respective `p99` fields (6 and 11), making the schema harder to read. While this doesn't affect wire compatibility, consider reordering for clarity in future schema iterations.

Also, CPU and memory have p75 percentiles, but network and disk percentiles skip p75 (going from p50 directly to p80). If this asymmetry is intentional, consider adding a comment explaining why.
internal/collector/workload_recommendation_collector.go (2)
34-35: Unused `mu` field.

The `mu sync.RWMutex` field is declared but never used in the collector. Remove it if synchronization is not needed, or add proper locking if concurrent access to collector state is possible.

250-254: UID comparison is redundant.

If `oldWR.GetUID() != newWR.GetUID()`, these are fundamentally different objects (a recreated resource). This would already be caught by spec/status comparisons or handled as a new Add event. Consider whether this check adds value.

internal/controller/collectionpolicy_controller.go (1)
345-366: Collector instance created at registration may leak if unavailable.

When a collector is added to `pendingCollectors`, a new instance is created via `pc.factory()` on each retry (line 346). However, the original collector instance created in `registerResourceCollectors` (stored in `c.collector`) is never cleaned up when it's not available. This could lead to resource leaks (channels, goroutines) if the collector constructor allocates resources.

Additionally, if registration succeeds but `StartCollector` fails (lines 355-358), the collector isn't re-added to the pending list for retry.

♻️ Consider tracking start failures

```diff
 	if err := r.CollectionManager.RegisterCollector(newCollector); err != nil {
 		logger.Error(err, "Failed to register collector", "type", pc.name.String())
 		stillPending = append(stillPending, pc)
 		continue
 	}
 	if err := r.CollectionManager.StartCollector(ctx, pc.name.String()); err != nil {
 		logger.Error(err, "Failed to start collector", "type", pc.name.String())
-		continue
+		// Deregister and retry later
+		_ = r.CollectionManager.DeregisterCollector(pc.name.String())
+		stillPending = append(stillPending, pc)
+		continue
 	}
```

internal/server/mpa_server.go (2)
310-333: Pod regex pattern may be too broad.

The pattern `interest.Name + "-.*"` could match unintended pods. For example, a deployment named `web` would match both `web-abc123` and `web-app-xyz123` pods (if another deployment `web-app` exists).

Consider using a more precise pattern or leveraging the workload's label selector for filtering:

♻️ Suggested improvement

```diff
-	podRegex := interest.Name + "-.*"
+	// Use a pattern that matches typical k8s pod naming: deployment-replicaset_hash-pod_hash.
+	// This is still heuristic; consider using label selectors from MatchLabels if available.
+	podRegex := fmt.Sprintf("^%s-[a-z0-9]+-[a-z0-9]+$", regexp.QuoteMeta(interest.Name))
```

Alternatively, if the workload has `MatchLabels`, use those for more precise filtering in the Prometheus query, as sketched below.
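A minimal sketch of the label-selector alternative, assuming a client-go clientset is available to the server (the `resolvePodRegex` helper and its wiring are hypothetical, not part of this PR):

```go
import (
	"context"
	"regexp"
	"strings"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/kubernetes"
)

// resolvePodRegex lists the pods currently selected by the workload's
// MatchLabels and returns an exact-match alternation, which avoids the
// "web" vs "web-app" prefix collision entirely. Callers should skip the
// workload when no pods are returned (the regex would match nothing useful).
func resolvePodRegex(ctx context.Context, cs kubernetes.Interface, namespace string, matchLabels map[string]string) (string, error) {
	pods, err := cs.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: labels.SelectorFromSet(matchLabels).String(),
	})
	if err != nil {
		return "", err
	}
	names := make([]string, 0, len(pods.Items))
	for i := range pods.Items {
		names = append(names, regexp.QuoteMeta(pods.Items[i].Name))
	}
	// e.g. ^(web-6f9c7d-abcde|web-6f9c7d-fghij)$
	return "^(" + strings.Join(names, "|") + ")$", nil
}
```

This trades the heuristic regex for an extra API call and goes stale as pods churn, so it fits best when recomputed alongside the periodic historical refresh.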
160-170: Client ID generation could collide under high concurrency.

Using `time.Now().UnixNano()` for client IDs could theoretically produce duplicates if multiple clients connect within the same nanosecond (especially on systems with coarse clock resolution).

♻️ Consider using UUID or an atomic counter

```diff
+import "github.com/google/uuid"

 func (sm *SubscriptionManager) Register(ch chan *gen.MpaStreamResponse) string {
 	sm.mu.Lock()
 	defer sm.mu.Unlock()
-	id := fmt.Sprintf("%d", time.Now().UnixNano())
+	id := uuid.New().String()
 	sm.clients[id] = &ClientSubscription{
```

Or use an atomic counter if UUIDs add an unwanted dependency; see the sketch below.
internal/collector/historical_metrics_collector.go (1)
48-82: `FetchPercentiles` always returns a nil error, but its signature suggests it can fail.

The function signature returns `(*gen.HistoricalMetricsSummary, error)` but always returns `nil` for the error, even when all container percentile fetches fail. Callers (like `FetchPercentilesForAll` at line 100) check for errors that will never occur.

Consider returning an error if no container data was successfully fetched, or change the signature to not return an error.

♻️ Proposed fix to return an error on complete failure

```diff
 	for _, containerName := range workload.Containers {
 		metrics, samples, err := c.fetchContainerPercentiles(ctx, workload, containerName, now)
 		if err != nil {
 			c.logger.Error(err, "Failed to fetch percentiles for container",
 				"container", containerName,
 				"workload", workload.WorkloadName,
 			)
 			continue
 		}
 		containerResults = append(containerResults, metrics)
 		if samples > totalSamples {
 			totalSamples = samples
 		}
 	}
+	if len(containerResults) == 0 && len(workload.Containers) > 0 {
+		return nil, fmt.Errorf("failed to fetch percentiles for any container in workload %s", workload.WorkloadName)
+	}
 	return &gen.HistoricalMetricsSummary{
```
```go
query := fmt.Sprintf(
	`group by (container) (container_memory_working_set_bytes{namespace="%s", pod=~"%s", container!="", container!="POD"})`,
	namespace, podRegex,
)
```
PromQL injection risk from unescaped user input.
The namespace, podRegex, and containerName values are interpolated directly into PromQL queries without escaping. While these values typically come from Kubernetes resources (which have naming constraints), a malicious or malformed value could break queries or cause unexpected behavior.
🛡️ Consider escaping label values

```go
import "strings"

func escapePromQLLabel(s string) string {
	// Escape backslashes and double quotes for PromQL string literals
	s = strings.ReplaceAll(s, `\`, `\\`)
	s = strings.ReplaceAll(s, `"`, `\"`)
	return s
}

// Usage:
query := fmt.Sprintf(
	`quantile_over_time(%.2f, rate(container_cpu_usage_seconds_total{namespace="%s", pod=~"%s", container="%s"}[5m])[24h:%s])`,
	p, escapePromQLLabel(workload.Namespace), workload.PodRegex, escapePromQLLabel(containerName), historicalStepInterval,
)
```

Note: `PodRegex` intentionally contains regex metacharacters, so it should not be escaped here, but ensure it's constructed safely upstream.
Also applies to: 152-155, 167-170, 182-185
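For reference, a quick table-driven check of the escaping behavior (a hypothetical test, assuming the helper sketched above; the backslash-first replacement order is what keeps double escaping from occurring):

```go
func TestEscapePromQLLabel(t *testing.T) {
	cases := map[string]string{
		`default`: `default`, // plain names pass through unchanged
		`a"b`:     `a\"b`,    // quotes are escaped
		`a\b`:     `a\\b`,    // backslashes are escaped first
	}
	for in, want := range cases {
		if got := escapePromQLLabel(in); got != want {
			t.Errorf("escapePromQLLabel(%q) = %q, want %q", in, got, want)
		}
	}
}
```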
```go
percentiles := []float64{0.50, 0.75, 0.80, 0.90, 0.99}

cpuValues := make(map[float64]int64)
memValues := make(map[float64]int64)

var sampleCount int32

for _, p := range percentiles {
	// CPU query: rate over 5m, percentile over 24h
	cpuQuery := fmt.Sprintf(
		`quantile_over_time(%.2f, rate(container_cpu_usage_seconds_total{namespace="%s", pod=~"%s", container="%s"}[5m])[24h:%s])`,
		p, workload.Namespace, workload.PodRegex, containerName, historicalStepInterval,
	)

	cpuVal, err := c.queryScalar(ctx, cpuQuery, now)
	if err != nil {
		c.logger.V(1).Info("CPU percentile query failed",
			"percentile", p, "error", err)
	} else {
		// Convert from cores (float) to millicores (int)
		cpuValues[p] = int64(cpuVal * 1000)
	}

	// Memory query: direct percentile over 24h
	memQuery := fmt.Sprintf(
		`quantile_over_time(%.2f, container_memory_working_set_bytes{namespace="%s", pod=~"%s", container="%s"}[24h])`,
		p, workload.Namespace, workload.PodRegex, containerName,
	)

	memVal, err := c.queryScalar(ctx, memQuery, now)
	if err != nil {
		c.logger.V(1).Info("Memory percentile query failed",
			"percentile", p, "error", err)
	} else {
		memValues[p] = int64(memVal)
	}
}
```
Missing p95 percentile causes proto fields to always be zero.
The percentiles array is [0.50, 0.75, 0.80, 0.90, 0.99] but the proto ContainerHistoricalMetrics includes CpuP95 and MemP95 fields. These fields will always be 0 since 0.95 is not queried.
🐛 Proposed fix
```diff
 func (c *HistoricalMetricsCollector) fetchContainerPercentiles(ctx context.Context, workload HistoricalWorkloadQuery, containerName string, now time.Time) (*gen.ContainerHistoricalMetrics, int32, error) {
-	percentiles := []float64{0.50, 0.75, 0.80, 0.90, 0.99}
+	percentiles := []float64{0.50, 0.75, 0.80, 0.90, 0.95, 0.99}
```
+ percentiles := []float64{0.50, 0.75, 0.80, 0.90, 0.95, 0.99}And update the struct initialization:
```diff
 return &gen.ContainerHistoricalMetrics{
 	ContainerName: containerName,
 	CpuP50:        cpuValues[0.50],
 	CpuP75:        cpuValues[0.75],
 	CpuP80:        cpuValues[0.80],
 	CpuP90:        cpuValues[0.90],
+	CpuP95:        cpuValues[0.95],
 	CpuP99:        cpuValues[0.99],
 	MemP50:        memValues[0.50],
 	MemP75:        memValues[0.75],
 	MemP80:        memValues[0.80],
 	MemP90:        memValues[0.90],
+	MemP95:        memValues[0.95],
 	MemP99:        memValues[0.99],
 }, sampleCount, nil
```
```go
AddFunc: func(obj interface{}) {
	wr := obj.(*unstructured.Unstructured)
	c.handleWorkloadRecommendationEvent(wr, EventTypeAdd)
},
```
AddFunc also has unchecked type assertion.
Similar to DeleteFunc, AddFunc should handle unexpected object types defensively to avoid panics.
🛡️ Proposed fix
```diff
 AddFunc: func(obj interface{}) {
-	wr := obj.(*unstructured.Unstructured)
+	wr, ok := obj.(*unstructured.Unstructured)
+	if !ok {
+		c.logger.Error(nil, "Unexpected object type in AddFunc", "type", fmt.Sprintf("%T", obj))
+		return
+	}
 	c.handleWorkloadRecommendationEvent(wr, EventTypeAdd)
 },
```
```go
DeleteFunc: func(obj interface{}) {
	wr := obj.(*unstructured.Unstructured)
	c.handleWorkloadRecommendationEvent(wr, EventTypeDelete)
},
```
Unchecked type assertion may panic on tombstone objects.
The DeleteFunc performs an unchecked type assertion. When a watch is out of sync, Kubernetes may pass a DeletedFinalStateUnknown tombstone object instead of the actual resource, causing a panic.
🐛 Proposed fix to handle tombstone objects
```diff
 DeleteFunc: func(obj interface{}) {
-	wr := obj.(*unstructured.Unstructured)
+	var wr *unstructured.Unstructured
+	switch t := obj.(type) {
+	case *unstructured.Unstructured:
+		wr = t
+	case cache.DeletedFinalStateUnknown:
+		var ok bool
+		wr, ok = t.Obj.(*unstructured.Unstructured)
+		if !ok {
+			c.logger.Error(nil, "DeletedFinalStateUnknown contained non-Unstructured object", "type", fmt.Sprintf("%T", t.Obj))
+			return
+		}
+	default:
+		c.logger.Error(nil, "Unexpected object type in DeleteFunc", "type", fmt.Sprintf("%T", obj))
+		return
+	}
 	c.handleWorkloadRecommendationEvent(wr, EventTypeDelete)
 },
```
```go
// 2. Close the batchChan (input to the batcher).
if c.batchChan != nil {
	close(c.batchChan)
	c.batchChan = nil
	c.logger.Info("Closed WorkloadRecommendation collector batch input channel")
}
```
Potential race condition when closing batchChan.
If handleWorkloadRecommendationEvent is still executing when Stop() is called, writing to the closed batchChan at line 197 will panic. Consider using a sync mechanism or checking if the channel is closed before writing.
🔒️ Proposed fix using an atomic flag

Add a `stopped` flag and check it before sending:

```diff
 type WorkloadRecommendationCollector struct {
 	client          dynamic.Interface
 	informerFactory dynamicinformer.DynamicSharedInformerFactory
 	wrInformer      cache.SharedIndexInformer
 	batchChan       chan CollectedResource
 	resourceChan    chan []CollectedResource
 	batcher         *ResourcesBatcher
 	stopCh          chan struct{}
 	namespaces      []string
 	logger          logr.Logger
 	telemetryLogger telemetry_logger.Logger
-	mu              sync.RWMutex
+	stopped         atomic.Bool
 }
```

Then in `handleWorkloadRecommendationEvent`:

```diff
+	if c.stopped.Load() {
+		return
+	}
 	c.batchChan <- CollectedResource{
```

And in `Stop()`:

```diff
+	c.stopped.Store(true)
 	// 2. Close the batchChan (input to the batcher).
```
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@internal/controller/collectionpolicy_controller.go`:
- Around line 332-367: The retryPendingCollectors loop currently removes a
collector from r.pendingCollectors when CollectionManager.StartCollector(ctx,
pc.name.String()) fails; change the logic so that on StartCollector error you
log the error and re-queue the collector by appending pc to stillPending
(similar to the RegisterCollector error branch) so it will be retried later;
update retryPendingCollectors to ensure any StartCollector failure keeps the
pendingCollector (using the pendingCollectors/sillPending variables and
referencing retryPendingCollectors, pendingCollectors, and
CollectionManager.StartCollector).
In `@internal/server/mpa_server.go`:
- Around line 64-98: StreamWorkloadMetrics spawns multiple goroutines that call
stream.Send() concurrently (the updates loop and the refresh-triggered
sendHistoricalForSubscriptions), which is unsafe; serialize all stream.Send()
calls by introducing a per-stream mutex locked around every stream.Send
invocation used in StreamWorkloadMetrics and in sendHistoricalForSubscriptions
when invoked for this client. Specifically, when registering the client via
subscriptionManager.Register and creating the updates channel, also create a
mutex (e.g., sendMu) scoped to that client and use it to wrap all stream.Send
calls in the updates goroutine and in the code path that calls
sendHistoricalForSubscriptions for this client so no two goroutines call
stream.Send simultaneously; ensure the mutex is cleaned up when Unregister is
called.
🧹 Nitpick comments (1)
internal/collector/container_resource_collector.go (1)
562-565: Prefer running-container count for pod-level network attribution.

Line 565 uses `len(pod.Spec.Containers)`, which can overcount when containers are not running, skewing any downstream per-container normalization. Consider counting running containers and falling back to the spec length if none are reported.

♻️ Proposed tweak

```diff
-	metricsSnapshot.NetworkMetricsArePodLevel = true
-	metricsSnapshot.PodContainerCount = len(pod.Spec.Containers)
+	metricsSnapshot.NetworkMetricsArePodLevel = true
+	runningCount := 0
+	for i := range pod.Status.ContainerStatuses {
+		if pod.Status.ContainerStatuses[i].State.Running != nil {
+			runningCount++
+		}
+	}
+	if runningCount == 0 {
+		runningCount = len(pod.Spec.Containers)
+	}
+	metricsSnapshot.PodContainerCount = runningCount
```
```go
// retryPendingCollectors checks if previously unavailable collectors (e.g., CRDs installed
// after zxporter startup) have become available and registers/starts them.
func (r *CollectionPolicyReconciler) retryPendingCollectors(ctx context.Context) {
	r.pendingMu.Lock()
	defer r.pendingMu.Unlock()

	if len(r.pendingCollectors) == 0 {
		return
	}

	logger := r.Log.WithName("retry-pending")
	var stillPending []pendingCollector

	for _, pc := range r.pendingCollectors {
		newCollector := pc.factory()
		if newCollector.IsAvailable(ctx) {
			logger.Info("Previously unavailable collector is now available, registering",
				"type", pc.name.String())
			if err := r.CollectionManager.RegisterCollector(newCollector); err != nil {
				logger.Error(err, "Failed to register collector", "type", pc.name.String())
				stillPending = append(stillPending, pc)
				continue
			}
			if err := r.CollectionManager.StartCollector(ctx, pc.name.String()); err != nil {
				logger.Error(err, "Failed to start collector", "type", pc.name.String())
				continue
			}
			logger.Info("Successfully started previously unavailable collector",
				"type", pc.name.String())
		} else {
			stillPending = append(stillPending, pc)
		}
	}

	r.pendingCollectors = stillPending
}
```
Retry should keep collectors pending when StartCollector fails.
Currently a StartCollector error drops the collector from the retry list, so it may never be retried.
🐛 Proposed fix (re-queue on start failure)
```diff
 	if err := r.CollectionManager.StartCollector(ctx, pc.name.String()); err != nil {
 		logger.Error(err, "Failed to start collector", "type", pc.name.String())
+		stillPending = append(stillPending, pc)
 		continue
 	}
```
```diff
 func (s *MpaServer) StreamWorkloadMetrics(stream gen.MpaService_StreamWorkloadMetricsServer) error {
 	// Create a channel for this client's metric updates
-	updates := make(chan *gen.ContainerMetricsBatch, 100)
+	updates := make(chan *gen.MpaStreamResponse, 100)
 	clientID := s.subscriptionManager.Register(updates)
 	defer s.subscriptionManager.Unregister(clientID)

 	s.logger.Info("Client connected to metric stream", "clientID", clientID)

 	// Start a goroutine to send metrics to the client
 	go func() {
-		for batch := range updates {
-			if err := stream.Send(batch); err != nil {
+		for msg := range updates {
+			if err := stream.Send(msg); err != nil {
 				s.logger.Error(err, "Failed to send metrics to client", "clientID", clientID)
 				return
 			}
 		}
 	}()

+	// Start periodic historical refresh
+	refreshTicker := time.NewTicker(15 * time.Minute)
+	defer refreshTicker.Stop()
+
+	go func() {
+		for {
+			select {
+			case <-stream.Context().Done():
+				return
+			case <-refreshTicker.C:
+				interests := s.subscriptionManager.GetInterests(clientID)
+				if len(interests) > 0 {
+					s.sendHistoricalForSubscriptions(stream, interests)
+				}
+			}
+		}
+	}()
```
🧩 Analysis chain

🏁 Scripts executed (repository: devzero-inc/zxporter):

```sh
find . -name "mpa_server.go" -type f
wc -l internal/server/mpa_server.go 2>/dev/null || echo "File not found in expected location"
cat -n internal/server/mpa_server.go | sed -n '64,98p'
cat -n internal/server/mpa_server.go | sed -n '304,349p'
rg "sendHistoricalForSubscriptions" internal/server/mpa_server.go -A 3 -B 1
rg "go s.sendHistoricalForSubscriptions|s.sendHistoricalForSubscriptions" internal/server/mpa_server.go -B 2 -A 1
cat -n internal/server/mpa_server.go | sed -n '1,110p' | grep -E "import|sync|Mutex"
```
🌐 Web query: gRPC stream.Send() thread safety, concurrent writes documentation

💡 Result:

- gRPC-Go (`stream.Send()` / `SendMsg`): a single stream does not support concurrent writes. It's safe to have one goroutine sending while another goroutine receives, but it is not safe to call `SendMsg` (and therefore generated `stream.Send(...)`) from multiple goroutines at the same time. Also, `CloseSend` must not run concurrently with `SendMsg`. If multiple goroutines may send, serialize with a mutex or funnel sends through one goroutine. [1] [2]
- gRPC C++:
  - Sync streaming: `Read()` is documented as thread-safe with respect to `Write()`/`WritesDone()`, but concurrent `Write()` calls on the same stream are not described as safe; treat writes as serialized. [3]
  - Async streaming: only one write may be outstanding at a time; you must wait for the write completion (tag pulled from the completion queue) before issuing the next `Write()`. This effectively forbids concurrent writes on the same stream. [4]

Sources:
[1] gRPC-Go concurrency guide (streams)
[2] grpc-go ClientStream.SendMsg doc comment (stream.go)
[3] gRPC C++ ServerReaderWriter::Read() thread-safety note
[4] gRPC C++ AsyncWriterInterface::Write() "only one write outstanding" rule
Serialize stream.Send() to prevent undefined behavior from concurrent writes.

gRPC-Go does not support concurrent writes to the same stream. The code has two goroutines that call stream.Send() on the same stream object without synchronization:

- The goroutine at lines 73–80 calls `stream.Send()` from the updates-channel loop.
- The goroutine at lines 86–98 (and the secondary `go s.sendHistoricalForSubscriptions()` call) calls `stream.Send()` via `sendHistoricalForSubscriptions()` at line 346.

Add a per-stream mutex to serialize all stream.Send() calls, as sketched below. The proposed fix in the diff is appropriate.
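A minimal sketch of the per-stream serialization, assuming the generated `gen` package already imported in mpa_server.go (the `lockedStream` wrapper is hypothetical, not code from this PR):

```go
import "sync"

// lockedStream serializes Send so the realtime-updates goroutine and the
// historical-refresh goroutine never write to the gRPC stream concurrently.
type lockedStream struct {
	mu     sync.Mutex
	stream gen.MpaService_StreamWorkloadMetricsServer
}

func (ls *lockedStream) Send(msg *gen.MpaStreamResponse) error {
	ls.mu.Lock()
	defer ls.mu.Unlock()
	return ls.stream.Send(msg)
}
```

Both goroutines in StreamWorkloadMetrics would then call ls.Send instead of stream.Send. Funneling the historical responses through the existing `updates` channel would achieve the same serialization without a lock, since only one goroutine would ever touch the stream.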
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@internal/server/mpa_server.go`:
- Around line 310-332: The current podRegex = interest.Name + "-.*" will miss
standalone Pods and other naming schemes; update the discovery logic around
s.historicalCollector.DiscoverContainers so it first handles interest.Kind ==
"Pod" by using an exact-name match (or call DiscoverContainers with the exact
pod name) and otherwise use a regex that matches both the exact name and the
suffixed form (e.g., make podRegex allow either interest.Name or
interest.Name-<suffix>), then pass that podRegex into
collector.HistoricalWorkloadQuery when appending; ensure DiscoverContainers is
invoked with the adjusted podRegex and retain existing logging on empty/failed
discoveries.
🧹 Nitpick comments (1)
internal/collector/manager.go (1)
397-406: Potential memory leak: `time.After` in a loop allocates timers that aren't GC'd until expiry.

Each loop iteration creates a new timer via `time.After`. When the send succeeds before the timeout, that timer still lives for 5 seconds before being collected. Under high throughput this accumulates timers and increases memory pressure.

Consider using `time.NewTimer` with explicit cleanup:

♻️ Suggested fix using a reusable timer

```diff
 for resources := range resourceChan {
 	// Skip if channel is closed
 	if len(resources) == 0 {
 		continue
 	}

 	// Update metrics for the ingested resources
 	m.telemetryMetrics.MessagesIngested.WithLabelValues(resources[0].ResourceType.String()).Add(float64(len(resources)))

 	// Forward to the combined channel with a timeout to reduce silent drops
+	timer := time.NewTimer(5 * time.Second)
 	select {
 	case m.combinedChannel <- resources:
 		// Successfully sent
-	case <-time.After(5 * time.Second):
+		if !timer.Stop() {
+			<-timer.C
+		}
+	case <-timer.C:
 		m.logger.Error(nil, "Combined channel buffer full, dropping resources after timeout",
 			"count", len(resources),
 			"type", resources[0].ResourceType.String())
 		m.telemetryMetrics.MessagesDropped.WithLabelValues(resources[0].ResourceType.String()).Add(float64(len(resources)))
 	}
 }
```
```go
queries := make([]collector.HistoricalWorkloadQuery, 0, len(interests))
for _, interest := range interests {
	podRegex := interest.Name + "-.*"

	// Discover containers from Prometheus
	containers, err := s.historicalCollector.DiscoverContainers(
		stream.Context(), interest.Namespace, podRegex)
	if err != nil {
		s.logger.Error(err, "Failed to discover containers", "workload", interest.Name)
		continue
	}
	if len(containers) == 0 {
		s.logger.V(1).Info("No containers discovered for workload", "workload", interest.Name)
		continue
	}

	queries = append(queries, collector.HistoricalWorkloadQuery{
		Namespace:    interest.Namespace,
		WorkloadName: interest.Name,
		WorkloadKind: interest.Kind,
		PodRegex:     podRegex,
		Containers:   containers,
	})
```
Pod regex pattern may not match all workload types.

The pattern `interest.Name + "-.*"` assumes pods are named `{workload-name}-{suffix}`. This works for Deployments, ReplicaSets, and StatefulSets, but won't match:

- Standalone Pods (where `Kind == "Pod"` and the pod name equals the workload name exactly)
- CronJob-spawned pods with different naming conventions

Consider handling standalone pods explicitly:
💡 Suggested improvement

```diff
 for _, interest := range interests {
-	podRegex := interest.Name + "-.*"
+	var podRegex string
+	if interest.Kind == "Pod" {
+		podRegex = "^" + regexp.QuoteMeta(interest.Name) + "$"
+	} else {
+		podRegex = interest.Name + "-.*"
+	}
```
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@internal/collector/workload_recommendation_collector.go`:
- Around line 312-321: The comment above the generic error branch for the
WorkloadRecommendation availability check is out of sync with behavior: it says
"assume resource might exist" but the code logs the error and returns false;
update the comment to reflect that on other errors (permissions, etc.) we log
the error and treat the resource as unavailable by returning false (or
alternatively change the return to true if you intend to assume availability).
Locate the block using isResourceTypeUnavailableError(err) and the subsequent
c.logger.Error(...) / return false and either (a) replace the misleading comment
with one that says we log the error and return false (treat as unavailable), or
(b) change the return false to return true if you want to preserve the original
comment intent.
In `@internal/controller/collectionpolicy_controller.go`:
- Around line 2976-2984: The pendingCollectors slice is appended to on each
rebuild so stale/duplicate pendingCollector entries (which capture old
c.factory/config) accumulate; modify the logic around pendingMu,
pendingCollectors and the block that appends pendingCollector (references:
pendingCollectors, pendingMu, pendingCollector type, c.factory, c.name) to first
acquire the lock, either clear/reinitialize r.pendingCollectors (e.g.,
r.pendingCollectors = nil or build a new slice) or de-duplicate existing entries
for the same c.name before appending, then append only the current
pendingCollector and release the lock so old factories are not retained.
| // For "no matching resource" or other similar errors | ||
| if isResourceTypeUnavailableError(err) { | ||
| c.logger.Info("WorkloadRecommendation CRD not installed in cluster", | ||
| "error", err.Error()) | ||
| return false | ||
| } | ||
|
|
||
| // For other errors (permissions, etc.), log but assume resource might exist | ||
| c.logger.Error(err, "Error checking WorkloadRecommendation resource availability") | ||
| return false |
Align IsAvailable fallback comment with behavior.
The comment says “assume resource might exist,” but the function returns false. Either update the comment or return true to match intent.
✏️ Comment-only fix

```diff
-// For other errors (permissions, etc.), log but assume resource might exist
+// For other errors (permissions, etc.), log and treat as unavailable
 c.logger.Error(err, "Error checking WorkloadRecommendation resource availability")
 return false
```

📝 Committable suggestion
‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```go
// For "no matching resource" or other similar errors
if isResourceTypeUnavailableError(err) {
	c.logger.Info("WorkloadRecommendation CRD not installed in cluster",
		"error", err.Error())
	return false
}

// For other errors (permissions, etc.), log and treat as unavailable
c.logger.Error(err, "Error checking WorkloadRecommendation resource availability")
return false
```
```go
// Track CRD-based collectors for deferred retry when their CRDs become available
if c.factory != nil {
	r.pendingMu.Lock()
	r.pendingCollectors = append(r.pendingCollectors, pendingCollector{
		name:    c.name,
		factory: c.factory,
	})
	r.pendingMu.Unlock()
	logger.Info("Added collector to pending retry list", "type", c.name.String())
```
Prevent stale/duplicate pending collectors after config changes.
Line 2976 appends without clearing or de-duping, so old factories (capturing prior config) can linger and register with stale settings.
🧹 Proposed fix (clear pending list before rebuilding)
```diff
 func (r *CollectionPolicyReconciler) registerResourceCollectors(
 	logger logr.Logger,
 	config *PolicyConfig,
 	metricsClient *metricsv1.Clientset,
 ) error {
 	// Use the shared Prometheus metrics instance from the reconciler
 	disabledCollectorsMap := make(map[string]bool)
 	for _, collectorType := range config.DisabledCollectors {
 		disabledCollectorsMap[collectorType] = true
 	}
+
+	// Reset pending list so we don't keep stale factories across restarts/config changes.
+	r.pendingMu.Lock()
+	r.pendingCollectors = nil
+	r.pendingMu.Unlock()
```

📝 Committable suggestion
‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```go
func (r *CollectionPolicyReconciler) registerResourceCollectors(
	logger logr.Logger,
	config *PolicyConfig,
	metricsClient *metricsv1.Clientset,
) error {
	// Use the shared Prometheus metrics instance from the reconciler
	disabledCollectorsMap := make(map[string]bool)
	for _, collectorType := range config.DisabledCollectors {
		disabledCollectorsMap[collectorType] = true
	}

	// Reset pending list so we don't keep stale factories across restarts/config changes.
	r.pendingMu.Lock()
	r.pendingCollectors = nil
	r.pendingMu.Unlock()
```
Commits:

- CRD, rules and config for incluster MPA
- Update mpa dakr evaluator
- Update mpa rule and persistance
- Update mpa logic and crds
- Update memory check
- Clean up
- Remove feedback
- Update rec generation, clean up crds, try out new rec gen
- Remove local chart
- Fix rbac permissions
- Update lint issues
- Fix lint issues
Summary

- … recommendations (Applied/Failed/Skipped) back to the control plane via the existing batching pipeline
- … bootstrap Dakr subscriptions with historical context
- … summaries on a single bidirectional stream, with 15-minute periodic refresh
- … calculations
- … instead of being silently skipped

Test plan