
volcano job collector #207

Merged: Parthiba-Hazra merged 2 commits into main from ph/volcano-job-collector on Dec 16, 2025

Conversation

@Parthiba-Hazra (Collaborator) commented Dec 9, 2025

Summary by CodeRabbit

  • New Features
    • Full support for discovering, collecting, and monitoring Volcano Job resources cluster-wide.
    • Config option to selectively exclude specific Volcano Jobs or namespaces from collection.
    • Added lifecycle integration so Volcano Job collection can be enabled/disabled and restarted dynamically.
    • Updated cluster permissions to allow safe access to Volcano Job resources.


@coderabbitai Bot commented Dec 9, 2025

Walkthrough

Adds support for collecting Volcano Job resources: a new VolcanoJobCollector with a dynamic informer watching batch.volcano.sh jobs, an extended ResourceType enum, a new RBAC rule, and a PolicyConfig updated to support excluded Volcano Jobs and to register the collector in controller flows.

Changes

  • RBAC Configuration (config/rbac/role.yaml): adds a ClusterRole rule for batch.volcano.sh jobs with verbs get, list, watch.
  • Resource Type & Lists (internal/collector/interface.go, internal/collector/types.go): adds VolcanoJob to the ResourceType enum; updates String() to "volcano_job", maps ProtoType() to RESOURCE_TYPE_VOLCANO_JOB, and includes it in AllResourceTypes().
  • Volcano Job Collector Implementation (internal/collector/volcano_job_collector.go): new VolcanoJobCollector type with dynamic informer factory setup, Add/Update/Delete handlers, exclusion filtering, resource batching, an availability probe, Start/Stop, AddResource, and telemetry hooks.
  • Controller Integration (internal/controller/collectionpolicy_controller.go): adds ExcludedVolcanoJobs []collector.ExcludedVolcanoJob to PolicyConfig; wires the VolcanoJob collector into registration, selective restart, disabled-collector handling, and RBAC annotations.

Sequence Diagram

sequenceDiagram
    participant DynClient as DynamicClient
    participant Inf as SharedInformer
    participant Col as VolcanoJobCollector
    participant Batch as ResourcesBatcher
    participant Ch as ResourceChannel

    rect rgb(200,240,200)
    Note over DynClient,Col: Start
    Col->>DynClient: create dynamic informer factory
    DynClient->>Inf: build informer for batch.volcano.sh/jobs
    Inf->>Col: attach Add/Update/Delete handlers
    Col->>Inf: start informer & wait for cache sync
    end

    rect rgb(240,220,200)
    Note over Inf,Batch: Event handling
    Inf->>Col: event (Add/Update/Delete)
    Col->>Col: type assert & isExcluded?
    alt excluded
        Col->>Col: drop event
    else not excluded
        Col->>Col: processJob -> build payload
        Col->>Batch: queue CollectedResource
    end
    end

    rect rgb(220,240,240)
    Note over Batch,Ch: Batching output
    Batch->>Batch: accumulate & flush
    Batch->>Ch: emit batched resources
    end

    rect rgb(240,200,240)
    Note over Col,Inf: Stop
    Col->>Inf: stop informer
    Col->>Batch: stop & drain
    Col->>Ch: close channel
    end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Review focus:
    • internal/collector/volcano_job_collector.go — informer lifecycle, exclusion logic, batching, telemetry, and error handling.
    • internal/controller/collectionpolicy_controller.go — integration points for registration, restart flows, and config changes.
    • internal/collector/interface.go / types.go — enum/proto mapping consistency.

Suggested reviewers

  • dray92

Poem

🐰 A Volcano job hops into view tonight,
Informers listen, batching just right,
Exclusions checked with a careful glance,
Metrics hum and events advance,
Hooray — collection's new little sprite! 🌋

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
  • Description check: ✅ Passed (check skipped because CodeRabbit's high-level summary is enabled)
  • Title check: ✅ Passed (the title 'volcano job collector' directly describes the main change: introduction of a new VolcanoJob collector implementation with supporting infrastructure across multiple files)
  • Docstring coverage: ✅ Passed (coverage is 100.00%, above the required 80.00% threshold)

@coderabbitai Bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/controller/collectionpolicy_controller.go (1)

79-138: ExcludedVolcanoJobs is never populated from the CRD

ExcludedVolcanoJobs []collector.ExcludedVolcanoJob exists in PolicyConfig and is checked in identifyAffectedCollectors and passed to collector constructors, but it has no corresponding field in the Exclusions struct (api/v1/collectionpolicy_types.go) and is never populated in createNewConfig. Any Volcano job exclusions configured in the CRD will be silently ignored.

Add ExcludedVolcanoJobs []ExcludedVolcanoJob to the Exclusions struct and populate it in createNewConfig similar to other excluded resources:

+    // VolcanoJobs
+    for _, job := range envSpec.Exclusions.ExcludedVolcanoJobs {
+        newConfig.ExcludedVolcanoJobs = append(newConfig.ExcludedVolcanoJobs, collector.ExcludedVolcanoJob{
+            Namespace: job.Namespace,
+            Name:      job.Name,
+        })
+    }

(Note: The same issue affects ExcludedDatadogReplicaSets, ExcludedArgoRollouts, and ExcludedKubeflowNotebooks—all are missing from the CRD Exclusions struct and should be added as well.)

🧹 Nitpick comments (1)
internal/collector/volcano_job_collector.go (1)

89-118: VolcanoJob collector behavior and wiring look solid overall

  • Uses the correct GVR {Group: "batch.volcano.sh", Version: "v1alpha1", Resource: "jobs"} and dynamic informer.
  • Namespace handling (single‑namespace informer vs. all‑namespace informer plus isExcluded namespace filter) is consistent and safe, albeit slightly over‑broad in terms of watch scope for multi‑namespace configs.
  • ExcludedVolcanoJob mapping to a map[types.NamespacedName]bool provides efficient exclusion checks.
  • IsAvailable probes the resource with a bounded List and reports telemetry on failure.
  • AddResource validates type and reuses handleJobEvent, matching other collectors’ patterns.

Apart from the Stop/batchChan race, the implementation fits well into the existing collector framework.

If you want to tighten scope for multi‑namespace setups, you could eventually create per‑namespace informers (one per entry in namespaces) instead of relying on a cluster‑wide informer plus isExcluded.

Also applies to: 215-235, 288-313, 364-387, 389-409

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 43ae9d7 and ba09baa.

⛔ Files ignored due to path filters (10)
  • dist/backend-install.yaml is excluded by !**/dist/**
  • dist/install.yaml is excluded by !**/dist/**
  • dist/installer_updater.yaml is excluded by !**/dist/**
  • dist/zxporter.yaml is excluded by !**/dist/**
  • gen/api/v1/apiv1connect/k8s.connect.go is excluded by !**/gen/**
  • gen/api/v1/common.pb.go is excluded by !**/*.pb.go, !**/gen/**
  • gen/api/v1/k8s.pb.go is excluded by !**/*.pb.go, !**/gen/**
  • gen/api/v1/k8s_grpc.pb.go is excluded by !**/*.pb.go, !**/gen/**
  • gen/api/v1/metrics_collector.pb.go is excluded by !**/*.pb.go, !**/gen/**
  • proto/dakr_proto_descriptor.bin is excluded by !**/*.bin
📒 Files selected for processing (6)
  • config/rbac/role.yaml (1 hunks)
  • internal/collector/interface.go (3 hunks)
  • internal/collector/types.go (1 hunks)
  • internal/collector/volcano_job_collector.go (1 hunks)
  • internal/controller/collectionpolicy_controller.go (6 hunks)
  • proto/metrics_collector.proto (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
internal/collector/interface.go (1)
gen/api/v1/metrics_collector.pb.go (1)
  • ResourceType_RESOURCE_TYPE_VOLCANO_JOB (174-174)
internal/controller/collectionpolicy_controller.go (3)
internal/collector/volcano_job_collector.go (2)
  • ExcludedVolcanoJob (39-42)
  • NewVolcanoJobCollector (45-87)
internal/collector/batcher.go (2)
  • DefaultMaxBatchSize (16-16)
  • DefaultMaxBatchTime (19-19)
internal/collector/interface.go (1)
  • VolcanoJob (142-142)
internal/collector/volcano_job_collector.go (4)
internal/collector/interface.go (6)
  • CollectedResource (309-325)
  • EventTypeAdd (18-18)
  • EventTypeUpdate (20-20)
  • EventTypeDelete (22-22)
  • EventType (12-12)
  • ResourceType (87-87)
internal/collector/batcher.go (2)
  • ResourcesBatcher (23-31)
  • NewResourcesBatcher (34-55)
internal/logger/logger.go (1)
  • Logger (26-29)
gen/api/v1/metrics_collector.pb.go (10)
  • LogLevel_LOG_LEVEL_ERROR (326-326)
  • EventType (27-27)
  • EventType (80-82)
  • EventType (84-86)
  • EventType (93-95)
  • ResourceType (98-98)
  • ResourceType (301-303)
  • ResourceType (305-307)
  • ResourceType (314-316)
  • LogLevel_LOG_LEVEL_WARN (325-325)
internal/collector/types.go (1)
internal/collector/interface.go (3)
  • VolumeAttachment (140-140)
  • KubeflowNotebook (141-141)
  • VolcanoJob (142-142)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
  • GitHub Check: Test on K8s v1.32.3 (manifest)
  • GitHub Check: Test on K8s v1.30.8 (manifest)
  • GitHub Check: Test on K8s v1.32.3 (helm)
  • GitHub Check: Test on K8s v1.28.15 (manifest)
  • GitHub Check: Test on K8s v1.30.8 (helm)
  • GitHub Check: Test on K8s v1.29.14 (manifest)
  • GitHub Check: Test on K8s v1.31.6 (helm)
  • GitHub Check: Test on K8s v1.29.14 (helm)
  • GitHub Check: Test on K8s v1.31.6 (manifest)
  • GitHub Check: Test on K8s v1.27.16 (helm)
  • GitHub Check: Test on K8s v1.25.16 (helm)
  • GitHub Check: Test on K8s v1.26.15 (helm)
  • GitHub Check: Test on K8s v1.28.15 (helm)
  • GitHub Check: Test on K8s v1.27.16 (manifest)
  • GitHub Check: Test on K8s v1.26.15 (manifest)
  • GitHub Check: Test on K8s v1.25.16 (manifest)
  • GitHub Check: Test Metrics Server Lifecycle on K8s v1.32.3
  • GitHub Check: Analyze (go)
🔇 Additional comments (6)
proto/metrics_collector.proto (1)

125-129: VolcanoJob proto enum addition looks consistent

RESOURCE_TYPE_VOLCANO_JOB = 50 cleanly extends ResourceType without renumbering existing values; comment placement and spacing match existing style. No issues here.

internal/collector/types.go (1)

5-16: AllResourceTypes correctly extended with VolcanoJob

Including VolcanoJob alongside KubeflowNotebook and VolumeAttachment keeps resource enumeration in sync with the new enum value and string mappings. No further changes needed here.

config/rbac/role.yaml (1)

126-133: RBAC for Volcano jobs is appropriate

Granting get, list, and watch on batch.volcano.sh jobs matches the collector’s read‑only usage and is consistent with existing batch job permissions.

internal/collector/interface.go (1)

90-143: VolcanoJob ResourceType wiring is consistent end‑to‑end

The new VolcanoJob constant, its "volcano_job" string, and ProtoType() mapping to RESOURCE_TYPE_VOLCANO_JOB are all aligned with the generated proto enum. This keeps resource type resolution and disabled‑collector logic working as expected.

If you haven’t already, please re‑generate gen/api/v1 to ensure the Go stubs for ResourceType_RESOURCE_TYPE_VOLCANO_JOB stay in sync with the proto.

Also applies to: 147-196, 205-303

internal/controller/collectionpolicy_controller.go (2)

196-199: Controller RBAC annotations for Volcano jobs align with manifest

The kubebuilder RBAC line for groups=batch.volcano.sh,resources=jobs,verbs=get;list;watch matches the ClusterRole rule in config/rbac/role.yaml, so generated manifests should stay consistent with the static YAML.

After running controller‑gen, please verify the rendered ClusterRole includes the Volcano jobs rule once as expected (no duplication with the hand‑written YAML).

Also applies to: 247-247


776-778: VolcanoJob collector is correctly integrated into restart, registration, and disabled‑collector flows

  • identifyAffectedCollectors keys off ExcludedVolcanoJobs and marks "volcano_job" for selective restarts.
  • restartCollectors and handleDisabledCollectorsChange both recreate collectors via collector.NewVolcanoJobCollector with TargetNamespaces and ExcludedVolcanoJobs.
  • registerResourceCollectors registers a VolcanoJobCollector with name: collector.VolcanoJob, so disabled‑collector config based on "volcano_job" will work as with other types.

This wiring is consistent with the existing collector patterns.

Also applies to: 1455-1464, 2567-2578, 3197-3206

Comment on lines +22 to +36
// VolcanoJobCollector watches for Volcano Job resources
type VolcanoJobCollector struct {
dynamicClient dynamic.Interface
batchChan chan CollectedResource
resourceChan chan []CollectedResource
batcher *ResourcesBatcher
stopCh chan struct{}
informers map[string]cache.SharedIndexInformer
informerStopChs map[string]chan struct{}
namespaces []string
excludedJobs map[types.NamespacedName]bool
logger logr.Logger
telemetryLogger telemetry_logger.Logger
mu sync.RWMutex
}

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Scripts executed against the repository (devzero-inc/zxporter):

# Get the file size to understand what we're working with
wc -l internal/collector/volcano_job_collector.go

# Read the key sections mentioned in the review
sed -n '22,36p' internal/collector/volcano_job_collector.go
sed -n '63,87p' internal/collector/volcano_job_collector.go
sed -n '250,273p' internal/collector/volcano_job_collector.go
sed -n '315,352p' internal/collector/volcano_job_collector.go

# Find other collector files to compare patterns
fd -t f "collector\.go$" internal/collector/ | head -20

# Find the location of handleJobEvent
rg -n "func.*handleJobEvent" internal/collector/volcano_job_collector.go

# Compare with job_collector.go: Stop(), handleJobEvent, Start(), and factory usage
rg -B 5 -A 40 "func.*\*JobCollector\).*Stop" internal/collector/job_collector.go | tail -50
rg -A 20 "func.*\*JobCollector\).*handleJobEvent" internal/collector/job_collector.go | head -40
rg -A 30 "func.*\*JobCollector\).*Start" internal/collector/job_collector.go | head -50
rg -n "informerFactory.Start|informerFactory.Stop" internal/collector/job_collector.go

# Inspect the VolcanoJobCollector Start() method and informer lifecycle
rg -A 60 "func.*\*VolcanoJobCollector\).*Start" internal/collector/volcano_job_collector.go | head -80
sed -n '200,230p' internal/collector/volcano_job_collector.go
sed -n '251,280p' internal/collector/volcano_job_collector.go

# Check for WaitGroup or other synchronization of informer goroutines
rg -n "WaitGroup|sync\.Wait|informer\.Run" internal/collector/volcano_job_collector.go

# Check whether batcher.stop() waits for pending sends
fd -t f "batcher.go" internal/collector/ -o internal/
rg -A 20 "func.*\*ResourcesBatcher\).*stop" internal/collector/


Critical race condition: closing batchChan while event handlers may still be executing

handleJobEvent unconditionally sends to batchChan with no synchronization check. However, Stop() closes this channel immediately after signaling informer shutdown, without waiting for in-flight handler goroutines to complete. Because informer.Run() executes asynchronously in a background goroutine (line 213), a handler invocation can race with batchChan being closed, causing a panic on send to closed channel.

The current shutdown sequence in Stop():

  1. Close informerStopChs (signals informer to stop, but doesn't wait)
  2. Close stopCh
  3. Close batchChan ← handlers may still be executing here
  4. Stop batcher

Fix by adding a stopCh check in handleJobEvent:

func (c *VolcanoJobCollector) handleJobEvent(obj *unstructured.Unstructured, eventType EventType) {
	name := obj.GetName()
	namespace := obj.GetNamespace()

	if c.isExcluded(namespace, name) {
		return
	}

	processedObj := c.processJob(obj)
	key := fmt.Sprintf("%s/%s", namespace, name)

	c.logger.Info("Collected Volcano Job resource", "key", key, "eventType", eventType, "resource", processedObj)
+	select {
+	case <-c.stopCh:
+		return
+	case c.batchChan <- CollectedResource{
-	c.batchChan <- CollectedResource{
		ResourceType: VolcanoJob,
		Object:       processedObj,
		Timestamp:    time.Now(),
		EventType:    eventType,
		Key:          key,
+	}:
+	}
-	}
}

Avoid closing batchChan in Stop() (let the batcher manage its input channel lifecycle) or close it only after ensuring no more handlers can run. This same pattern should be reviewed and fixed consistently across other collectors (e.g., job_collector.go has the same vulnerability).

Also applies to: lines 63–87, 250–273, 315–352

🤖 Prompt for AI Agents
In internal/collector/volcano_job_collector.go around lines 22–36 and the
referenced areas (lines 63–87, 250–273, 315–352), handlers unconditionally send
to batchChan while Stop() closes batchChan, causing a race and possible panic;
modify handleJobEvent to check stopCh (or otherwise ensure collector is
stopping) before sending to batchChan to avoid sends after shutdown, and
remove/avoid closing batchChan inside Stop() — let the batcher own/close that
channel or only close it after confirming no informer handlers can run (e.g.,
wait for informer goroutines to exit or use a sync.WaitGroup); apply the same
pattern/fix to the other collector files (job_collector.go and similar)
mentioned in the comment.

@sandipanpanda force-pushed the ph/volcano-job-collector branch from ba09baa to 79b3dfe on December 11, 2025 06:23

@coderabbitai Bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/controller/collectionpolicy_controller.go (1)

77-138: ExcludedVolcanoJobs is never populated in createNewConfig

You added ExcludedVolcanoJobs []collector.ExcludedVolcanoJob to PolicyConfig and you wire it into:

  • identifyAffectedCollectors (lines 780–782),
  • selective restart / registration / disabled-collector re-enable flows,

but createNewConfig never populates newConfig.ExcludedVolcanoJobs from envSpec.Exclusions. That means any Volcano job exclusions defined in the CRD/env spec will be silently ignored, and exclusion changes will never be detected.

Consider adding a conversion block similar to the other Excluded* types, for example (assuming the spec has envSpec.Exclusions.ExcludedVolcanoJobs with Namespace and Name fields):

 // CronJobs
 for _, cron := range envSpec.Exclusions.ExcludedCronJobs {
   newConfig.ExcludedCronJobs = append(newConfig.ExcludedCronJobs, collector.ExcludedCronJob{
     Namespace: cron.Namespace,
     Name:      cron.Name,
   })
 }

+// VolcanoJobs
+for _, vj := range envSpec.Exclusions.ExcludedVolcanoJobs {
+  newConfig.ExcludedVolcanoJobs = append(newConfig.ExcludedVolcanoJobs, collector.ExcludedVolcanoJob{
+    Namespace: vj.Namespace,
+    Name:      vj.Name,
+  })
+}

Also applies to: 378-599

♻️ Duplicate comments (1)
internal/collector/volcano_job_collector.go (1)

250-273: Race: handleJobEvent can send on batchChan while Stop closes it, causing panics

handleJobEvent unconditionally does:

c.batchChan <- CollectedResource{...}

while Stop closes batchChan:

if c.batchChan != nil {
    close(c.batchChan)
    c.batchChan = nil
}

Because informers run in background goroutines, there is a real window where:

  • Stop has closed batchChan, but
  • an in-flight Add/Update/Delete handler is still executing handleJobEvent,

leading to send on closed channel panic. This is the same issue that was already raised on an earlier revision of this file and in job_collector.go.

A robust fix is to:

  1. Track active handler executions with a sync.WaitGroup.
  2. Wait for all handlers to finish before closing batchChan.
  3. Make Stop idempotent with sync.Once to avoid double-closing channels.

Example patch sketch:

 type VolcanoJobCollector struct {
     dynamicClient   dynamic.Interface
     batchChan       chan CollectedResource
@@
     logger          logr.Logger
     telemetryLogger telemetry_logger.Logger
-    mu              sync.RWMutex
+    mu              sync.RWMutex
+    handlersWg      sync.WaitGroup
+    stopOnce        sync.Once
 }
@@
 func (c *VolcanoJobCollector) Start(ctx context.Context) error {
@@
-    _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-        AddFunc: func(obj interface{}) {
+    _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+        AddFunc: func(obj interface{}) {
+            c.handlersWg.Add(1)
+            defer c.handlersWg.Done()
@@
-        UpdateFunc: func(oldObj, newObj interface{}) {
+        UpdateFunc: func(oldObj, newObj interface{}) {
+            c.handlersWg.Add(1)
+            defer c.handlersWg.Done()
@@
-        DeleteFunc: func(obj interface{}) {
+        DeleteFunc: func(obj interface{}) {
+            c.handlersWg.Add(1)
+            defer c.handlersWg.Done()
@@
 func (c *VolcanoJobCollector) Stop() error {
-    c.logger.Info("Stopping Volcano Job collector")
-
-    // Stop all informers
-    for key, stopCh := range c.informerStopChs {
-        c.logger.Info("Stopping informer", "resource", key)
-        close(stopCh)
-    }
-
-    c.informers = make(map[string]cache.SharedIndexInformer)
-    c.informerStopChs = make(map[string]chan struct{})
-
-    // Close the main stop channel (signals informers to stop)
-    select {
-    case <-c.stopCh:
-        c.logger.Info("Volcano Job collector stop channel already closed")
-    default:
-        close(c.stopCh)
-        c.logger.Info("Closed Volcano Job collector stop channel")
-    }
-
-    // Close the batchChan (input to the batcher).
-    if c.batchChan != nil {
-        close(c.batchChan)
-        c.batchChan = nil
-        c.logger.Info("Closed Volcano Job collector batch input channel")
-    }
-
-    // Stop the batcher (waits for completion).
-    if c.batcher != nil {
-        c.batcher.stop()
-        c.logger.Info("Volcano Job collector batcher stopped")
-    }
-    // resourceChan is closed by the batcher's defer func.
-
-    return nil
+    c.stopOnce.Do(func() {
+        c.logger.Info("Stopping Volcano Job collector")
+
+        // Signal informers to stop.
+        for key, stopCh := range c.informerStopChs {
+            c.logger.Info("Stopping informer", "resource", key)
+            close(stopCh)
+        }
+        c.informers = make(map[string]cache.SharedIndexInformer)
+        c.informerStopChs = make(map[string]chan struct{})
+
+        // Wait for all in-flight event handlers to finish before touching batchChan.
+        c.handlersWg.Wait()
+        c.logger.Info("All Volcano Job event handlers completed")
+
+        // Close the main stop channel (used by the Start() helper goroutine).
+        select {
+        case <-c.stopCh:
+            c.logger.Info("Volcano Job collector stop channel already closed")
+        default:
+            close(c.stopCh)
+            c.logger.Info("Closed Volcano Job collector stop channel")
+        }
+
+        // Now it is safe to close the batchChan and stop the batcher.
+        if c.batchChan != nil {
+            close(c.batchChan)
+            c.batchChan = nil
+            c.logger.Info("Closed Volcano Job collector batch input channel")
+        }
+
+        if c.batcher != nil {
+            c.batcher.stop()
+            c.logger.Info("Volcano Job collector batcher stopped")
+        }
+        // resourceChan is closed by the batcher.
+    })
+    return nil
 }

This keeps the overall lifecycle the same but ensures no goroutine can be sending into batchChan at the moment it is closed, and prevents double-close panics if Stop is called more than once. You may also want to apply the same pattern (or similar) to job_collector.go to eliminate the same class of race there.

Also applies to: 315-352

🧹 Nitpick comments (1)
internal/collector/volcano_job_collector.go (1)

38-87: Constructor and exclusion map look good

Building a map[types.NamespacedName]bool from []ExcludedVolcanoJob and passing buffered batchChan/resourceChan into ResourcesBatcher follows the existing collector patterns and gives O(1) exclusion lookups.

One stylistic note: all other Excluded* types live in internal/collector/types.go. For consistency, you might consider moving ExcludedVolcanoJob there as well and keeping this file focused on collector behavior.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ba09baa and 79b3dfe.

⛔ Files ignored due to path filters (4)
  • dist/backend-install.yaml is excluded by !**/dist/**
  • dist/install.yaml is excluded by !**/dist/**
  • dist/installer_updater.yaml is excluded by !**/dist/**
  • dist/zxporter.yaml is excluded by !**/dist/**
📒 Files selected for processing (5)
  • config/rbac/role.yaml (1 hunks)
  • internal/collector/interface.go (3 hunks)
  • internal/collector/types.go (1 hunks)
  • internal/collector/volcano_job_collector.go (1 hunks)
  • internal/controller/collectionpolicy_controller.go (6 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • internal/collector/interface.go
  • config/rbac/role.yaml
🧰 Additional context used
🧬 Code graph analysis (2)
internal/collector/types.go (1)
internal/collector/interface.go (3)
  • VolumeAttachment (140-140)
  • KubeflowNotebook (141-141)
  • VolcanoJob (142-142)
internal/controller/collectionpolicy_controller.go (3)
internal/collector/volcano_job_collector.go (2)
  • ExcludedVolcanoJob (39-42)
  • NewVolcanoJobCollector (45-87)
internal/collector/batcher.go (2)
  • DefaultMaxBatchSize (16-16)
  • DefaultMaxBatchTime (19-19)
internal/collector/interface.go (1)
  • VolcanoJob (142-142)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Build Docker Image
  • GitHub Check: Analyze (go)
  • GitHub Check: Run make test
  • GitHub Check: Build Docker Image
🔇 Additional comments (9)
internal/collector/types.go (1)

4-16: VolcanoJob inclusion in AllResourceTypes is correct and necessary

Adding VolcanoJob to the central AllResourceTypes slice keeps the registry consistent with the ResourceType enum and allows generic flows (like restart and registration) to see the new type.

internal/controller/collectionpolicy_controller.go (5)

251-251: RBAC for Volcano jobs is aligned with the new collector

The kubebuilder RBAC annotation for groups=batch.volcano.sh,resources=jobs,verbs=get;list;watch matches the GVR used by VolcanoJobCollector and is scoped read-only, consistent with other optional third‑party collectors.


780-782: Change detection for Volcano job exclusions is wired correctly

Adding the ExcludedVolcanoJobs comparison to identifyAffectedCollectors ensures that changing Volcano job exclusions triggers a selective restart of the volcano_job collector, matching the behavior of other resource types.


1459-1468: Selective restart wiring for volcano_job matches the collector constructor

The new "volcano_job" case correctly instantiates NewVolcanoJobCollector with:

  • r.DynamicClient,
  • newConfig.TargetNamespaces,
  • newConfig.ExcludedVolcanoJobs,
  • standard batch size/time and telemetry.

This mirrors how other dynamic collectors (e.g., VPA, Kubeflow) are wired.


2571-2582: VolcanoJobCollector registration and disable flow integration look consistent

The new entry in collectors:

  • Uses NewVolcanoJobCollector with the same (dynamicClient, TargetNamespaces, ExcludedVolcanoJobs, ...) signature as in selective restart.
  • Participates in the disabledCollectorsMap gating via name: collector.VolcanoJob, so "volcano_job" in DisabledCollectors will correctly skip registration.

This is consistent with the patterns used for other optional collectors.


3201-3210: Re-enabling volcano_job in handleDisabledCollectorsChange is wired correctly

The "volcano_job" branch recreates the collector with DynamicClient, TargetNamespaces, and ExcludedVolcanoJobs, then flows through the common register/start logic. This keeps enable/disable semantics for Volcano jobs aligned with all other collectors.

internal/collector/volcano_job_collector.go (3)

89-248: Informer setup and event handler wiring are solid

Using dynamicinformer.NewFilteredDynamicSharedInformerFactory on batch.volcano.sh/v1alpha1 jobs, with:

  • namespace scoping (single namespace vs all),
  • proper Add/Update/Delete handlers,
  • tombstone handling for deletes,
  • cache sync with timeout and telemetry reporting,

matches the patterns of other dynamic collectors and should behave well even when the Volcano CRD is missing (guarded by IsAvailable at registration time).


288-313: Exclusion logic correctly combines namespace scoping and explicit job exclusions

isExcluded:

  • Enforces TargetNamespaces semantics (treating a non-empty list as an allowlist, consistent with other collectors).
  • Then checks the excludedJobs map under an RLock.

This matches how other exclusion types are handled and should be safe under concurrent use.
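The two-stage check reads roughly as follows; this is a hedged sketch of the pattern, not the project's actual `isExcluded` (field names and the map-keyed representation are assumptions):

```go
package main

import (
	"fmt"
	"sync"
)

type jobKey struct{ Namespace, Name string }

type collector struct {
	mu               sync.RWMutex
	targetNamespaces map[string]bool // empty map means "all namespaces"
	excludedJobs     map[jobKey]bool
}

// isExcluded applies namespace allowlisting first, then explicit
// exclusions, under a read lock for safe concurrent use.
func (c *collector) isExcluded(namespace, name string) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if len(c.targetNamespaces) > 0 && !c.targetNamespaces[namespace] {
		return true
	}
	return c.excludedJobs[jobKey{namespace, name}]
}

func main() {
	c := &collector{
		targetNamespaces: map[string]bool{"team-a": true},
		excludedJobs:     map[jobKey]bool{{"team-a", "noisy-job"}: true},
	}
	fmt.Println(c.isExcluded("team-a", "train"))     // false: allowed namespace, not excluded
	fmt.Println(c.isExcluded("team-b", "train"))     // true: namespace outside allowlist
	fmt.Println(c.isExcluded("team-a", "noisy-job")) // true: explicitly excluded
}
```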


364-387: Availability probe and AddResource helper are consistent with the rest of the framework

  • IsAvailable does a cheap List with Limit: 1 on the Volcano Job GVR and reports telemetry on failure, which is exactly what registerResourceCollectors expects.
  • AddResource validates type, emits telemetry on misuse, and reuses handleJobEvent(EventTypeAdd).

Both align with patterns in other collectors.

Also applies to: 389-409

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
internal/controller/collectionpolicy_controller.go (1)

80-141: Wire ExcludedVolcanoJobs into createNewConfig

You added ExcludedVolcanoJobs []collector.ExcludedVolcanoJob to PolicyConfig and pass it into NewVolcanoJobCollector, but createNewConfig never populates this slice from envSpec.Exclusions. Unless this is being set elsewhere, any CRD/env-driven excluded volcano jobs will be ignored in practice. Consider mirroring the pattern used for ExcludedJobs (and other Excluded* types) here.

For example, assuming envSpec.Exclusions exposes an ExcludedVolcanoJobs slice analogous to ExcludedJobs, something like this would align with the existing conversions:

@@ func (r *CollectionPolicyReconciler) createNewConfig(envSpec *monitoringv1.CollectionPolicySpec, logger logr.Logger) (*PolicyConfig, bool) {
 	// Jobs
 	for _, job := range envSpec.Exclusions.ExcludedJobs {
 		newConfig.ExcludedJobs = append(newConfig.ExcludedJobs, collector.ExcludedJob{
 			Namespace: job.Namespace,
 			Name:      job.Name,
 		})
 	}
+
+	// VolcanoJobs
+	for _, job := range envSpec.Exclusions.ExcludedVolcanoJobs {
+		newConfig.ExcludedVolcanoJobs = append(newConfig.ExcludedVolcanoJobs, collector.ExcludedVolcanoJob{
+			Namespace: job.Namespace,
+			Name:      job.Name,
+		})
+	}

Please confirm whether the spec/env wiring for ExcludedVolcanoJobs exists elsewhere or needs to be added here.

Also applies to: 311-618

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 79b3dfe and 0a38434.

⛔ Files ignored due to path filters (4)
  • dist/backend-install.yaml is excluded by !**/dist/**
  • dist/install.yaml is excluded by !**/dist/**
  • dist/installer_updater.yaml is excluded by !**/dist/**
  • dist/zxporter.yaml is excluded by !**/dist/**
📒 Files selected for processing (2)
  • internal/collector/interface.go (3 hunks)
  • internal/controller/collectionpolicy_controller.go (6 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
internal/controller/collectionpolicy_controller.go (3)
internal/collector/volcano_job_collector.go (2)
  • ExcludedVolcanoJob (39-42)
  • NewVolcanoJobCollector (45-87)
internal/collector/batcher.go (2)
  • DefaultMaxBatchSize (16-16)
  • DefaultMaxBatchTime (19-19)
internal/collector/interface.go (1)
  • VolcanoJob (142-142)
internal/collector/interface.go (1)
gen/api/v1/metrics_collector.pb.go (1)
  • ResourceType_RESOURCE_TYPE_VOLCANO_JOB (174-174)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Build Docker Image
  • GitHub Check: Run make test
  • GitHub Check: Analyze (go)
  • GitHub Check: Build Docker Image
🔇 Additional comments (6)
internal/collector/interface.go (1)

141-143: VolcanoJob ResourceType wiring is consistent

Enum value, String() mapping ("volcano_job"), and ProtoType() mapping to RESOURCE_TYPE_VOLCANO_JOB are all aligned with the existing patterns and protobuf definitions. No changes needed.

Also applies to: 196-197, 304-305
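The enum wiring pattern looks roughly like this; the values and ordering below are illustrative, not the project's actual iota sequence in interface.go:

```go
package main

import "fmt"

// ResourceType mirrors the enum pattern in internal/collector/interface.go.
type ResourceType int

const (
	Pod ResourceType = iota
	VolcanoJob
)

// String returns the snake_case name used as the collector key in
// config fields like DisabledCollectors.
func (r ResourceType) String() string {
	switch r {
	case Pod:
		return "pod"
	case VolcanoJob:
		return "volcano_job"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(VolcanoJob) // fmt uses String(), printing volcano_job
}
```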

internal/controller/collectionpolicy_controller.go (5)

239-255: RBAC for Volcano jobs matches collector’s read‑only usage

The new kubebuilder RBAC annotation for batch.volcano.sh jobs (get;list;watch) is consistent with how the VolcanoJob collector is used as a read‑only watcher alongside other optional third‑party resources.
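Based on the verbs listed in the PR summary, the generated rule in config/rbac/role.yaml would be a fragment like:

```yaml
- apiGroups:
    - batch.volcano.sh
  resources:
    - jobs
  verbs:
    - get
    - list
    - watch
```

Read-only verbs are enough here because the collector only watches Volcano Jobs and never mutates them.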


783-785: ExcludedVolcanoJobs correctly drive selective restart for volcano_job collector

Hooking ExcludedVolcanoJobs into identifyAffectedCollectors so that changes flag "volcano_job" keeps the selective‑restart logic in line with other Excluded* lists.


1462-1471: Selective restart wiring for volcano_job mirrors other dynamic collectors

The volcano_job case in restartCollectors recreates NewVolcanoJobCollector with DynamicClient, TargetNamespaces, ExcludedVolcanoJobs, and the standard batch/logging parameters, matching the constructor signature and patterns of other namespaced collectors.


2593-2604: VolcanoJobCollector included in initial registration and DisabledCollectors flow

Adding NewVolcanoJobCollector to the collectors slice with name: collector.VolcanoJob ensures it’s registered on initialization and can be controlled via DisabledCollectors using the "volcano_job" key, consistent with other resource types.


3232-3241: Disabled‑collector re‑enablement covers volcano_job

The new "volcano_job" branch in handleDisabledCollectorsChange reconstructs NewVolcanoJobCollector from newConfig with the same arguments as other paths, so re‑enabling a previously disabled volcano job collector behaves consistently with the rest of the system.

@Parthiba-Hazra Parthiba-Hazra enabled auto-merge (squash) December 16, 2025 14:17
@Parthiba-Hazra Parthiba-Hazra merged commit eccbe3e into main Dec 16, 2025
41 of 44 checks passed
@Parthiba-Hazra Parthiba-Hazra deleted the ph/volcano-job-collector branch December 16, 2025 14:18
Parthiba-Hazra added a commit that referenced this pull request May 5, 2026