kubeflow notebook collector#168
Conversation
WalkthroughAdds Kubeflow Notebook resource support: new ResourceType and dynamic KubeflowNotebookCollector with exclusions/batching, controller wiring (config, registration, restart, enable/disable), RBAC changes, proto enum + NodeMetadata RPC, Makefile/CRD tooling bump, and dependency updates. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Reconciler as Controller Reconciler
participant Registry as Collector Registry
participant KFNC as KubeflowNotebookCollector
participant K8s as K8s API (dynamic)
participant Batcher as Batcher
participant Stream as Resource Channel
rect rgba(230,245,255,0.6)
Reconciler->>Registry: registerResourceCollectors(config)
alt kubeflow_notebook enabled & available
Registry->>KFNC: New(namespaces, excludes...)
KFNC->>K8s: Create dynamic informer (kubeflow.org/v1 Notebooks)
KFNC->>KFNC: Start informer & handlers
KFNC->>Batcher: Start batching
else disabled or unavailable
Registry-->>Reconciler: skip registration
end
end
rect rgba(235,255,235,0.6)
K8s-->>KFNC: Add/Update/Delete Notebook event
KFNC->>KFNC: Apply namespace/exclusion filter
KFNC->>Batcher: enqueue CollectedResource{type=kubeflow_notebook,event}
Batcher-->>Stream: emit []CollectedResource
end
sequenceDiagram
autonumber
participant Client as Client
participant Svc as MetricsCollectorService
participant Store as Node Metadata Source
Client->>Svc: NodeMetadata(NodeMetadataRequest)
Svc->>Store: fetch node metadata for team_id, cluster_id
Store-->>Svc: map<string, Node>
Svc-->>Client: NodeMetadataResponse{node_to_meta}
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Suggested labels
Suggested reviewers
Poem
Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro ⛔ Files ignored due to path filters (3)
📒 Files selected for processing (2)
🚧 Files skipped from review as they are similar to previous changes (1)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/controller/collectionpolicy_controller.go (1)
555-576: Populate ExcludedKubeflowNotebooks from the specLine 559 adds
ExcludedKubeflowNotebookstoPolicyConfig, butcreateNewConfignever copies the notebooks listed in the CRD/env spec into this field. As a result the exclusion list is always empty, so the new selective-restart trigger never fires and the Kubeflow notebook collector will keep watching notebooks the user asked us to skip. Please wire the merge logic just like the other exclusion lists.@@ // CSI newConfig.ExcludedCSIDrivers = envSpec.Exclusions.ExcludedCSIDrivers for _, csc := range envSpec.Exclusions.ExcludedCSIStorageCapacities { newConfig.ExcludedCSIStorageCapacities = append(newConfig.ExcludedCSIStorageCapacities, collector.ExcludedCSIStorageCapacity{ Namespace: csc.Namespace, Name: csc.Name, }) } newConfig.ExcludedVolumeAttachments = envSpec.Exclusions.ExcludedVolumeAttachments + + // Kubeflow Notebooks + for _, nb := range envSpec.Exclusions.ExcludedKubeflowNotebooks { + newConfig.ExcludedKubeflowNotebooks = append(newConfig.ExcludedKubeflowNotebooks, collector.ExcludedKubeflowNotebook{ + Namespace: nb.Namespace, + Name: nb.Name, + }) + }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (6)
gen/api/v1/apiv1connect/metrics_collector.connect.gois excluded by!**/gen/**gen/api/v1/common.pb.gois excluded by!**/*.pb.go,!**/gen/**gen/api/v1/metrics_collector.pb.gois excluded by!**/*.pb.go,!**/gen/**gen/api/v1/metrics_collector_grpc.pb.gois excluded by!**/*.pb.go,!**/gen/**gen/google/type/money.pb.gois excluded by!**/*.pb.go,!**/gen/**proto/dakr_proto_descriptor.binis excluded by!**/*.bin
📒 Files selected for processing (8)
Makefile(1 hunks)config/crd/bases/devzero.io_collectionpolicies.yaml(1 hunks)config/rbac/role.yaml(2 hunks)internal/collector/interface.go(3 hunks)internal/collector/kubeflow_notebook_collector.go(1 hunks)internal/collector/types.go(1 hunks)internal/controller/collectionpolicy_controller.go(6 hunks)proto/metrics_collector.proto(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
internal/collector/interface.go (1)
gen/api/v1/metrics_collector.pb.go (1)
ResourceType_RESOURCE_TYPE_KUBEFLOW_NOTEBOOKS(172-172)
internal/collector/types.go (2)
internal/collector/interface.go (12)
Namespace(95-95)CSINode(130-130)Karpenter(131-131)Datadog(132-132)ArgoRollouts(133-133)KedaScaledJob(135-135)KedaScaledObject(136-136)ClusterSnapshot(137-137)CSIDriver(138-138)CSIStorageCapacity(139-139)VolumeAttachment(140-140)KubeflowNotebook(141-141)gen/api/v1/metrics_collector.pb.go (6)
Namespace(1465-1499)Namespace(1514-1514)Namespace(1529-1531)ClusterSnapshot(1909-1920)ClusterSnapshot(1935-1935)ClusterSnapshot(1950-1952)
internal/collector/kubeflow_notebook_collector.go (3)
internal/collector/interface.go (7)
CollectedResource(305-321)EventTypeAdd(18-18)EventTypeUpdate(20-20)EventTypeDelete(22-22)EventType(12-12)ResourceType(87-87)KubeflowNotebook(141-141)internal/collector/batcher.go (2)
ResourcesBatcher(23-31)NewResourcesBatcher(34-55)internal/logger/logger.go (1)
Logger(26-29)
internal/controller/collectionpolicy_controller.go (3)
internal/collector/kubeflow_notebook_collector.go (2)
ExcludedKubeflowNotebook(39-42)NewKubeflowNotebookCollector(45-87)internal/collector/batcher.go (2)
DefaultMaxBatchSize(16-16)DefaultMaxBatchTime(19-19)internal/collector/interface.go (1)
KubeflowNotebook(141-141)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Analyze (go)
🔇 Additional comments (6)
config/rbac/role.yaml (1)
7-34: Restore core API RBAC permissionsThis change drops the manager’s
get/list/watchon core resources (pods, nodes, services, PVCs, etc.). Every existing collector still issues those LIST/WATCH calls, so at runtime we’ll start getting 403s (forbidden: User "system:serviceaccount:..." cannot list resource "pods" in API group ""). That breaks virtually all collection flows, not just the new Kubeflow notebook support. Please keep the original core RBAC block and then layer the new Kubeflow rule on top.Apply this diff to restore the required permissions:
+- apiGroups: + - "" + resources: + - configmaps + - endpoints + - events + - limitranges + - namespaces + - nodes + - persistentvolumeclaims + - persistentvolumes + - pods + - replicationcontrollers + - resourcequotas + - serviceaccounts + - services + verbs: + - get + - list + - watch +- apiGroups: + - "" + resources: + - nodes/metrics + - nodes/status + - pods/status + verbs: + - getLikely an incorrect or invalid review comment.
config/crd/bases/devzero.io_collectionpolicies.yaml (1)
6-6: Version annotation alignment looks goodController-gen annotation now matches the toolchain bump in the Makefile, so future regenerations will stay in sync.
Makefile (1)
505-505: Controller-tools bump acknowledgedThe version update to v0.16.5 lines up with the CRD annotation change and keeps
go installpulls consistent.internal/collector/types.go (1)
13-15: KubeflowNotebook inclusion in AllResourceTypes looks rightGreat to see
KubeflowNotebook(and the adjacent late additions) surfaced here—this keeps the collector registry aware of the new resource.internal/collector/interface.go (1)
141-142: KubeflowNotebook constant/string/proto wiring is consistentThe enum addition, string mapping, and proto mapping are all in lockstep, so downstream translations should just work.
Also applies to: 193-194, 297-298
proto/metrics_collector.proto (1)
8-9: Proto additions cover Kubeflow notebooks and node metadata cleanlyImporting
common.proto, adding the notebook resource type, and exposing the NodeMetadata RPC/messages gives the API surface everything it needs.Also applies to: 122-123, 443-453
| c.logger.Info("Stopping informer", "resource", key) | ||
| close(stopCh) | ||
| } | ||
|
|
||
| c.informers = make(map[string]cache.SharedIndexInformer) | ||
| c.informerStopChs = make(map[string]chan struct{}) | ||
|
|
||
| // Close the main stop channel (signals informers to stop) | ||
| select { | ||
| case <-c.stopCh: | ||
| c.logger.Info("Kubeflow Notebook collector stop channel already closed") | ||
| default: | ||
| close(c.stopCh) | ||
| c.logger.Info("Closed Kubeflow Notebook collector stop channel") | ||
| } | ||
|
|
||
| // Close the batchChan (input to the batcher). | ||
| if c.batchChan != nil { | ||
| close(c.batchChan) | ||
| c.batchChan = nil | ||
| c.logger.Info("Closed Kubeflow Notebook collector batch input channel") | ||
| } | ||
|
|
||
| // Stop the batcher (waits for completion). | ||
| if c.batcher != nil { | ||
| c.batcher.stop() | ||
| c.logger.Info("Kubeflow Notebook collector batcher stopped") | ||
| } | ||
| // resourceChan is closed by the batcher's defer func. |
There was a problem hiding this comment.
Guard batch channel closes during Stop
Stop closes batchChan while informer event handlers may still be executing. If one of those handlers calls handleNotebookEvent after the close, we panic with “send on closed channel.” Please wait for in‑flight handlers to drain before closing the channel; a simple wait group around the handler suffices.
type KubeflowNotebookCollector struct {
dynamicClient dynamic.Interface
@@
telemetryLogger telemetry_logger.Logger
mu sync.RWMutex
+ wg sync.WaitGroup
}
@@
func (c *KubeflowNotebookCollector) handleNotebookEvent(obj *unstructured.Unstructured, eventType EventType) {
+ c.wg.Add(1)
+ defer c.wg.Done()
+
name := obj.GetName()
namespace := obj.GetNamespace()
@@
func (c *KubeflowNotebookCollector) Stop() error {
c.logger.Info("Stopping Kubeflow Notebook collector")
@@
close(stopCh)
}
+ // Wait for any in-flight handlers to finish before closing channels.
+ c.wg.Wait()
+
c.informers = make(map[string]cache.SharedIndexInformer)
c.informerStopChs = make(map[string]chan struct{})
@@
if c.batchChan != nil {
close(c.batchChan)
c.batchChan = nil🤖 Prompt for AI Agents
internal/collector/kubeflow_notebook_collector.go lines 320-348: Stop currently
closes batchChan while informer event handlers may still be running, causing
“send on closed channel” panics; add a sync.WaitGroup field to the collector,
have each informer event handler Add(1) at start and Done() at exit (or defer
Done) around calls to handleNotebookEvent, and in Stop call wg.Wait() (with a
reasonable ordering: stop informers first or signal them to stop, then wait for
the wg) before closing and nil-ing batchChan; ensure Start/Stop initialization
makes the wg available and handlers check for nil/closed channel where
appropriate to avoid races.
There was a problem hiding this comment.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (5)
dist/backend-install.yamlis excluded by!**/dist/**dist/install.yamlis excluded by!**/dist/**dist/installer_updater.yamlis excluded by!**/dist/**dist/zxporter.yamlis excluded by!**/dist/**go.sumis excluded by!**/*.sum
📒 Files selected for processing (1)
go.mod(4 hunks)
| go 1.23.8 | ||
|
|
||
| toolchain go1.24.0 | ||
| go 1.24.0 |
There was a problem hiding this comment.
Fix Go directive version format
The go directive must use the major.minor release format. With go 1.24.0, the Go toolchain rejects the module (go: go.mod: go 1.24.0: invalid go version) and prevents builds. Please drop the patch component.
-go 1.24.0
+go 1.24📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| go 1.24.0 | |
| go 1.24 |
🤖 Prompt for AI Agents
In go.mod around line 3, the go directive uses an invalid version format "go
1.24.0"; update it to the major.minor format by removing the patch component so
it reads "go 1.24" to satisfy the Go toolchain and allow builds.
* kubeflow notebook collector * make build installer * emty response NodeMetadata method * log * proto changes
Summary by CodeRabbit
New Features
Tests
Chores