PC-704, PC-708: add collectors to monitor SparkApplication and ScheduledSparkApplication resources #224
Conversation
📝 Walkthrough

Adds namespace/name exclusion types and CRD entries; introduces SparkApplication and ScheduledSparkApplication resource types and two dynamic collectors with informer/batching/exclusion logic; wires collectors into controller lifecycle and restarts; and adds RBAC rules for sparkoperator.k8s.io.
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant Ctrl as Controller
participant Dyn as DynamicClient
participant Factory as InformerFactory
participant Informer as Informer
participant Handler as EventHandler
participant Batcher as Batcher
Ctrl->>Dyn: NewSparkCollector(namespaces, exclusions)
Dyn->>Factory: Build informer for sparkoperator.k8s.io/v1beta2
Factory->>Informer: Create informer (sparkapplications / scheduledsparkapplications)
Ctrl->>Informer: Attach Add/Update/Delete handlers
Informer->>Informer: Start & sync cache
loop Resource Events
Informer->>Handler: Event (Add/Update/Delete, Unstructured)
Handler->>Handler: Validate & extract namespace/name
Handler->>Handler: Check exclusions map
alt Not excluded
Handler->>Batcher: Enqueue CollectedResource (type, key, timestamp, raw)
Batcher->>Ctrl: Flush batch to resource channel
else Excluded
Handler->>Handler: Skip
end
end
Ctrl->>Informer: Stop collector
Informer->>Factory: Stop informer
    Batcher->>Batcher: Stop
```

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Force-pushed from dfc484e to 99e476d (Compare)
Actionable comments posted: 1
🧹 Nitpick comments (2)
internal/collector/scheduled_spark_application_collector.go (1)
258-259: Consider reducing log verbosity for collected resources.

Logging at `Info` level for every collected resource may generate excessive log output in clusters with many Spark applications. Consider using `V(1).Info()` or `Debug` level for per-resource logging.

🔎 Suggested change

```diff
 // Send the processed resource to the batch channel
-c.logger.Info("Collected Scheduled Spark Application resource", "key", key, "eventType", eventType, "resource", processedObj)
+c.logger.V(1).Info("Collected Scheduled Spark Application resource", "key", key, "eventType", eventType)
```

internal/collector/spark_application_collector.go (1)

258-259: Consider reducing log verbosity for collected resources.

Same as for `ScheduledSparkApplicationCollector`: logging at `Info` level for every collected resource may be excessive.

🔎 Suggested change

```diff
 // Send the processed resource to the batch channel
-c.logger.Info("Collected Spark Application resource", "key", key, "eventType", eventType, "resource", processedObj)
+c.logger.V(1).Info("Collected Spark Application resource", "key", key, "eventType", eventType)
```
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (8)
- `dist/backend-install.yaml` is excluded by `!**/dist/**`
- `dist/install.yaml` is excluded by `!**/dist/**`
- `dist/installer_updater.yaml` is excluded by `!**/dist/**`
- `dist/zxporter.yaml` is excluded by `!**/dist/**`
- `gen/api/v1/common.pb.go` is excluded by `!**/*.pb.go`, `!**/gen/**`
- `gen/api/v1/k8s.pb.go` is excluded by `!**/*.pb.go`, `!**/gen/**`
- `gen/api/v1/metrics_collector.pb.go` is excluded by `!**/*.pb.go`, `!**/gen/**`
- `proto/dakr_proto_descriptor.bin` is excluded by `!**/*.bin`
📒 Files selected for processing (8)
- api/v1/collectionpolicy_types.go
- config/rbac/role.yaml
- helm-chart/zxporter/templates/zxporter-rbac.yaml
- internal/collector/interface.go
- internal/collector/scheduled_spark_application_collector.go
- internal/collector/spark_application_collector.go
- internal/collector/types.go
- internal/controller/collectionpolicy_controller.go
🧰 Additional context used
🧬 Code graph analysis (4)
internal/collector/interface.go (1)
- gen/api/v1/metrics_collector.pb.go (5): `ResourceType` (98-98), `ResourceType` (305-307), `ResourceType` (309-311), `ResourceType` (318-320), `ResourceType_RESOURCE_TYPE_SPARK_APPLICATION` (176-176)

internal/collector/types.go (1)
- internal/collector/interface.go (5): `VolumeAttachment` (140-140), `KubeflowNotebook` (141-141), `VolcanoJob` (142-142), `SparkApplication` (143-143), `ScheduledSparkApplication` (144-144)

api/v1/collectionpolicy_types.go (1)
- internal/collector/types.go (2): `ExcludedSparkApplication` (180-186), `ExcludedScheduledSparkApplication` (189-195)

internal/controller/collectionpolicy_controller.go (3)
- internal/collector/types.go (2): `ExcludedSparkApplication` (180-186), `ExcludedScheduledSparkApplication` (189-195)
- internal/collector/interface.go (3): `Namespace` (95-95), `SparkApplication` (143-143), `ScheduledSparkApplication` (144-144)
- internal/collector/batcher.go (2): `DefaultMaxBatchSize` (16-16), `DefaultMaxBatchTime` (19-19)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Test Metrics Server Lifecycle on K8s v1.32.3
- GitHub Check: Build Docker Image
- GitHub Check: Analyze (go)
🔇 Additional comments (25)
config/rbac/role.yaml (1)
308-316: LGTM! The RBAC rule correctly grants read-only access (`get`, `list`, `watch`) to Spark Operator custom resources, which aligns with the monitoring requirements of the new collectors.

helm-chart/zxporter/templates/zxporter-rbac.yaml (1)

353-361: LGTM! The Helm chart RBAC rule mirrors the config/rbac/role.yaml addition and is correctly placed alongside other optional third-party integrations.
internal/collector/interface.go (2)
143-144: LGTM! The new `SparkApplication` and `ScheduledSparkApplication` resource types are correctly added to the enum, maintaining proper ordering after `VolcanoJob`.

199-200: LGTM! String representations follow the established snake_case convention used throughout the codebase.
internal/collector/types.go (2)
15-15: LGTM! The new resource types are correctly added to `AllResourceTypes()`, enabling proper enumeration across the collection framework.

178-195: LGTM! The exclusion structs follow the established pattern with consistent field naming and JSON tags matching other exclusion types in this file.
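For orientation, a minimal sketch of what an exclusion struct in this pattern typically looks like; the exact field layout and JSON tag values in the PR may differ, so treat this as illustrative only:

```go
// ExcludedSparkApplication identifies a single SparkApplication to skip during
// collection. Field names follow the pattern described in this review, not the
// PR's exact source.
type ExcludedSparkApplication struct {
	Namespace string `json:"namespace"`
	Name      string `json:"name"`
}
```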
api/v1/collectionpolicy_types.go (2)
96-105: LGTM! The new exclusion fields are properly added to the `Exclusions` struct with consistent JSON tags and `omitempty` options, following the established pattern for other exclusion categories.

389-423: LGTM! The new exclusion types (`ExcludedKubeflowNotebook`, `ExcludedVolcanoJob`, `ExcludedSparkApplication`, `ExcludedScheduledSparkApplication`) follow the established pattern with proper documentation comments and JSON tags.

internal/controller/collectionpolicy_controller.go (7)
124-127: LGTM! The `PolicyConfig` struct correctly includes the new exclusion fields for Spark-related resources, maintaining consistency with the existing exclusion pattern.

257-257: LGTM! The kubebuilder RBAC annotation correctly declares the required permissions for Spark Operator resources, which will be used for manifest generation.

595-625: LGTM! The exclusion conversion logic correctly transforms API spec exclusions to internal collector exclusion types for Kubeflow Notebooks, Volcano Jobs, and Spark Applications, following the established pattern used for other resource types.
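To illustrate the conversion pattern being praised here, a hedged sketch; the function name, package aliases (`devzerov1`, `collector`), and field names are assumptions for the example and are not the controller's actual code:

```go
// convertSparkExclusions maps API-level exclusion entries from the
// CollectionPolicy spec into the collector package's exclusion type.
func convertSparkExclusions(spec []devzerov1.ExcludedSparkApplication) []collector.ExcludedSparkApplication {
	out := make([]collector.ExcludedSparkApplication, 0, len(spec))
	for _, e := range spec {
		out = append(out, collector.ExcludedSparkApplication{
			Namespace: e.Namespace,
			Name:      e.Name,
		})
	}
	return out
}
```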
822-828: LGTM! The `identifyAffectedCollectors` method correctly identifies when Spark-related exclusion changes require collector restarts.

1515-1534: LGTM! The selective restart logic correctly handles the new `spark_application` and `scheduled_spark_application` collector types with proper configuration passthrough.

2668-2691: LGTM! The collector registration correctly includes both `SparkApplicationCollector` and `ScheduledSparkApplicationCollector` with appropriate configuration.

3329-3348: LGTM! The disabled collectors change handler correctly supports re-enabling Spark-related collectors with proper configuration.
internal/collector/scheduled_spark_application_collector.go (5)
22-36: LGTM! The collector struct is well-designed with proper synchronization (a mutex for exclusion map access) and follows the established collector pattern.

38-81: LGTM! The constructor properly initializes all fields, creates the exclusion lookup map for O(1) access, and sets up the batching infrastructure.
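A small sketch of the O(1) exclusion lookup described above, assuming exclusions carry `Namespace` and `Name` fields; the helper name is illustrative, not the constructor's actual code:

```go
import "k8s.io/apimachinery/pkg/types"

// buildExclusionSet turns the configured exclusions into a set keyed by
// namespace/name so event handlers can test membership in constant time.
func buildExclusionSet(excluded []ExcludedScheduledSparkApplication) map[types.NamespacedName]bool {
	set := make(map[types.NamespacedName]bool, len(excluded))
	for _, e := range excluded {
		set[types.NamespacedName{Namespace: e.Namespace, Name: e.Name}] = true
	}
	return set
}
```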
83-242: LGTM! The `Start` method correctly (see the sketch after this list):

- Sets up the dynamic informer with proper namespace filtering
- Handles Add/Update/Delete events with tombstone support for deletions
- Waits for cache sync with a timeout
- Starts the batcher and context cancellation handler
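As a condensed sketch of the wiring this list describes; the function name, handler callback, and namespace handling are assumptions for the example, not the PR's exact code:

```go
import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/tools/cache"
)

// startScheduledSparkInformer builds a dynamic informer for the Spark Operator
// CRD, attaches event handlers, and waits for the cache to sync.
func startScheduledSparkInformer(client dynamic.Interface, namespaces []string, stopCh <-chan struct{}, handle func(obj interface{}, event string)) error {
	gvr := schema.GroupVersionResource{
		Group:    "sparkoperator.k8s.io",
		Version:  "v1beta2",
		Resource: "scheduledsparkapplications",
	}

	// Scope to a single namespace when exactly one is configured; otherwise
	// watch all namespaces and rely on client-side exclusion filtering.
	watchNS := ""
	if len(namespaces) == 1 {
		watchNS = namespaces[0]
	}

	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(client, 0, watchNS, nil)
	informer := factory.ForResource(gvr).Informer()

	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { handle(obj, "add") },
		UpdateFunc: func(_, newObj interface{}) { handle(newObj, "update") },
		DeleteFunc: func(obj interface{}) { handle(obj, "delete") },
	})

	go informer.Run(stopCh)
	if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
		return fmt.Errorf("timeout waiting for scheduledsparkapplications cache to sync")
	}
	return nil
}
```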
309-346: LGTM! The `Stop` method properly handles cleanup:

- Stops all informers by closing their stop channels
- Safely closes the main stop channel with a select guard to prevent double-close panics
- Closes the batch channel and stops the batcher
358-381: LGTM! The `IsAvailable` method correctly checks cluster accessibility for the ScheduledSparkApplication CRD using a minimal list request.

internal/collector/spark_application_collector.go (5)
22-36: LGTM! The collector struct follows the established pattern with proper field organization and synchronization.

38-81: LGTM! The constructor is correctly implemented with proper initialization of all fields and exclusion map setup.

83-242: LGTM! The `Start` method correctly sets up the dynamic informer for `sparkapplications` resources with proper event handling, cache synchronization, and lifecycle management.

309-346: LGTM! The `Stop` method correctly handles graceful shutdown with proper channel cleanup and panic prevention.

358-381: LGTM! The `IsAvailable` method correctly validates CRD accessibility with minimal overhead.
Force-pushed from d1fe4a6 to 41fd34a (Compare)

✅ Actions performed: Review triggered.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In @internal/collector/interface.go:
- Around line 310-313: Uncomment the commented-out case for
ScheduledSparkApplication in the switch that maps resource kinds to
gen.ResourceType (currently only SparkApplication is handled), replacing the
fall-through to the default; add the case for ScheduledSparkApplication so it
returns gen.ResourceType_RESOURCE_TYPE_SCHEDULED_SPARK_APPLICATION (ensure the
case label is exactly ScheduledSparkApplication and the return uses
gen.ResourceType_RESOURCE_TYPE_SCHEDULED_SPARK_APPLICATION).
In @internal/collector/scheduled_spark_application_collector.go:
- Around line 232-239: The goroutine that calls c.Stop() on ctx.Done() ignores
its error return; update the anonymous goroutine in
ScheduledSparkApplicationCollector so it captures the error from c.Stop() (e.g.,
err := c.Stop()) and handle it similarly to SparkApplicationCollector — log the
error (or record it via the collector's logger) instead of discarding it; ensure
the change references the existing goroutine and the c.Stop() call so the linter
no longer flags an unchecked error.
In @internal/collector/spark_application_collector.go:
- Around line 232-239: The goroutine that waits on ctx.Done() currently calls
c.Stop() and discards its error; capture the error returned by c.Stop() and log
it so shutdown failures are visible. Modify the select case for <-ctx.Done() to
call err := c.Stop() and if err != nil emit a log entry (e.g., using the package
logger used elsewhere in this file or a standard logger like
klog.Errorf/log.Printf) including context that shutdown failed for this
collector and the err value. Ensure no panic on logging and keep behavior
otherwise unchanged.
In @internal/controller/collectionpolicy_controller.go:
- Line 257: The kubebuilder annotation comment in collectionpolicy_controller.go
is missing a space after the comment delimiter; update the annotation line
"//+kubebuilder:rbac:groups=sparkoperator.k8s.io,resources=sparkapplications;scheduledsparkapplications,verbs=get;list;watch"
to use a space after the slashes (i.e., "// +kubebuilder:rbac:...") so the
linter recognizes the annotation; locate the annotation near the controller
registration (collectionpolicy_controller) and apply the space-only fix.
🧹 Nitpick comments (1)
internal/collector/scheduled_spark_application_collector.go (1)
1-403: Consider extracting common collector logic to reduce duplication.

This collector is structurally identical to `SparkApplicationCollector`, differing only in:

- GVR resource name (`scheduledsparkapplications` vs `sparkapplications`)
- Type names and logger names
- `ResourceType` constant

While this follows the existing pattern in the codebase (each collector is self-contained), the duplication could be reduced by extracting a generic dynamic-resource collector base or shared helper functions. This is a nice-to-have improvement that could be addressed in a follow-up refactor.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (10)
- `dist/backend-install.yaml` is excluded by `!**/dist/**`
- `dist/install.yaml` is excluded by `!**/dist/**`
- `dist/installer_updater.yaml` is excluded by `!**/dist/**`
- `dist/zxporter.yaml` is excluded by `!**/dist/**`
- `gen/api/v1/apiv1connect/cluster.connect.go` is excluded by `!**/gen/**`
- `gen/api/v1/cluster.pb.go` is excluded by `!**/*.pb.go`, `!**/gen/**`
- `gen/api/v1/cluster_grpc.pb.go` is excluded by `!**/*.pb.go`, `!**/gen/**`
- `gen/api/v1/common.pb.go` is excluded by `!**/*.pb.go`, `!**/gen/**`
- `gen/api/v1/metrics_collector.pb.go` is excluded by `!**/*.pb.go`, `!**/gen/**`
- `proto/dakr_proto_descriptor.bin` is excluded by `!**/*.bin`
📒 Files selected for processing (10)
- api/v1/collectionpolicy_types.go
- api/v1/zz_generated.deepcopy.go
- config/crd/bases/devzero.io_collectionpolicies.yaml
- config/rbac/role.yaml
- helm-chart/zxporter/templates/zxporter-rbac.yaml
- internal/collector/interface.go
- internal/collector/scheduled_spark_application_collector.go
- internal/collector/spark_application_collector.go
- internal/collector/types.go
- internal/controller/collectionpolicy_controller.go
🚧 Files skipped from review as they are similar to previous changes (4)
- config/crd/bases/devzero.io_collectionpolicies.yaml
- api/v1/zz_generated.deepcopy.go
- helm-chart/zxporter/templates/zxporter-rbac.yaml
- config/rbac/role.yaml
🧰 Additional context used
🧬 Code graph analysis (5)
internal/collector/types.go (2)
- internal/collector/interface.go (6): `VolumeAttachment` (140-140), `KubeflowNotebook` (141-141), `VolcanoJob` (142-142), `SparkApplication` (143-143), `ScheduledSparkApplication` (144-144), `Namespace` (95-95)
- gen/api/v1/metrics_collector.pb.go (3): `Namespace` (1481-1518), `Namespace` (1533-1533), `Namespace` (1548-1550)

internal/collector/interface.go (1)
- gen/api/v1/metrics_collector.pb.go (8): `ResourceType` (98-98), `ResourceType` (313-315), `ResourceType` (317-319), `ResourceType` (326-328), `Namespace` (1481-1518), `Namespace` (1533-1533), `Namespace` (1548-1550), `ResourceType_RESOURCE_TYPE_SPARK_APPLICATION` (176-176)

internal/collector/spark_application_collector.go (3)
- internal/collector/interface.go (6): `Namespace` (95-95), `EventTypeAdd` (18-18), `EventTypeUpdate` (20-20), `EventTypeDelete` (22-22), `EventType` (12-12), `SparkApplication` (143-143)
- internal/logger/logger.go (1): `Logger` (26-29)
- internal/collector/types.go (1): `ExcludedSparkApplication` (180-186)

api/v1/collectionpolicy_types.go (2)
- internal/collector/types.go (2): `ExcludedSparkApplication` (180-186), `ExcludedScheduledSparkApplication` (189-195)
- gen/api/v1/metrics_collector.pb.go (3): `Namespace` (1481-1518), `Namespace` (1533-1533), `Namespace` (1548-1550)

internal/collector/scheduled_spark_application_collector.go (4)
- internal/collector/interface.go (8): `CollectedResource` (320-336), `Namespace` (95-95), `EventTypeAdd` (18-18), `EventTypeUpdate` (20-20), `EventTypeDelete` (22-22), `EventType` (12-12), `ResourceType` (87-87), `ScheduledSparkApplication` (144-144)
- internal/collector/batcher.go (2): `ResourcesBatcher` (23-31), `NewResourcesBatcher` (34-55)
- internal/logger/logger.go (1): `Logger` (26-29)
- internal/collector/types.go (1): `ExcludedScheduledSparkApplication` (189-195)
🪛 GitHub Actions: golangci-lint
internal/collector/scheduled_spark_application_collector.go
[error] 235-235: GolangCI-Lint: Error return value of c.Stop is not checked. (errcheck)
🪛 GitHub Check: golangci-lint
internal/collector/spark_application_collector.go
[failure] 235-235:
Error return value of c.Stop is not checked (errcheck)
internal/controller/collectionpolicy_controller.go
[failure] 257-257:
comment-spacings: no space between comment delimiter and comment text (revive)
internal/collector/scheduled_spark_application_collector.go
[failure] 235-235:
Error return value of c.Stop is not checked (errcheck)
🔇 Additional comments (16)
api/v1/collectionpolicy_types.go (2)
96-105: LGTM! New exclusion fields follow existing patterns.

The new exclusion fields for Kubeflow Notebooks, Volcano Jobs, and Spark Applications are consistent with existing exclusion patterns in the struct.

389-423: LGTM! New exclusion types are well-structured.

The new `ExcludedKubeflowNotebook`, `ExcludedVolcanoJob`, `ExcludedSparkApplication`, and `ExcludedScheduledSparkApplication` types follow the established pattern with `Namespace` and `Name` fields, proper JSON tags, and clear documentation comments.

internal/collector/types.go (2)

15-15: LGTM! Resource types correctly added.

`SparkApplication` and `ScheduledSparkApplication` are properly included in the `AllResourceTypes()` function.

178-195: LGTM! Exclusion types follow established patterns.

The new `ExcludedSparkApplication` and `ExcludedScheduledSparkApplication` types follow the existing pattern with JSON tags, consistent with `ExcludedDatadogExtendedDaemonSetReplicaSet` and `ExcludedArgoRollout` in this file.

internal/collector/interface.go (2)

143-144: LGTM! Resource type enum values added.

`SparkApplication` and `ScheduledSparkApplication` are properly added to the `ResourceType` enum.

199-200: LGTM! String representations added.

The string mappings for `SparkApplication` ("spark_application") and `ScheduledSparkApplication` ("scheduled_spark_application") are correctly added.

internal/collector/spark_application_collector.go (3)
1-81: LGTM! Well-structured collector implementation.

The `SparkApplicationCollector` follows established patterns in the codebase:

- Proper initialization of exclusion map for O(1) lookups
- Buffered channels for batching
- Thread-safe access with mutex
- Clear logger naming
83-242: LGTM! Informer setup and lifecycle management.

The `Start` method properly:

- Configures namespace filtering for the dynamic informer
- Handles tombstone objects in `DeleteFunc` (see the sketch after this list)
- Implements cache sync timeout with appropriate telemetry
- Manages informer lifecycle with context cancellation
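The tombstone handling called out above generally follows the standard client-go pattern; a hedged sketch with an illustrative helper name, not the PR's exact code:

```go
import (
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/client-go/tools/cache"
)

// extractDeleted unwraps the object handed to DeleteFunc. When the informer
// misses the final delete event, client-go delivers a tombstone
// (cache.DeletedFinalStateUnknown) wrapping the last known object state.
func extractDeleted(obj interface{}) (*unstructured.Unstructured, bool) {
	if u, ok := obj.(*unstructured.Unstructured); ok {
		return u, true
	}
	tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
	if !ok {
		return nil, false
	}
	u, ok := tombstone.Obj.(*unstructured.Unstructured)
	return u, ok
}
```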
282-307: LGTM! Exclusion logic with proper synchronization.

The `isExcluded` method correctly (see the sketch after this list):

- Checks namespace membership first
- Uses `RLock` for concurrent-safe access to the exclusion map
- Returns appropriate boolean values
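A sketch of the exclusion-check shape this list describes, using the struct fields visible later in this review (`namespaces`, `mu`, `excludedApplications`); the exact control flow in the PR may differ:

```go
// isExcluded reports whether an application should be skipped: namespace
// membership is checked first, then the per-resource exclusion set is
// consulted under a read lock (types is k8s.io/apimachinery/pkg/types).
func (c *SparkApplicationCollector) isExcluded(ns, name string) bool {
	if len(c.namespaces) > 0 && c.namespaces[0] != "" {
		inScope := false
		for _, allowed := range c.namespaces {
			if allowed == ns {
				inScope = true
				break
			}
		}
		if !inScope {
			return true
		}
	}

	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.excludedApplications[types.NamespacedName{Namespace: ns, Name: name}]
}
```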
internal/controller/collectionpolicy_controller.go (6)
82-127: LGTM! PolicyConfig extended with new exclusion fields.

The new exclusion fields for Kubeflow Notebooks, Volcano Jobs, and Spark Applications are properly added to the `PolicyConfig` struct, following the existing pattern.

595-625: LGTM! Exclusion mapping in createNewConfig.

The new exclusion types are properly converted from the environment spec to the internal collector exclusion types, following the established pattern for other exclusion types.

822-828: LGTM! Affected collectors detection for Spark resources.

The `identifyAffectedCollectors` method correctly identifies when Spark-related exclusion lists change and marks the corresponding collectors for restart.

1515-1534: LGTM! Selective restart support for Spark collectors.

The `restartCollectors` method properly handles recreation of `SparkApplicationCollector` and `ScheduledSparkApplicationCollector` with updated configuration.

2668-2691: LGTM! Spark collectors registered in registerResourceCollectors.

Both `SparkApplicationCollector` and `ScheduledSparkApplicationCollector` are properly registered with the correct configuration parameters.

3329-3348: LGTM! handleDisabledCollectorsChange supports Spark collectors.

The method correctly handles enabling/disabling of the new Spark collectors when the disabled collectors configuration changes.

internal/collector/scheduled_spark_application_collector.go (1)
internal/collector/scheduled_spark_application_collector.go (1)
83-242: LGTM! Collector implementation follows established patterns.

The `ScheduledSparkApplicationCollector` properly:
- Sets up dynamic informer with namespace filtering
- Handles Add/Update/Delete events with proper type assertions
- Handles tombstone objects for deletes
- Implements cache sync timeout
- Manages lifecycle with context cancellation
Force-pushed from 41fd34a to 7561495 (Compare)

✅ Actions performed: Review triggered.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@internal/collector/spark_application_collector.go`:
- Around line 312-348: Stop() is racy and can double-close channels
(informerStopChs entries, c.stopCh, c.batchChan) when called concurrently; add a
stopOnce sync.Once field to the SparkApplicationCollector struct and wrap the
entire shutdown sequence in stopOnce.Do so it runs only once, and protect access
to shared maps (informerStopChs, informers) with the existing collector mutex or
add one if missing while closing each stopCh and resetting maps; perform
nil-checks and closes inside that single-once block (invoke c.batcher.stop()
inside it as well) to ensure channels are only closed once and teardown is
race-free.
- Around line 385-405: AddResource can call handleApplicationEvent which writes
to batchChan after Stop() has closed or nilled it, causing a panic; fix by
guarding against a stopped collector before dispatching: in AddResource check
the collector stop state or nil batchChan (e.g., verify c.batchChan != nil or a
c.stopped flag set by Stop()) and return a safe error when stopped, or
alternatively make handleApplicationEvent itself no-op if batchChan is nil;
reference AddResource, handleApplicationEvent, batchChan and Stop to locate
where to add the guard.
♻️ Duplicate comments (5)
internal/collector/scheduled_spark_application_collector.go (5)
93-109: Multi-namespace support is incomplete - same issue as SparkApplicationCollector.

Same issue as in spark_application_collector.go: when multiple namespaces are specified, it watches all namespaces and relies on client-side filtering.

131-145: Unused old object in UpdateFunc.

Same as spark_application_collector.go: the old object is validated but not used.

260-268: Logging entire resource object at Info level may cause log bloat.

Same issue as in spark_application_collector.go.

312-348: `Stop()` is not safe for concurrent calls.

Same concurrency issue as spark_application_collector.go.

385-405: `AddResource` sends to `batchChan` without checking if collector is stopped.

Same issue as in spark_application_collector.go: can panic if called after `Stop()`.
🧹 Nitpick comments (5)
internal/collector/spark_application_collector.go (4)
93-109: Multi-namespace support is incomplete - only the first namespace is used.

The namespace filtering logic only scopes the informer to a namespace when `len(c.namespaces) == 1`. When multiple namespaces are specified, it falls back to watching all namespaces (""), which means the informer will receive events from namespaces not in the configured list, relying solely on `isExcluded` to filter them out. This is inefficient and inconsistent with the expected behavior.

If you need true multi-namespace support, you should create one informer per namespace. Alternatively, if the current behavior is intentional (watch all, filter client-side), add a comment clarifying this design choice.

♻️ Suggested approach for true multi-namespace support

```go
// Create an informer per namespace for efficient filtering
for _, ns := range c.namespaces {
	if ns == "" {
		continue
	}
	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
		c.dynamicClient, 0, ns, nil,
	)
	informer := factory.ForResource(gvr).Informer()
	// ... register handlers and store informer
}
```
131-145: Unused old object in UpdateFunc - consider comparing for changes.

The `oldObj` is converted to `*unstructured.Unstructured` and validated, but then discarded. If you don't need the old object for comparison logic, you can simplify by removing the conversion. Otherwise, consider comparing `oldU.GetResourceVersion()` with `newU.GetResourceVersion()` to skip no-op updates.

♻️ Option 1: Remove unused old object check

```diff
 UpdateFunc: func(oldObj, newObj interface{}) {
-	_, ok := oldObj.(*unstructured.Unstructured)
-	if !ok {
-		c.logger.Error(nil, "Failed to convert old object to unstructured")
-		c.telemetryLogger.Report(
-			gen.LogLevel_LOG_LEVEL_ERROR,
-			"SparkApplicationCollector_UpdateFunc",
-			"Failed to convert old object to unstructured",
-			fmt.Errorf("type assertion failed"),
-			map[string]string{
-				"object_type": fmt.Sprintf("%T", oldObj),
-			},
-		)
-		return
-	}
-	newU, ok := newObj.(*unstructured.Unstructured)
```
260-268: Logging entire resource object at Info level may cause log bloat.

Logging `"resource", processedObj` at Info level will dump the entire resource (including the raw unstructured object) to logs on every event. This can create excessive log volume and potentially expose sensitive data. Consider logging at Debug level or removing the `resource` field.

♻️ Proposed fix

```diff
-	c.logger.Info("Collected SparkApplication resource", "key", key, "eventType", eventType, "resource", processedObj)
+	c.logger.V(1).Info("Collected SparkApplication resource", "key", key, "eventType", eventType)
```
284-309: Namespace filtering in `isExcluded` is redundant when informer is namespace-scoped.

When a single namespace is configured, the informer is already scoped to that namespace (line 99), so the namespace check here (lines 287-298) is redundant for that case. Additionally, the read lock only protects the `excludedApplications` map access, but `c.namespaces` is accessed without synchronization. Since `namespaces` is set once at construction and never modified, this is safe, but consider adding a comment to clarify immutability.

internal/collector/scheduled_spark_application_collector.go (1)
1-405: High code duplication with spark_application_collector.go - consider extracting a shared base.

These two collectors share nearly identical code (~95% similar), differing only in:

- GVR resource name (`sparkapplications` vs `scheduledsparkapplications`)
- ResourceType enum value
- Exclusion type (`ExcludedSparkApplication` vs `ExcludedScheduledSparkApplication`)
- Logger name and log messages

Consider extracting a generic `SparkCollectorBase` or using a factory pattern with configuration to reduce duplication and maintenance burden.

♻️ Conceptual approach for shared base

```go
type sparkCollectorConfig struct {
	gvr           schema.GroupVersionResource
	resourceType  ResourceType
	collectorName string
	resourceName  string
}

type BaseSparkCollector struct {
	config        sparkCollectorConfig
	dynamicClient dynamic.Interface
	// ... shared fields
}

func NewSparkApplicationCollector(...) *BaseSparkCollector {
	return newBaseSparkCollector(sparkCollectorConfig{
		gvr: schema.GroupVersionResource{
			Group:    "sparkoperator.k8s.io",
			Version:  "v1beta2",
			Resource: "sparkapplications",
		},
		resourceType:  SparkApplication,
		collectorName: "spark-application-collector",
		resourceName:  "spark-applications",
	}, ...)
}
```
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- internal/collector/scheduled_spark_application_collector.go
- internal/collector/spark_application_collector.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Build Docker Image
- GitHub Check: golangci-lint
- GitHub Check: Build Docker Image
- GitHub Check: Analyze (actions)
- GitHub Check: Analyze (go)
🔇 Additional comments (3)
internal/collector/spark_application_collector.go (2)
350-358: LGTM - accessor methods are straightforward.

The `GetResourceChannel()` and `GetType()` methods are simple and correct.
360-383: `IsAvailable` performs a List call which may be slow or fail with large resource counts.

Using `List` with `Limit: 1` is a reasonable approach, but be aware that:

- The call may still be slow if the API server needs to scan many resources
- Consider using a `ServerSideApply` or discovery check for CRD existence as an alternative (see the sketch after this list)

This is acceptable for availability checking.
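If a discovery-based existence check were preferred over listing, one possible shape is sketched below; this is not part of the PR, and it assumes a discovery client would be wired into the collector:

```go
import (
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/client-go/discovery"
)

// crdAvailable reports whether the Spark Operator group/version serves the
// given resource, without listing any objects.
func crdAvailable(dc discovery.DiscoveryInterface, resource string) (bool, error) {
	list, err := dc.ServerResourcesForGroupVersion("sparkoperator.k8s.io/v1beta2")
	if err != nil {
		if apierrors.IsNotFound(err) {
			return false, nil // group/version not registered: CRD not installed
		}
		return false, err
	}
	for _, r := range list.APIResources {
		if r.Name == resource {
			return true, nil
		}
	}
	return false, nil
}
```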
internal/collector/scheduled_spark_application_collector.go (1)
231-244: Previous review comment about unchecked `c.Stop()` error has been addressed.

The error from `c.Stop()` is now properly captured and logged.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
```go
func (c *SparkApplicationCollector) Stop() error {
	c.logger.Info("Stopping SparkApplication collector")

	// Stop all informers
	for key, stopCh := range c.informerStopChs {
		c.logger.Info("Stopping informer", "resource", key)
		close(stopCh)
	}

	c.informers = make(map[string]cache.SharedIndexInformer)
	c.informerStopChs = make(map[string]chan struct{})

	// Close the main stop channel (signals informers to stop)
	select {
	case <-c.stopCh:
		c.logger.Info("SparkApplication collector stop channel already closed")
	default:
		close(c.stopCh)
		c.logger.Info("Closed SparkApplication collector stop channel")
	}

	// Close the batchChan (input to the batcher).
	if c.batchChan != nil {
		close(c.batchChan)
		c.batchChan = nil
		c.logger.Info("Closed SparkApplication collector batch input channel")
	}

	// Stop the batcher (waits for completion).
	if c.batcher != nil {
		c.batcher.stop()
		c.logger.Info("SparkApplication collector batcher stopped")
	}
	// resourceChan is closed by the batcher's defer func.

	return nil
}
```
Stop() is not safe for concurrent calls - potential double-close panic.
If Stop() is called concurrently (e.g., from both the shutdown goroutine and an external caller), there's a race condition:
- The `select` on `c.stopCh` (lines 325-331) is safe for detecting if already closed
- However, `close(stopCh)` in the loop at line 318 could panic if called twice for the same map entry
- The `c.batchChan` nil check (line 334) is racy - two goroutines could pass the check simultaneously
Consider using sync.Once for shutdown or adding mutex protection.
🔧 Proposed fix using sync.Once
Add a field to the struct:

```go
stopOnce sync.Once
```

Then wrap the shutdown logic:
```diff
 func (c *SparkApplicationCollector) Stop() error {
+	var stopErr error
+	c.stopOnce.Do(func() {
 	c.logger.Info("Stopping SparkApplication collector")
 	// Stop all informers
 	for key, stopCh := range c.informerStopChs {
 		c.logger.Info("Stopping informer", "resource", key)
 		close(stopCh)
 	}
 	// ... rest of cleanup
+	})
+	return stopErr
 }
```

🤖 Prompt for AI Agents
In `@internal/collector/spark_application_collector.go` around lines 312 - 348,
Stop() is racy and can double-close channels (informerStopChs entries, c.stopCh,
c.batchChan) when called concurrently; add a stopOnce sync.Once field to the
SparkApplicationCollector struct and wrap the entire shutdown sequence in
stopOnce.Do so it runs only once, and protect access to shared maps
(informerStopChs, informers) with the existing collector mutex or add one if
missing while closing each stopCh and resetting maps; perform nil-checks and
closes inside that single-once block (invoke c.batcher.stop() inside it as well)
to ensure channels are only closed once and teardown is race-free.
```go
// AddResource manually adds a SparkApplication resource to be processed by the collector
func (c *SparkApplicationCollector) AddResource(resource interface{}) error {
	app, ok := resource.(*unstructured.Unstructured)
	if !ok {
		err := fmt.Errorf("expected *unstructured.Unstructured, got %T", resource)
		c.telemetryLogger.Report(
			gen.LogLevel_LOG_LEVEL_ERROR,
			"SparkApplicationCollector_AddResource",
			"Invalid resource type",
			err,
			map[string]string{
				"expected_type": "*unstructured.Unstructured",
				"actual_type":   fmt.Sprintf("%T", resource),
			},
		)
		return err
	}

	c.handleApplicationEvent(app, EventTypeAdd)
	return nil
}
```
AddResource sends to batchChan without checking if collector is stopped.
If AddResource is called after Stop(), the batchChan will be closed (set to nil), causing a panic when writing to it at line 262 (via handleApplicationEvent).
🔧 Proposed fix
```diff
 func (c *SparkApplicationCollector) AddResource(resource interface{}) error {
+	select {
+	case <-c.stopCh:
+		return fmt.Errorf("collector is stopped")
+	default:
+	}
+
 	app, ok := resource.(*unstructured.Unstructured)
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
// AddResource manually adds a SparkApplication resource to be processed by the collector
func (c *SparkApplicationCollector) AddResource(resource interface{}) error {
	select {
	case <-c.stopCh:
		return fmt.Errorf("collector is stopped")
	default:
	}

	app, ok := resource.(*unstructured.Unstructured)
	if !ok {
		err := fmt.Errorf("expected *unstructured.Unstructured, got %T", resource)
		c.telemetryLogger.Report(
			gen.LogLevel_LOG_LEVEL_ERROR,
			"SparkApplicationCollector_AddResource",
			"Invalid resource type",
			err,
			map[string]string{
				"expected_type": "*unstructured.Unstructured",
				"actual_type":   fmt.Sprintf("%T", resource),
			},
		)
		return err
	}

	c.handleApplicationEvent(app, EventTypeAdd)
	return nil
}
```
🤖 Prompt for AI Agents
In `@internal/collector/spark_application_collector.go` around lines 385 - 405,
AddResource can call handleApplicationEvent which writes to batchChan after
Stop() has closed or nilled it, causing a panic; fix by guarding against a
stopped collector before dispatching: in AddResource check the collector stop
state or nil batchChan (e.g., verify c.batchChan != nil or a c.stopped flag set
by Stop()) and return a safe error when stopped, or alternatively make
handleApplicationEvent itself no-op if batchChan is nil; reference AddResource,
handleApplicationEvent, batchChan and Stop to locate where to add the guard.
Force-pushed from 5a2475e to 0fa18c4 (Compare)
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@internal/collector/spark_application_collector.go`:
- Around line 209-224: When cache sync times out in Start (the block using
syncCtx, cancel and cache.WaitForCacheSync with informer.HasSynced), stop the
informer and clear the collector's internal maps/queues before returning the
error: call the appropriate stop/close on informer to prevent it from enqueuing
more events, and zero out any internal maps or event queues the collector holds
(e.g., maps that track SparkApplications / pending events) so goroutines and
queued events are not leaked after the function returns.
♻️ Duplicate comments (2)
internal/collector/spark_application_collector.go (2)
22-36: Make `Stop()` idempotent to avoid double-close panics.

Concurrent calls can still double-close informer stop channels or `batchChan`. Wrapping shutdown in `sync.Once` prevents panics and makes Stop safe to call from multiple goroutines.

🔒 Suggested fix

```diff
 type SparkApplicationCollector struct {
 	dynamicClient        dynamic.Interface
 	batchChan            chan CollectedResource
 	resourceChan         chan []CollectedResource
 	batcher              *ResourcesBatcher
 	stopCh               chan struct{}
 	informers            map[string]cache.SharedIndexInformer
 	informerStopChs      map[string]chan struct{}
 	namespaces           []string
 	excludedApplications map[types.NamespacedName]bool
 	logger               logr.Logger
 	telemetryLogger      telemetry_logger.Logger
 	mu                   sync.RWMutex
+	stopOnce             sync.Once
 }

 func (c *SparkApplicationCollector) Stop() error {
-	c.logger.Info("Stopping SparkApplication collector")
-
-	// Stop all informers
-	for key, stopCh := range c.informerStopChs {
-		c.logger.Info("Stopping informer", "resource", key)
-		close(stopCh)
-	}
-
-	c.informers = make(map[string]cache.SharedIndexInformer)
-	c.informerStopChs = make(map[string]chan struct{})
-
-	// Close the main stop channel (signals informers to stop)
-	select {
-	case <-c.stopCh:
-		c.logger.Info("SparkApplication collector stop channel already closed")
-	default:
-		close(c.stopCh)
-		c.logger.Info("Closed SparkApplication collector stop channel")
-	}
-
-	// Close the batchChan (input to the batcher).
-	if c.batchChan != nil {
-		close(c.batchChan)
-		c.batchChan = nil
-		c.logger.Info("Closed SparkApplication collector batch input channel")
-	}
-
-	// Stop the batcher (waits for completion).
-	if c.batcher != nil {
-		c.batcher.stop()
-		c.logger.Info("SparkApplication collector batcher stopped")
-	}
-	// resourceChan is closed by the batcher's defer func.
-
-	return nil
+	c.stopOnce.Do(func() {
+		c.logger.Info("Stopping SparkApplication collector")
+
+		// Stop all informers
+		for key, stopCh := range c.informerStopChs {
+			c.logger.Info("Stopping informer", "resource", key)
+			close(stopCh)
+		}
+
+		c.informers = make(map[string]cache.SharedIndexInformer)
+		c.informerStopChs = make(map[string]chan struct{})
+
+		// Close the main stop channel (signals informers to stop)
+		select {
+		case <-c.stopCh:
+			c.logger.Info("SparkApplication collector stop channel already closed")
+		default:
+			close(c.stopCh)
+			c.logger.Info("Closed SparkApplication collector stop channel")
+		}
+
+		// Close the batchChan (input to the batcher).
+		if c.batchChan != nil {
+			close(c.batchChan)
+			c.batchChan = nil
+			c.logger.Info("Closed SparkApplication collector batch input channel")
+		}
+
+		// Stop the batcher (waits for completion).
+		if c.batcher != nil {
+			c.batcher.stop()
+			c.logger.Info("SparkApplication collector batcher stopped")
+		}
+		// resourceChan is closed by the batcher's defer func.
+	})
+
+	return nil
 }
```

Also applies to: 312-348
385-404: Guard `AddResource` when the collector is stopped.

If `Stop()` has already closed `batchChan`, `AddResource` can panic via `handleApplicationEvent`. Add a stop check before dispatching.

🛡️ Suggested fix

```diff
 func (c *SparkApplicationCollector) AddResource(resource interface{}) error {
+	select {
+	case <-c.stopCh:
+		return fmt.Errorf("collector is stopped")
+	default:
+	}
 	app, ok := resource.(*unstructured.Unstructured)
 	if !ok {
 		err := fmt.Errorf("expected *unstructured.Unstructured, got %T", resource)
 		c.telemetryLogger.Report(
 			gen.LogLevel_LOG_LEVEL_ERROR,
```
🧹 Nitpick comments (2)
internal/collector/scheduled_spark_application_collector.go (2)
311-348: Consider adding mutex protection during Stop.

The `Stop` method modifies shared state (`informers`, `informerStopChs`, `batchChan`) without holding the mutex. While the collector is typically stopped from a single goroutine, concurrent access during shutdown could cause races. The mutex `c.mu` is only used in `isExcluded` for reading `excludedApplications`.

This is a minor concern as the typical usage pattern has single-threaded lifecycle management, but it's worth noting for defensive coding.

♻️ Optional: Add mutex protection for Stop

```diff
 func (c *ScheduledSparkApplicationCollector) Stop() error {
 	c.logger.Info("Stopping ScheduledSparkApplication collector")

+	c.mu.Lock()
+	defer c.mu.Unlock()
+
 	// Stop all informers
 	for key, stopCh := range c.informerStopChs {
```

Note: This would require changing `isExcluded` to use a separate mutex or ensuring the full mutex is used consistently.

261-268: Consider reducing log verbosity for collected resources.

Logging at `Info` level for every collected resource (including the full `processedObj` with raw data) may generate excessive log volume in production clusters with many ScheduledSparkApplications. Consider using `V(1).Info()` or `Debug` level for routine collection events.

♻️ Optional: Reduce log verbosity

```diff
-	c.logger.Info("Collected ScheduledSparkApplication resource", "key", key, "eventType", eventType, "resource", processedObj)
+	c.logger.V(1).Info("Collected ScheduledSparkApplication resource", "key", key, "eventType", eventType)
```
```go
	syncCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	if !cache.WaitForCacheSync(syncCtx.Done(), informer.HasSynced) {
		c.telemetryLogger.Report(
			gen.LogLevel_LOG_LEVEL_ERROR,
			"SparkApplicationCollector_Start",
			"Timeout waiting for cache to sync",
			fmt.Errorf("cache sync timeout"),
			map[string]string{
				"resource": "sparkapplications",
				"timeout":  "30s",
			},
		)
		return fmt.Errorf("timeout waiting for SparkApplications cache to sync")
	}
```
Clean up informer on cache‑sync timeout.
If cache sync times out, Start returns while the informer keeps running (and can still enqueue events) even though the batcher never starts. This can leak goroutines and back up the informer queue. Please stop the informer and clear maps before returning the error.
🧹 Suggested fix
```diff
 if !cache.WaitForCacheSync(syncCtx.Done(), informer.HasSynced) {
 	c.telemetryLogger.Report(
 		gen.LogLevel_LOG_LEVEL_ERROR,
 		"SparkApplicationCollector_Start",
 		"Timeout waiting for cache to sync",
 		fmt.Errorf("cache sync timeout"),
 		map[string]string{
 			"resource": "sparkapplications",
 			"timeout":  "30s",
 		},
 	)
+	// Prevent leaked informer on startup failure.
+	if stopCh, ok := c.informerStopChs[appKey]; ok {
+		close(stopCh)
+		delete(c.informerStopChs, appKey)
+		delete(c.informers, appKey)
+	}
 	return fmt.Errorf("timeout waiting for SparkApplications cache to sync")
 }
```

🤖 Prompt for AI Agents
In `@internal/collector/spark_application_collector.go` around lines 209 - 224,
When cache sync times out in Start (the block using syncCtx, cancel and
cache.WaitForCacheSync with informer.HasSynced), stop the informer and clear the
collector's internal maps/queues before returning the error: call the
appropriate stop/close on informer to prevent it from enqueuing more events, and
zero out any internal maps or event queues the collector holds (e.g., maps that
track SparkApplications / pending events) so goroutines and queued events are
not leaked after the function returns.
Force-pushed from eb637e9 to d3d72d8 (Compare)

Force-pushed from c7d25f3 to 15b9177 (Compare)
…ledSparkApplication resources (#224)

* add sparkapp collection support
* update crd to include exclusion fields for missing resources
* lint
* nit
* nit
* protos
* proto
* proto2
* clean up informer on cache‑sync timeout
Fixes: PC-704
Fixes: PC-708
Summary by CodeRabbit
New Features
Chores
Bug Fixes
✏️ Tip: You can customize this high-level summary in your review settings.