diff --git a/data-models/pkg/model/health_event_crd.go b/data-models/pkg/model/health_event_crd.go index 54dc6d4d8..81ef746c5 100644 --- a/data-models/pkg/model/health_event_crd.go +++ b/data-models/pkg/model/health_event_crd.go @@ -15,11 +15,32 @@ package model import ( + "encoding/json" + "fmt" + "github.com/nvidia/nvsentinel/data-models/pkg/protos" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" ) -// HealthEventResourceCRD is the Kubernetes CRD type for HealthEventResource +// CRD coordinates for HealthEventResource +var ( + HealthEventResourceGVK = schema.GroupVersionKind{ + Group: "healthevents.dgxc.nvidia.com", + Version: "v1", + Kind: "HealthEventResource", + } + + SchemeGroupVersion = schema.GroupVersion{ + Group: HealthEventResourceGVK.Group, + Version: HealthEventResourceGVK.Version, + } +) + +// HealthEventResourceCRD is the Kubernetes CRD type for HealthEventResource. // Spec and Status are generated from the proto definitions. type HealthEventResourceCRD struct { metav1.TypeMeta `json:",inline"` @@ -28,10 +49,141 @@ type HealthEventResourceCRD struct { Status *protos.HealthEventStatus `json:"status,omitempty"` } -// HealthEventResourceCRDList is the list type for HealthEventResourceCRD -// (optional, but useful for List operations) +// MarshalJSON uses protojson for Spec/Status so that protobuf well-known types +// (Timestamp → RFC3339 string, BoolValue → plain bool) match the CRD schema. +// Without this, encoding/json serializes Timestamp as {"seconds":...,"nanos":...} +// which the CRD rejects as "must be of type string". +func (in HealthEventResourceCRD) MarshalJSON() ([]byte, error) { + type helper struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + Spec json.RawMessage `json:"spec,omitempty"` + Status json.RawMessage `json:"status,omitempty"` + } + + h := helper{ + TypeMeta: in.TypeMeta, + ObjectMeta: in.ObjectMeta, + } + + opts := protojson.MarshalOptions{UseEnumNumbers: true} + + if in.Spec != nil { + b, err := opts.Marshal(in.Spec) + if err != nil { + return nil, fmt.Errorf("protojson marshal spec: %w", err) + } + h.Spec = b + } + + if in.Status != nil { + b, err := opts.Marshal(in.Status) + if err != nil { + return nil, fmt.Errorf("protojson marshal status: %w", err) + } + h.Status = b + } + + return json.Marshal(h) +} + +// UnmarshalJSON uses protojson for Spec/Status to correctly parse RFC3339 timestamps +// and other protobuf well-known types back into their Go protobuf representations. +func (in *HealthEventResourceCRD) UnmarshalJSON(data []byte) error { + type helper struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + Spec json.RawMessage `json:"spec,omitempty"` + Status json.RawMessage `json:"status,omitempty"` + } + + var h helper + if err := json.Unmarshal(data, &h); err != nil { + return err + } + + in.TypeMeta = h.TypeMeta + in.ObjectMeta = h.ObjectMeta + + opts := protojson.UnmarshalOptions{DiscardUnknown: true} + + if len(h.Spec) > 0 { + in.Spec = &protos.HealthEvent{} + if err := opts.Unmarshal(h.Spec, in.Spec); err != nil { + return fmt.Errorf("protojson unmarshal spec: %w", err) + } + } + + if len(h.Status) > 0 { + in.Status = &protos.HealthEventStatus{} + if err := opts.Unmarshal(h.Status, in.Status); err != nil { + return fmt.Errorf("protojson unmarshal status: %w", err) + } + } + + return nil +} + +func (in *HealthEventResourceCRD) DeepCopyObject() runtime.Object { + if in == nil { + return nil + } + + out := &HealthEventResourceCRD{} + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + + if in.Spec != nil { + out.Spec = proto.Clone(in.Spec).(*protos.HealthEvent) + } + + if in.Status != nil { + out.Status = proto.Clone(in.Status).(*protos.HealthEventStatus) + } + + return out +} + +// HealthEventResourceCRDList is the list type for HealthEventResourceCRD. type HealthEventResourceCRDList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` Items []HealthEventResourceCRD `json:"items"` } + +func (in *HealthEventResourceCRDList) DeepCopyObject() runtime.Object { + if in == nil { + return nil + } + + out := &HealthEventResourceCRDList{} + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + + if in.Items != nil { + out.Items = make([]HealthEventResourceCRD, len(in.Items)) + for i := range in.Items { + cp := in.Items[i].DeepCopyObject().(*HealthEventResourceCRD) + out.Items[i] = *cp + } + } + + return out +} + +// AddToScheme registers the HealthEventResource types with the given scheme. +// We use AddKnownTypeWithName because the Go struct is named HealthEventResourceCRD +// but the CRD kind is HealthEventResource (without the CRD suffix). +func AddToScheme(scheme *runtime.Scheme) error { + scheme.AddKnownTypeWithName( + SchemeGroupVersion.WithKind("HealthEventResource"), + &HealthEventResourceCRD{}, + ) + scheme.AddKnownTypeWithName( + SchemeGroupVersion.WithKind("HealthEventResourceList"), + &HealthEventResourceCRDList{}, + ) + metav1.AddToGroupVersion(scheme, SchemeGroupVersion) + + return nil +} diff --git a/distros/kubernetes/nvsentinel/charts/event-exporter/templates/clusterrole.yaml b/distros/kubernetes/nvsentinel/charts/event-exporter/templates/clusterrole.yaml new file mode 100644 index 000000000..b330ea7a9 --- /dev/null +++ b/distros/kubernetes/nvsentinel/charts/event-exporter/templates/clusterrole.yaml @@ -0,0 +1,34 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: {{ include "event-exporter.fullname" . }} + labels: + {{- include "event-exporter.labels" . | nindent 4 }} +rules: +- apiGroups: + - healthevents.dgxc.nvidia.com + resources: + - healtheventresources + verbs: + - get + - list + - watch +- apiGroups: + - healthevents.dgxc.nvidia.com + resources: + - healtheventresources/status + verbs: + - get diff --git a/distros/kubernetes/nvsentinel/charts/event-exporter/templates/clusterrolebinding.yaml b/distros/kubernetes/nvsentinel/charts/event-exporter/templates/clusterrolebinding.yaml new file mode 100644 index 000000000..39ab26755 --- /dev/null +++ b/distros/kubernetes/nvsentinel/charts/event-exporter/templates/clusterrolebinding.yaml @@ -0,0 +1,27 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: {{ include "event-exporter.fullname" . }} + labels: + {{- include "event-exporter.labels" . | nindent 4 }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: {{ include "event-exporter.fullname" . }} +subjects: + - kind: ServiceAccount + name: {{ include "event-exporter.fullname" . }} + namespace: {{ .Release.Namespace }} diff --git a/distros/kubernetes/nvsentinel/charts/event-exporter/templates/deployment.yaml b/distros/kubernetes/nvsentinel/charts/event-exporter/templates/deployment.yaml index ceaf92954..803929367 100644 --- a/distros/kubernetes/nvsentinel/charts/event-exporter/templates/deployment.yaml +++ b/distros/kubernetes/nvsentinel/charts/event-exporter/templates/deployment.yaml @@ -40,6 +40,7 @@ spec: imagePullSecrets: {{- toYaml . | nindent 8 }} {{- end }} + serviceAccountName: {{ include "event-exporter.fullname" . }} {{- if and .Values.global.datastore (eq .Values.global.datastore.provider "postgresql") }} initContainers: - name: fix-cert-permissions diff --git a/distros/kubernetes/nvsentinel/charts/event-exporter/templates/serviceaccount.yaml b/distros/kubernetes/nvsentinel/charts/event-exporter/templates/serviceaccount.yaml new file mode 100644 index 000000000..191a330c4 --- /dev/null +++ b/distros/kubernetes/nvsentinel/charts/event-exporter/templates/serviceaccount.yaml @@ -0,0 +1,19 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ include "event-exporter.fullname" . }} + labels: + {{- include "event-exporter.labels" . | nindent 4 }} diff --git a/distros/kubernetes/nvsentinel/charts/fault-quarantine/templates/clusterrole.yaml b/distros/kubernetes/nvsentinel/charts/fault-quarantine/templates/clusterrole.yaml index aa1a30f5a..2eb5d95d6 100644 --- a/distros/kubernetes/nvsentinel/charts/fault-quarantine/templates/clusterrole.yaml +++ b/distros/kubernetes/nvsentinel/charts/fault-quarantine/templates/clusterrole.yaml @@ -37,3 +37,21 @@ rules: - get - update - create +- apiGroups: + - healthevents.dgxc.nvidia.com + resources: + - healtheventresources + verbs: + - get + - list + - watch + - update + - patch +- apiGroups: + - healthevents.dgxc.nvidia.com + resources: + - healtheventresources/status + verbs: + - get + - update + - patch diff --git a/distros/kubernetes/nvsentinel/charts/fault-remediation/templates/clusterrole.yaml b/distros/kubernetes/nvsentinel/charts/fault-remediation/templates/clusterrole.yaml index bc3c8a536..a81360752 100644 --- a/distros/kubernetes/nvsentinel/charts/fault-remediation/templates/clusterrole.yaml +++ b/distros/kubernetes/nvsentinel/charts/fault-remediation/templates/clusterrole.yaml @@ -87,3 +87,21 @@ Collect all unique API groups and their associated resources from multi-template - get - list - watch +- apiGroups: + - healthevents.dgxc.nvidia.com + resources: + - healtheventresources + verbs: + - get + - list + - watch + - update + - patch +- apiGroups: + - healthevents.dgxc.nvidia.com + resources: + - healtheventresources/status + verbs: + - get + - update + - patch diff --git a/distros/kubernetes/nvsentinel/charts/health-events-analyzer/templates/clusterrole.yaml b/distros/kubernetes/nvsentinel/charts/health-events-analyzer/templates/clusterrole.yaml new file mode 100644 index 000000000..9b19dca9f --- /dev/null +++ b/distros/kubernetes/nvsentinel/charts/health-events-analyzer/templates/clusterrole.yaml @@ -0,0 +1,34 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: {{ include "health-events-analyzer.fullname" . }} + labels: + {{- include "health-events-analyzer.labels" . | nindent 4 }} +rules: +- apiGroups: + - healthevents.dgxc.nvidia.com + resources: + - healtheventresources + verbs: + - get + - list + - watch +- apiGroups: + - healthevents.dgxc.nvidia.com + resources: + - healtheventresources/status + verbs: + - get diff --git a/distros/kubernetes/nvsentinel/charts/health-events-analyzer/templates/clusterrolebinding.yaml b/distros/kubernetes/nvsentinel/charts/health-events-analyzer/templates/clusterrolebinding.yaml new file mode 100644 index 000000000..7859ac906 --- /dev/null +++ b/distros/kubernetes/nvsentinel/charts/health-events-analyzer/templates/clusterrolebinding.yaml @@ -0,0 +1,27 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: {{ include "health-events-analyzer.fullname" . }} + labels: + {{- include "health-events-analyzer.labels" . | nindent 4 }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: {{ include "health-events-analyzer.fullname" . }} +subjects: + - kind: ServiceAccount + name: {{ include "health-events-analyzer.fullname" . }} + namespace: {{ .Release.Namespace }} diff --git a/distros/kubernetes/nvsentinel/charts/node-drainer/templates/clusterrole.yaml b/distros/kubernetes/nvsentinel/charts/node-drainer/templates/clusterrole.yaml index cc2d887b2..4da1c3f2d 100644 --- a/distros/kubernetes/nvsentinel/charts/node-drainer/templates/clusterrole.yaml +++ b/distros/kubernetes/nvsentinel/charts/node-drainer/templates/clusterrole.yaml @@ -57,6 +57,24 @@ rules: - create - update - watch +- apiGroups: + - healthevents.dgxc.nvidia.com + resources: + - healtheventresources + verbs: + - get + - list + - watch + - update + - patch +- apiGroups: + - healthevents.dgxc.nvidia.com + resources: + - healtheventresources/status + verbs: + - get + - update + - patch {{- if .Values.customDrain.enabled }} - apiGroups: - {{ .Values.customDrain.apiGroup | quote }} diff --git a/distros/kubernetes/nvsentinel/templates/clusterrole.yaml b/distros/kubernetes/nvsentinel/templates/clusterrole.yaml index a47651cc1..d25ae7fb0 100644 --- a/distros/kubernetes/nvsentinel/templates/clusterrole.yaml +++ b/distros/kubernetes/nvsentinel/templates/clusterrole.yaml @@ -56,3 +56,23 @@ rules: - get - list - watch +- apiGroups: + - healthevents.dgxc.nvidia.com + resources: + - healtheventresources + verbs: + - get + - list + - watch + - create + - update + - patch + - delete +- apiGroups: + - healthevents.dgxc.nvidia.com + resources: + - healtheventresources/status + verbs: + - get + - update + - patch diff --git a/distros/kubernetes/nvsentinel/templates/configmap-datastore.yaml b/distros/kubernetes/nvsentinel/templates/configmap-datastore.yaml index 42d2dea7a..653ca5bc4 100644 --- a/distros/kubernetes/nvsentinel/templates/configmap-datastore.yaml +++ b/distros/kubernetes/nvsentinel/templates/configmap-datastore.yaml @@ -23,6 +23,7 @@ metadata: data: datastore.yaml: | provider: {{ .Values.global.datastore.provider | quote }} + {{- if .Values.global.datastore.connection }} connection: host: {{ .Values.global.datastore.connection.host | quote }} port: {{ .Values.global.datastore.connection.port | default 5432 }} @@ -46,13 +47,18 @@ data: extraParams: {{- toYaml .Values.global.datastore.connection.extraParams | nindent 8 }} {{- end }} + {{- end }} {{- if .Values.global.datastore.options }} options: {{- toYaml .Values.global.datastore.options | nindent 6 }} {{- end }} + {{- if .Values.global.datastore.namespace }} + namespace: {{ .Values.global.datastore.namespace | quote }} + {{- end }} # Environment variables for components DATASTORE_PROVIDER: {{ .Values.global.datastore.provider | quote }} + {{- if .Values.global.datastore.connection }} DATASTORE_HOST: {{ .Values.global.datastore.connection.host | quote }} DATASTORE_PORT: {{ .Values.global.datastore.connection.port | default 5432 | quote }} DATASTORE_DATABASE: {{ .Values.global.datastore.connection.database | quote }} @@ -71,16 +77,22 @@ data: {{- if .Values.global.datastore.connection.sslrootcert }} DATASTORE_SSLROOTCERT: {{ .Values.global.datastore.connection.sslrootcert | quote }} {{- end }} + {{- end }} + {{- if .Values.global.datastore.namespace }} + DATASTORE_NAMESPACE: {{ .Values.global.datastore.namespace | quote }} + {{- end }} # Certificate rotation configuration (MongoDB only) # When enabled, client certificates can be rotated without restarting pods - {{- if eq .Values.global.datastore.provider "mongodb" }} + {{- if and .Values.global.datastore.connection (eq .Values.global.datastore.provider "mongodb") }} MONGODB_ENABLE_CERT_ROTATION: {{ .Values.global.certificateRotationEnabled | default false | quote }} {{- end }} # Legacy MongoDB environment variables for backward compatibility - # These are set for all providers to support legacy code paths that still use the old config system - {{- if eq .Values.global.datastore.provider "mongodb" }} + # These must be non-empty even for non-MongoDB providers because several components + # call TokenConfigFromEnv() at startup which requires MONGODB_DATABASE_NAME and + # MONGODB_TOKEN_COLLECTION_NAME to be set (used for change stream resume tokens). + {{- if and .Values.global.datastore.connection (eq .Values.global.datastore.provider "mongodb") }} {{- $mongodbURI := printf "mongodb://%s:%d/" .Values.global.datastore.connection.host (int (.Values.global.datastore.connection.port | default 27017)) }} {{- if .Values.global.datastore.connection.extraParams }} {{- $params := list }} @@ -91,16 +103,19 @@ data: {{- end }} MONGODB_URI: {{ $mongodbURI | quote }} {{- else }} - # For non-MongoDB datastores, set empty MongoDB URI - MONGODB_URI: "" + MONGODB_URI: "unused" {{- end }} + {{- if and .Values.global.datastore.connection .Values.global.datastore.connection.database }} MONGODB_DATABASE_NAME: {{ .Values.global.datastore.connection.database | quote }} - {{- if .Values.global.datastore.connection.collection }} + {{- else }} + MONGODB_DATABASE_NAME: "nvsentinel" + {{- end }} + {{- if and .Values.global.datastore.connection .Values.global.datastore.connection.collection }} MONGODB_COLLECTION_NAME: {{ .Values.global.datastore.connection.collection | quote }} {{- else }} MONGODB_COLLECTION_NAME: "health_events" {{- end }} - {{- if .Values.global.datastore.connection.tokenCollection }} + {{- if and .Values.global.datastore.connection .Values.global.datastore.connection.tokenCollection }} MONGODB_TOKEN_COLLECTION_NAME: {{ .Values.global.datastore.connection.tokenCollection | quote }} {{- else }} MONGODB_TOKEN_COLLECTION_NAME: "ResumeTokens" diff --git a/distros/kubernetes/nvsentinel/templates/configmap.yaml b/distros/kubernetes/nvsentinel/templates/configmap.yaml index b2003fa55..84e6c24e3 100644 --- a/distros/kubernetes/nvsentinel/templates/configmap.yaml +++ b/distros/kubernetes/nvsentinel/templates/configmap.yaml @@ -27,7 +27,8 @@ data: "MaxNodeConditionMessageLength": {{ .Values.platformConnector.k8sConnector.maxNodeConditionMessageLength }}, "StoreConnectorMaxRetries": {{ .Values.platformConnector.mongodbStore.maxRetries }}, "enableMongoDBStorePlatformConnector": "{{ .Values.global.mongodbStore.enabled }}", - "enablePostgresDBStorePlatformConnector": {{ if and .Values.global.datastore .Values.global.datastore.provider }}{{ eq .Values.global.datastore.provider "postgresql" | quote }}{{ else }}"false"{{ end }} + "enablePostgresDBStorePlatformConnector": {{ if and .Values.global.datastore .Values.global.datastore.provider }}{{ eq .Values.global.datastore.provider "postgresql" | quote }}{{ else }}"false"{{ end }}, + "enableK8sStorePlatformConnector": {{ if and .Values.global.datastore .Values.global.datastore.provider }}{{ eq .Values.global.datastore.provider "kubernetes" | quote }}{{ else }}"false"{{ end }} {{- with .Values.platformConnector.pipeline }} ,"pipeline": {{ . | toJson }} {{- end }} diff --git a/distros/kubernetes/nvsentinel/values-tilt-kubernetes.yaml b/distros/kubernetes/nvsentinel/values-tilt-kubernetes.yaml new file mode 100644 index 000000000..82e3afb5f --- /dev/null +++ b/distros/kubernetes/nvsentinel/values-tilt-kubernetes.yaml @@ -0,0 +1,52 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Kubernetes CRD-based datastore values for Tilt development +# This file is included when USE_KUBERNETES=1 is set +# Example: USE_KUBERNETES=1 tilt up -f tilt/Tiltfile + +global: + dryRun: false + kubeVersion: 1.31.0 + clusterType: standalone + + # Keep legacy resource naming for Tilt compatibility + useLegacyResourceNames: true + + # Configure Kubernetes CRDs as the datastore provider + datastore: + provider: "kubernetes" + namespace: "nvsentinel" + + # Disable MongoDB store when using Kubernetes CRDs + mongodbStore: + enabled: false + + # csp-health-monitor uses MaintenanceEvents with a legacy MongoDB-only code path; + # disable it when using the Kubernetes CRD datastore for HealthEvents + cspHealthMonitor: + enabled: false + + # Enable the CRD subchart so HealthEventResource CRD is installed + k8sdatastoreCrds: + enabled: true + +# Disable PostgreSQL subchart +postgresql: + enabled: false + +# Disable MongoDB-specific configuration +mongodb-store: + mongodb: + enabled: false diff --git a/event-exporter/go.mod b/event-exporter/go.mod index be0e66ee6..4a015dd5f 100644 --- a/event-exporter/go.mod +++ b/event-exporter/go.mod @@ -19,6 +19,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.13.0 // indirect + github.com/evanphx/json-patch/v5 v5.9.11 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/fxamacker/cbor/v2 v2.9.0 // indirect github.com/go-logr/logr v1.4.3 // indirect @@ -50,6 +51,7 @@ require ( github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.67.4 // indirect github.com/prometheus/procfs v0.19.2 // indirect + github.com/spf13/pflag v1.0.10 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.2.0 // indirect diff --git a/health-events-analyzer/go.mod b/health-events-analyzer/go.mod index 63de6d279..8cae80a55 100644 --- a/health-events-analyzer/go.mod +++ b/health-events-analyzer/go.mod @@ -30,6 +30,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.13.0 // indirect + github.com/evanphx/json-patch/v5 v5.9.11 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/fxamacker/cbor/v2 v2.9.0 // indirect github.com/go-logr/logr v1.4.3 // indirect @@ -62,6 +63,7 @@ require ( github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.67.4 // indirect github.com/prometheus/procfs v0.19.2 // indirect + github.com/spf13/pflag v1.0.10 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect diff --git a/platform-connectors/main.go b/platform-connectors/main.go index 5c4308044..2b8d27f93 100755 --- a/platform-connectors/main.go +++ b/platform-connectors/main.go @@ -264,7 +264,9 @@ func initializeConnectors( } // Keep the legacy config key name for backward compatibility with existing ConfigMaps - if config["enableMongoDBStorePlatformConnector"] == True || config["enablePostgresDBStorePlatformConnector"] == True { + if config["enableMongoDBStorePlatformConnector"] == True || + config["enablePostgresDBStorePlatformConnector"] == True || + config["enableK8sStorePlatformConnector"] == True { storeConnector, err = initializeDatabaseStoreConnector(ctx, config, databaseClientCertMountPath) if err != nil { return nil, nil, fmt.Errorf("failed to initialize database store connector: %w", err) diff --git a/store-client/go.mod b/store-client/go.mod index b69e13fff..9cbfad32a 100644 --- a/store-client/go.mod +++ b/store-client/go.mod @@ -16,6 +16,8 @@ require ( go.mongodb.org/mongo-driver v1.17.9 google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 + k8s.io/apimachinery v0.35.2 + k8s.io/client-go v0.35.2 sigs.k8s.io/controller-runtime v0.22.4 ) @@ -24,6 +26,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.13.0 // indirect + github.com/evanphx/json-patch/v5 v5.9.11 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/fxamacker/cbor/v2 v2.9.0 // indirect github.com/go-openapi/jsonpointer v0.22.3 // indirect @@ -54,6 +57,7 @@ require ( github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.67.4 // indirect github.com/prometheus/procfs v0.19.2 // indirect + github.com/spf13/pflag v1.0.10 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect @@ -79,8 +83,6 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect k8s.io/api v0.35.2 // indirect k8s.io/apiextensions-apiserver v0.34.3 // indirect - k8s.io/apimachinery v0.35.2 // indirect - k8s.io/client-go v0.35.2 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20251125145642-4e65d59e963e // indirect k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 // indirect diff --git a/store-client/go.sum b/store-client/go.sum index 79ab16bc2..b1f1a12b9 100644 --- a/store-client/go.sum +++ b/store-client/go.sum @@ -14,7 +14,6 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/emicklei/go-restful/v3 v3.13.0 h1:C4Bl2xDndpU6nJ4bc1jXd+uTmYPVUwkD6bFY/oTyCes= github.com/emicklei/go-restful/v3 v3.13.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= -github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8P3k= github.com/evanphx/json-patch/v5 v5.9.11 h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjTM0wiaDU= github.com/evanphx/json-patch/v5 v5.9.11/go.mod h1:3j+LviiESTElxA4p3EMKAB9HXj3/XEtnUf6OZxqIQTM= github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= diff --git a/store-client/pkg/datastore/config.go b/store-client/pkg/datastore/config.go index f11aeee93..e5683eeea 100644 --- a/store-client/pkg/datastore/config.go +++ b/store-client/pkg/datastore/config.go @@ -179,6 +179,10 @@ func loadOptionsFromEnv(config *DataStoreConfig) { if maxIdle := os.Getenv("DATASTORE_MAX_IDLE_CONNECTIONS"); maxIdle != "" { config.Options["maxIdleConnections"] = maxIdle } + + if namespace := os.Getenv("DATASTORE_NAMESPACE"); namespace != "" { + config.Options["namespace"] = namespace + } } // loadConfigFromYAMLString loads configuration from YAML string diff --git a/store-client/pkg/datastore/providers/kubernetes/constants.go b/store-client/pkg/datastore/providers/kubernetes/constants.go new file mode 100644 index 000000000..e96556c3f --- /dev/null +++ b/store-client/pkg/datastore/providers/kubernetes/constants.go @@ -0,0 +1,23 @@ +// Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +const ( + labelNodeName = "healthevents.dgxc.nvidia.com/node-name" + labelCheckName = "healthevents.dgxc.nvidia.com/check-name" + labelAgent = "healthevents.dgxc.nvidia.com/agent" + + originalHealthEventTimestamp = "healthevents.dgxc.nvidia.com/original-health-event-timestamp" +) diff --git a/store-client/pkg/datastore/providers/kubernetes/database_client.go b/store-client/pkg/datastore/providers/kubernetes/database_client.go new file mode 100644 index 000000000..efdcdd2b9 --- /dev/null +++ b/store-client/pkg/datastore/providers/kubernetes/database_client.go @@ -0,0 +1,423 @@ +// Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "context" + "fmt" + "log/slog" + "strings" + "time" + + "github.com/nvidia/nvsentinel/data-models/pkg/model" + "github.com/nvidia/nvsentinel/data-models/pkg/protos" + "github.com/nvidia/nvsentinel/store-client/pkg/client" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + crclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +// KubernetesDatabaseClient implements client.DatabaseClient using Kubernetes CRs. +// This provides backward compatibility with consumers that still use the legacy +// DatabaseClient interface (platform-connector InsertMany, fault-quarantine queries, etc.). +type KubernetesDatabaseClient struct { + client crclient.WithWatch + namespace string +} + +// NewKubernetesDatabaseClient creates a new Kubernetes database client. +func NewKubernetesDatabaseClient(k8sClient crclient.WithWatch, namespace string) *KubernetesDatabaseClient { + slog.Info("Creating Kubernetes database client", "namespace", namespace) + + return &KubernetesDatabaseClient{ + client: k8sClient, + namespace: namespace, + } +} + +func (k *KubernetesDatabaseClient) InsertMany( + ctx context.Context, documents []interface{}, +) (*client.InsertManyResult, error) { + slog.Info("InsertMany called", "documentCount", len(documents), "namespace", k.namespace) + + insertedIDs := make([]interface{}, 0, len(documents)) + + for i, doc := range documents { + cr, err := healthEventToCR(doc, k.namespace) + if err != nil { + slog.Error("Failed to convert document to CR", "index", i, "error", err) + return nil, fmt.Errorf("failed to convert document %d: %w", i, err) + } + + if err := k.client.Create(ctx, cr); err != nil { + slog.Error("Failed to create HealthEventResource CR", "index", i, "generateName", cr.GenerateName, "error", err) + return nil, fmt.Errorf("failed to create HealthEventResource %d: %w", i, err) + } + + insertedIDs = append(insertedIDs, cr.Name) + slog.Debug("Created HealthEventResource CR", "name", cr.Name, "node", cr.Labels[labelNodeName]) + } + + slog.Info("InsertMany completed", "insertedCount", len(insertedIDs)) + + return &client.InsertManyResult{InsertedIDs: insertedIDs}, nil +} + +// healthEventToCR converts a model.HealthEventWithStatus into a typed HealthEventResourceCRD. +func healthEventToCR(doc interface{}, namespace string) (*model.HealthEventResourceCRD, error) { + hews, ok := doc.(model.HealthEventWithStatus) + if !ok { + return nil, fmt.Errorf("expected model.HealthEventWithStatus, got %T", doc) + } + + labels := map[string]string{} + if hews.HealthEvent != nil { + if hews.HealthEvent.NodeName != "" { + labels[labelNodeName] = hews.HealthEvent.NodeName + } + + if hews.HealthEvent.CheckName != "" { + labels[labelCheckName] = hews.HealthEvent.CheckName + } + + if hews.HealthEvent.Agent != "" { + labels[labelAgent] = hews.HealthEvent.Agent + } + } + + namePrefix := generateCRNamePrefix(hews.HealthEvent) + + slog.Debug("Converting health event to CR", + "namePrefix", namePrefix, "namespace", namespace, + "node", labels[labelNodeName], "check", labels[labelCheckName], "agent", labels[labelAgent]) + + cr := &model.HealthEventResourceCRD{ + TypeMeta: metav1.TypeMeta{ + APIVersion: model.SchemeGroupVersion.Group + "/" + model.SchemeGroupVersion.Version, + Kind: "HealthEventResource", + }, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: namePrefix, + Namespace: namespace, + Labels: labels, + Annotations: map[string]string{ + originalHealthEventTimestamp: hews.CreatedAt.Format("2006-01-02T15:04:05Z"), + }, + }, + Spec: hews.HealthEvent, + Status: hews.HealthEventStatus, + } + + return cr, nil +} + +// generateCRNamePrefix builds a human-readable prefix for GenerateName. +// The K8s API server appends a random suffix to guarantee uniqueness. +func generateCRNamePrefix(he *protos.HealthEvent) string { + if he == nil { + return "he-" + } + + nodeName := strings.ReplaceAll(he.NodeName, ".", "-") + if len(nodeName) > 40 { + nodeName = nodeName[:40] + } + + return fmt.Sprintf("he-%s-", strings.ToLower(nodeName)) +} + +func (k *KubernetesDatabaseClient) UpdateDocumentStatus( + ctx context.Context, documentID string, statusPath string, status interface{}, +) error { + slog.Warn("UpdateDocumentStatus called but not implemented", "documentID", documentID, "statusPath", statusPath) + return fmt.Errorf("kubernetes: UpdateDocumentStatus not yet implemented") +} + +func (k *KubernetesDatabaseClient) UpdateDocumentStatusFields( + ctx context.Context, documentID string, fields map[string]interface{}, +) error { + slog.Debug("UpdateDocumentStatusFields called", "documentID", documentID, "fields", fields) + + cr := &model.HealthEventResourceCRD{} + key := crclient.ObjectKey{Name: documentID, Namespace: k.namespace} + + if err := k.client.Get(ctx, key, cr); err != nil { + slog.Error("Failed to get CR for status update", "documentID", documentID, "error", err) + return fmt.Errorf("failed to get HealthEventResource %s: %w", documentID, err) + } + + applySetFieldsToStatus(cr, fields) + + if err := k.client.Status().Update(ctx, cr); err != nil { + slog.Error("Failed to update CR status subresource", "documentID", documentID, "error", err) + return fmt.Errorf("failed to update status subresource for %s: %w", documentID, err) + } + + slog.Debug("UpdateDocumentStatusFields completed", "documentID", documentID) + + return nil +} + +func (k *KubernetesDatabaseClient) UpdateDocument( + ctx context.Context, filter interface{}, update interface{}, +) (*client.UpdateResult, error) { + slog.Warn("UpdateDocument called but not implemented") + return nil, fmt.Errorf("kubernetes: UpdateDocument not yet implemented") +} + +func (k *KubernetesDatabaseClient) UpdateManyDocuments( + ctx context.Context, filter interface{}, update interface{}, +) (*client.UpdateResult, error) { + slog.Warn("UpdateManyDocuments called but not implemented") + return nil, fmt.Errorf("kubernetes: UpdateManyDocuments not yet implemented") +} + +func (k *KubernetesDatabaseClient) UpsertDocument( + ctx context.Context, filter interface{}, document interface{}, +) (*client.UpdateResult, error) { + slog.Warn("UpsertDocument called but not implemented") + return nil, fmt.Errorf("kubernetes: UpsertDocument not yet implemented") +} + +func (k *KubernetesDatabaseClient) FindOne( + ctx context.Context, filter interface{}, options *client.FindOneOptions, +) (client.SingleResult, error) { + slog.Debug("FindOne called", "filter", filter) + + list := &model.HealthEventResourceCRDList{} + if err := k.client.List(ctx, list, crclient.InNamespace(k.namespace)); err != nil { + return nil, fmt.Errorf("failed to list HealthEventResources: %w", err) + } + + if len(list.Items) == 0 { + return &k8sEmptySingleResult{}, nil + } + + cr := &list.Items[0] + + return &k8sSingleResult{cr: cr}, nil +} + +func (k *KubernetesDatabaseClient) Find( + ctx context.Context, filter interface{}, options *client.FindOptions, +) (client.Cursor, error) { + slog.Debug("Find called", "filter", filter) + + list := &model.HealthEventResourceCRDList{} + if err := k.client.List(ctx, list, crclient.InNamespace(k.namespace)); err != nil { + return nil, fmt.Errorf("failed to list HealthEventResources: %w", err) + } + + items := filterCRsByTime(list.Items, filter) + + if options != nil && options.Limit != nil && *options.Limit > 0 && int64(len(items)) > *options.Limit { + items = items[:*options.Limit] + } + + slog.Debug("Find listed CRs", "total", len(list.Items), "afterFilter", len(items)) + + return newK8sCursor(items), nil +} + +func (k *KubernetesDatabaseClient) CountDocuments( + ctx context.Context, filter interface{}, _ *client.CountOptions, +) (int64, error) { + slog.Debug("CountDocuments called", "namespace", k.namespace, "filter", filter) + list := &model.HealthEventResourceCRDList{} + if err := k.client.List(ctx, list, crclient.InNamespace(k.namespace)); err != nil { + return 0, fmt.Errorf("failed to list HealthEventResources: %w", err) + } + items := filterCRsByTime(list.Items, filter) + return int64(len(items)), nil +} + +func (k *KubernetesDatabaseClient) Aggregate( + _ context.Context, pipeline interface{}, +) (client.Cursor, error) { + slog.Debug("Aggregate called on Kubernetes database client", "namespace", k.namespace) + return &k8sEmptyCursor{}, nil +} + +// k8sEmptyCursor implements client.Cursor returning zero results. +// Used by Aggregate until full pipeline evaluation is implemented. +type k8sEmptyCursor struct{} + +func (c *k8sEmptyCursor) Next(_ context.Context) bool { return false } +func (c *k8sEmptyCursor) Decode(_ interface{}) error { return fmt.Errorf("cursor exhausted") } +func (c *k8sEmptyCursor) Close(_ context.Context) error { return nil } +func (c *k8sEmptyCursor) All(_ context.Context, _ interface{}) error { return nil } +func (c *k8sEmptyCursor) Err() error { return nil } + +func (k *KubernetesDatabaseClient) Ping(_ context.Context) error { + return nil +} + +func (k *KubernetesDatabaseClient) NewChangeStreamWatcher( + ctx context.Context, tokenConfig client.TokenConfig, pipeline interface{}, +) (client.ChangeStreamWatcher, error) { + slog.Info("Creating Kubernetes change stream watcher via DatabaseClient", + "clientName", tokenConfig.ClientName, "namespace", k.namespace) + + w := NewKubernetesChangeStreamWatcher(k.client, k.namespace, tokenConfig.ClientName) + + return w.Unwrap(), nil +} + +func (k *KubernetesDatabaseClient) DeleteResumeToken( + ctx context.Context, tokenConfig client.TokenConfig, +) error { + slog.Warn("DeleteResumeToken called but not implemented") + return fmt.Errorf("kubernetes: DeleteResumeToken not yet implemented") +} + +func (k *KubernetesDatabaseClient) Close(_ context.Context) error { + slog.Debug("Kubernetes database client closed") + return nil +} + +// filterCRsByTime applies createdAt time range filtering from a MongoDB-style filter map. +func filterCRsByTime(items []model.HealthEventResourceCRD, filter interface{}) []model.HealthEventResourceCRD { + filterMap, ok := filter.(map[string]any) + if !ok { + return items + } + + createdAtRaw, ok := filterMap["createdAt"] + if !ok { + return items + } + + rangeMap, ok := createdAtRaw.(map[string]any) + if !ok { + return items + } + + var gte, lt time.Time + if v, ok := rangeMap["$gte"]; ok { + if t, ok := v.(time.Time); ok { + gte = t + } + } + if v, ok := rangeMap["$lt"]; ok { + if t, ok := v.(time.Time); ok { + lt = t + } + } + + var filtered []model.HealthEventResourceCRD + for i := range items { + ts := items[i].CreationTimestamp.Time + if ann := items[i].GetAnnotations(); ann != nil { + if raw, ok := ann[originalHealthEventTimestamp]; ok { + if parsed, err := parseTime(raw); err == nil { + ts = parsed + } + } + } + if !gte.IsZero() && ts.Before(gte) { + continue + } + if !lt.IsZero() && !ts.Before(lt) { + continue + } + filtered = append(filtered, items[i]) + } + return filtered +} + +// k8sCursor implements client.Cursor backed by an in-memory slice of CRs. +type k8sCursor struct { + items []model.HealthEventResourceCRD + idx int + err error +} + +func newK8sCursor(items []model.HealthEventResourceCRD) *k8sCursor { + return &k8sCursor{items: items, idx: -1} +} + +func (c *k8sCursor) Next(_ context.Context) bool { + c.idx++ + return c.idx < len(c.items) +} + +func (c *k8sCursor) Decode(v interface{}) error { + if c.idx < 0 || c.idx >= len(c.items) { + return fmt.Errorf("cursor out of bounds") + } + return decodeCRInto(&c.items[c.idx], v) +} + +func (c *k8sCursor) Close(_ context.Context) error { return nil } + +func (c *k8sCursor) All(_ context.Context, results interface{}) error { + slice, ok := results.(*[]model.HealthEventWithStatus) + if !ok { + return fmt.Errorf("All: expected *[]model.HealthEventWithStatus, got %T", results) + } + for i := range c.items { + var hews model.HealthEventWithStatus + if err := decodeCRInto(&c.items[i], &hews); err != nil { + return err + } + *slice = append(*slice, hews) + } + return nil +} + +func (c *k8sCursor) Err() error { return c.err } + +// k8sSingleResult implements client.SingleResult for a found CR. +type k8sSingleResult struct { + cr *model.HealthEventResourceCRD +} + +func (r *k8sSingleResult) Decode(v interface{}) error { + return decodeCRInto(r.cr, v) +} + +func (r *k8sSingleResult) Err() error { return nil } + +// k8sEmptySingleResult implements client.SingleResult for a not-found case. +type k8sEmptySingleResult struct{} + +func (r *k8sEmptySingleResult) Decode(_ interface{}) error { + return fmt.Errorf("no documents in result") +} + +func (r *k8sEmptySingleResult) Err() error { return nil } + +// decodeCRInto populates the target from a HealthEventResourceCRD. +func decodeCRInto(cr *model.HealthEventResourceCRD, v interface{}) error { + switch target := v.(type) { + case *model.HealthEventWithStatus: + createdAt := cr.CreationTimestamp.Time + if ann := cr.GetAnnotations(); ann != nil { + if ts, ok := ann[originalHealthEventTimestamp]; ok { + if t, err := parseTime(ts); err == nil { + createdAt = t + } + } + } + target.CreatedAt = createdAt + target.HealthEvent = cr.Spec + target.HealthEventStatus = cr.Status + return nil + default: + return fmt.Errorf("decodeCRInto: unsupported target type %T", v) + } +} + +var _ client.DatabaseClient = (*KubernetesDatabaseClient)(nil) diff --git a/store-client/pkg/datastore/providers/kubernetes/datastore.go b/store-client/pkg/datastore/providers/kubernetes/datastore.go new file mode 100644 index 000000000..7e12c2e1d --- /dev/null +++ b/store-client/pkg/datastore/providers/kubernetes/datastore.go @@ -0,0 +1,189 @@ +// Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "context" + "fmt" + "log/slog" + "os" + + "github.com/nvidia/nvsentinel/data-models/pkg/model" + "github.com/nvidia/nvsentinel/store-client/pkg/client" + "github.com/nvidia/nvsentinel/store-client/pkg/datastore" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + crclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +// KubernetesDataStore implements the DataStore interface using Kubernetes CRs as the backing store. +// HealthEvent data is stored as HealthEventResource custom resources in the cluster's etcd +// via the Kubernetes API server, eliminating external database dependencies. +type KubernetesDataStore struct { + k8sClient crclient.WithWatch + namespace string + healthEventStore datastore.HealthEventStore + databaseClient *KubernetesDatabaseClient +} + +// NewKubernetesStore creates a new Kubernetes datastore backed by controller-runtime's typed client. +func NewKubernetesStore(ctx context.Context, config datastore.DataStoreConfig) (datastore.DataStore, error) { + namespace := config.Options["namespace"] + if namespace == "" { + namespace = "nvsentinel" + } + + restConfig, err := buildRESTConfig(config) + if err != nil { + return nil, fmt.Errorf("failed to build Kubernetes REST config: %w", err) + } + + scheme := runtime.NewScheme() + if err := model.AddToScheme(scheme); err != nil { + return nil, fmt.Errorf("failed to register HealthEventResource types: %w", err) + } + + k8sClient, err := crclient.NewWithWatch(restConfig, crclient.Options{Scheme: scheme}) + if err != nil { + return nil, fmt.Errorf("failed to create controller-runtime client: %w", err) + } + + store := &KubernetesDataStore{ + k8sClient: k8sClient, + namespace: namespace, + } + + store.healthEventStore = NewKubernetesHealthEventStore(k8sClient, namespace) + store.databaseClient = NewKubernetesDatabaseClient(k8sClient, namespace) + + slog.Info("Successfully created Kubernetes datastore", "namespace", namespace) + + return store, nil +} + +// MaintenanceEventStore returns the maintenance event store. +// Not yet implemented for the Kubernetes provider. +func (k *KubernetesDataStore) MaintenanceEventStore() datastore.MaintenanceEventStore { + return nil +} + +// HealthEventStore returns the health event store. +func (k *KubernetesDataStore) HealthEventStore() datastore.HealthEventStore { + return k.healthEventStore +} + +// Ping tests the Kubernetes API server connection by performing a lightweight List. +func (k *KubernetesDataStore) Ping(ctx context.Context) error { + slog.Debug("Pinging Kubernetes API server", "namespace", k.namespace) + + list := &model.HealthEventResourceCRDList{} + if err := k.k8sClient.List(ctx, list, crclient.InNamespace(k.namespace), crclient.Limit(1)); err != nil { + slog.Error("Kubernetes API server ping failed", "error", err) + return datastore.NewConnectionError(datastore.ProviderKubernetes, "failed to ping Kubernetes API server", err) + } + + slog.Debug("Kubernetes API server ping succeeded") + + return nil +} + +// Close is a no-op for Kubernetes (no persistent connections to close). +func (k *KubernetesDataStore) Close(_ context.Context) error { + return nil +} + +// Provider returns the provider type. +func (k *KubernetesDataStore) Provider() datastore.DataStoreProvider { + return datastore.ProviderKubernetes +} + +// GetDatabaseClient returns the Kubernetes implementation of client.DatabaseClient. +// Required for backward compatibility with consumers that type-assert for this method +// (fault-quarantine, node-drainer, fault-remediation, health-events-analyzer). +func (k *KubernetesDataStore) GetDatabaseClient() client.DatabaseClient { + return k.databaseClient +} + +// CreateChangeStreamWatcher creates a Kubernetes watch-based watcher that implements +// the same change-stream semantics as MongoDB/PostgreSQL. +func (k *KubernetesDataStore) CreateChangeStreamWatcher( + ctx context.Context, clientName string, pipeline interface{}, +) (datastore.ChangeStreamWatcher, error) { + slog.Info("Creating change stream watcher", "clientName", clientName, "namespace", k.namespace) + + watcher := NewKubernetesChangeStreamWatcher(k.k8sClient, k.namespace, clientName) + + return watcher, nil +} + +// NewChangeStreamWatcher creates a watcher from a generic config map. +func (k *KubernetesDataStore) NewChangeStreamWatcher( + ctx context.Context, config interface{}, +) (datastore.ChangeStreamWatcher, error) { + configMap, ok := config.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("unsupported config type: %T", config) + } + + var clientName string + if val, ok := configMap["ClientName"].(string); ok { + clientName = val + } + + if clientName == "" { + return nil, fmt.Errorf("ClientName is required") + } + + var pipeline interface{} + if val, ok := configMap["Pipeline"]; ok { + pipeline = val + } + + return k.CreateChangeStreamWatcher(ctx, clientName, pipeline) +} + +// buildRESTConfig builds a Kubernetes REST config. +// Priority: in-cluster config > ~/.kube/config (for local dev) > explicit DATASTORE_HOST. +func buildRESTConfig(config datastore.DataStoreConfig) (*rest.Config, error) { + restConfig, err := rest.InClusterConfig() + if err == nil { + slog.Info("Using in-cluster Kubernetes config") + return restConfig, nil + } + + slog.Info("Not running in-cluster, falling back to kubeconfig", "inClusterErr", err) + + loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() + configOverrides := &clientcmd.ConfigOverrides{} + kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides) + + restConfig, err = kubeConfig.ClientConfig() + if err == nil { + slog.Info("Using kubeconfig") + return restConfig, nil + } + + explicitHost := os.Getenv("DATASTORE_HOST") + if explicitHost != "" { + slog.Info("Using explicit Kubernetes API server host from DATASTORE_HOST", "host", explicitHost) + return &rest.Config{Host: explicitHost}, nil + } + + return nil, fmt.Errorf("failed to build Kubernetes REST config: no in-cluster config, kubeconfig, or DATASTORE_HOST available") +} + +var _ datastore.DataStore = (*KubernetesDataStore)(nil) diff --git a/store-client/pkg/datastore/providers/kubernetes/health_store.go b/store-client/pkg/datastore/providers/kubernetes/health_store.go new file mode 100644 index 000000000..81a3b1ae6 --- /dev/null +++ b/store-client/pkg/datastore/providers/kubernetes/health_store.go @@ -0,0 +1,662 @@ +// Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "context" + "fmt" + "log/slog" + "sort" + + "github.com/nvidia/nvsentinel/data-models/pkg/model" + "github.com/nvidia/nvsentinel/data-models/pkg/protos" + "github.com/nvidia/nvsentinel/store-client/pkg/datastore" + + "google.golang.org/protobuf/types/known/timestamppb" + "google.golang.org/protobuf/types/known/wrapperspb" + crclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +// KubernetesHealthEventStore implements HealthEventStore using Kubernetes CRs. +// Each health event is stored as a HealthEventResource custom resource where: +// - spec contains the immutable HealthEvent data +// - status contains the mutable HealthEventStatus (quarantine, drain, remediation state) +type KubernetesHealthEventStore struct { + client crclient.Client + namespace string +} + +// NewKubernetesHealthEventStore creates a new Kubernetes health event store. +func NewKubernetesHealthEventStore(client crclient.Client, namespace string) datastore.HealthEventStore { + slog.Info("Creating Kubernetes health event store", "namespace", namespace) + + return &KubernetesHealthEventStore{ + client: client, + namespace: namespace, + } +} + +// UpdateHealthEventStatus replaces the full status of a HealthEventResource CR. +func (k *KubernetesHealthEventStore) UpdateHealthEventStatus( + ctx context.Context, id string, status datastore.HealthEventStatus, +) error { + slog.Debug("UpdateHealthEventStatus called", "id", id, "namespace", k.namespace) + + cr := &model.HealthEventResourceCRD{} + key := crclient.ObjectKey{Name: id, Namespace: k.namespace} + + if err := k.client.Get(ctx, key, cr); err != nil { + return fmt.Errorf("failed to get HealthEventResource %s: %w", id, err) + } + + cr.Status = datastoreStatusToProto(status) + + if err := k.client.Status().Update(ctx, cr); err != nil { + return fmt.Errorf("failed to update status for %s: %w", id, err) + } + + slog.Debug("UpdateHealthEventStatus completed", "id", id) + + return nil +} + +// UpdateHealthEventStatusByNode updates the status of all CRs matching a given node name. +func (k *KubernetesHealthEventStore) UpdateHealthEventStatusByNode( + ctx context.Context, nodeName string, status datastore.HealthEventStatus, +) error { + slog.Debug("UpdateHealthEventStatusByNode called", "node", nodeName, "namespace", k.namespace) + + list := &model.HealthEventResourceCRDList{} + if err := k.client.List(ctx, list, + crclient.InNamespace(k.namespace), + crclient.MatchingLabels{labelNodeName: nodeName}, + ); err != nil { + return fmt.Errorf("failed to list CRs for node %s: %w", nodeName, err) + } + + slog.Debug("Found CRs for node status update", "node", nodeName, "count", len(list.Items)) + + protoStatus := datastoreStatusToProto(status) + + for i := range list.Items { + cr := &list.Items[i] + cr.Status = protoStatus + + if err := k.client.Status().Update(ctx, cr); err != nil { + return fmt.Errorf("failed to update status for CR %s: %w", cr.Name, err) + } + + slog.Debug("Updated CR status", "crName", cr.Name, "node", nodeName) + } + + slog.Debug("UpdateHealthEventStatusByNode completed", "node", nodeName, "updatedCount", len(list.Items)) + + return nil +} + +// FindHealthEventsByNode lists all HealthEventResource CRs for a given node. +func (k *KubernetesHealthEventStore) FindHealthEventsByNode( + ctx context.Context, nodeName string, +) ([]datastore.HealthEventWithStatus, error) { + slog.Debug("FindHealthEventsByNode called", "node", nodeName, "namespace", k.namespace) + + list := &model.HealthEventResourceCRDList{} + if err := k.client.List(ctx, list, + crclient.InNamespace(k.namespace), + crclient.MatchingLabels{labelNodeName: nodeName}, + ); err != nil { + return nil, datastore.NewQueryError(datastore.ProviderKubernetes, "failed to list CRs by node", err). + WithMetadata("nodeName", nodeName) + } + + events := crListToHealthEvents(list) + slog.Debug("FindHealthEventsByNode completed", "node", nodeName, "resultCount", len(events)) + + return events, nil +} + +// FindHealthEventsByFilter lists CRs matching a generic filter map. +// Recognized keys are mapped to label selectors; unrecognized keys are filtered in-memory. +func (k *KubernetesHealthEventStore) FindHealthEventsByFilter( + ctx context.Context, filter map[string]interface{}, +) ([]datastore.HealthEventWithStatus, error) { + slog.Debug("FindHealthEventsByFilter called", "filter", filter, "namespace", k.namespace) + + labels := buildMatchingLabels(filter) + + list := &model.HealthEventResourceCRDList{} + opts := []crclient.ListOption{crclient.InNamespace(k.namespace)} + + if len(labels) > 0 { + opts = append(opts, crclient.MatchingLabels(labels)) + slog.Debug("Using label selectors", "labels", labels) + } + + if err := k.client.List(ctx, list, opts...); err != nil { + return nil, datastore.NewQueryError(datastore.ProviderKubernetes, "failed to list CRs by filter", err). + WithMetadata("filter", filter) + } + + events := crListToHealthEvents(list) + filtered := filterInMemory(events, filter) + + slog.Debug("FindHealthEventsByFilter completed", + "totalCRs", len(list.Items), "afterLabelFilter", len(events), "afterInMemoryFilter", len(filtered)) + + return filtered, nil +} + +// FindHealthEventsByStatus finds CRs whose quarantine or eviction status matches. +func (k *KubernetesHealthEventStore) FindHealthEventsByStatus( + ctx context.Context, status datastore.Status, +) ([]datastore.HealthEventWithStatus, error) { + slog.Debug("FindHealthEventsByStatus called", "status", status, "namespace", k.namespace) + + list := &model.HealthEventResourceCRDList{} + if err := k.client.List(ctx, list, crclient.InNamespace(k.namespace)); err != nil { + return nil, datastore.NewQueryError(datastore.ProviderKubernetes, "failed to list CRs", err). + WithMetadata("status", string(status)) + } + + allEvents := crListToHealthEvents(list) + statusStr := string(status) + + var matched []datastore.HealthEventWithStatus + + for _, ev := range allEvents { + if (ev.HealthEventStatus.NodeQuarantined != nil && string(*ev.HealthEventStatus.NodeQuarantined) == statusStr) || + string(ev.HealthEventStatus.UserPodsEvictionStatus.Status) == statusStr { + matched = append(matched, ev) + } + } + + slog.Debug("FindHealthEventsByStatus completed", + "status", status, "totalCRs", len(allEvents), "matched", len(matched)) + + return matched, nil +} + +// FindHealthEventsByQuery converts the QueryBuilder to a filter map and delegates to FindHealthEventsByFilter. +func (k *KubernetesHealthEventStore) FindHealthEventsByQuery( + ctx context.Context, builder datastore.QueryBuilder, +) ([]datastore.HealthEventWithStatus, error) { + filter := builder.ToMongo() + slog.Debug("FindHealthEventsByQuery called", "filter", filter) + + return k.FindHealthEventsByFilter(ctx, filter) +} + +// UpdateHealthEventsByQuery lists matching CRs via the query builder and applies +// the update builder's fields to each CR's status. +func (k *KubernetesHealthEventStore) UpdateHealthEventsByQuery( + ctx context.Context, queryBuilder datastore.QueryBuilder, updateBuilder datastore.UpdateBuilder, +) error { + slog.Debug("UpdateHealthEventsByQuery called") + + events, err := k.FindHealthEventsByQuery(ctx, queryBuilder) + if err != nil { + return fmt.Errorf("failed to find CRs for update: %w", err) + } + + updateFields := updateBuilder.ToMongo() + setFields, _ := updateFields["$set"].(map[string]interface{}) + + if setFields == nil { + return fmt.Errorf("update builder produced no $set fields") + } + + slog.Debug("UpdateHealthEventsByQuery applying updates", + "matchedEvents", len(events), "setFields", setFields) + + updatedCount := 0 + + for _, ev := range events { + if ev.RawEvent == nil { + continue + } + + crName, _ := ev.RawEvent["crName"].(string) + if crName == "" { + slog.Warn("Skipping event with no crName in RawEvent") + continue + } + + cr := &model.HealthEventResourceCRD{} + key := crclient.ObjectKey{Name: crName, Namespace: k.namespace} + + if err := k.client.Get(ctx, key, cr); err != nil { + return fmt.Errorf("failed to get CR %s: %w", crName, err) + } + + applySetFieldsToStatus(cr, setFields) + + if err := k.client.Status().Update(ctx, cr); err != nil { + return fmt.Errorf("failed to update CR %s: %w", crName, err) + } + + updatedCount++ + } + + slog.Debug("UpdateHealthEventsByQuery completed", "updatedCount", updatedCount) + + return nil +} + +// UpdateNodeQuarantineStatus updates the nodeQuarantined field on a CR's status subresource. +func (k *KubernetesHealthEventStore) UpdateNodeQuarantineStatus( + ctx context.Context, eventID string, status datastore.Status, +) error { + slog.Debug("UpdateNodeQuarantineStatus called", "eventID", eventID, "status", status) + + cr := &model.HealthEventResourceCRD{} + key := crclient.ObjectKey{Name: eventID, Namespace: k.namespace} + + if err := k.client.Get(ctx, key, cr); err != nil { + return fmt.Errorf("failed to get HealthEventResource %s: %w", eventID, err) + } + + if cr.Status == nil { + cr.Status = &protos.HealthEventStatus{} + } + + cr.Status.NodeQuarantined = string(status) + + if status == datastore.Quarantined || status == datastore.AlreadyQuarantined { + cr.Status.QuarantineFinishTimestamp = timestamppb.Now() + slog.Debug("Set quarantine finish timestamp", "eventID", eventID) + } + + if err := k.client.Status().Update(ctx, cr); err != nil { + return fmt.Errorf("failed to update quarantine status for %s: %w", eventID, err) + } + + slog.Info("Updated node quarantine status", "eventID", eventID, "status", status) + + return nil +} + +// UpdatePodEvictionStatus updates the userPodsEvictionStatus on a CR's status. +func (k *KubernetesHealthEventStore) UpdatePodEvictionStatus( + ctx context.Context, eventID string, status datastore.OperationStatus, +) error { + slog.Debug("UpdatePodEvictionStatus called", + "eventID", eventID, "evictionStatus", status.Status, "message", status.Message) + + cr := &model.HealthEventResourceCRD{} + key := crclient.ObjectKey{Name: eventID, Namespace: k.namespace} + + if err := k.client.Get(ctx, key, cr); err != nil { + return fmt.Errorf("failed to get HealthEventResource %s: %w", eventID, err) + } + + if cr.Status == nil { + cr.Status = &protos.HealthEventStatus{} + } + + cr.Status.UserPodsEvictionStatus = &protos.OperationStatus{ + Status: string(status.Status), + Message: status.Message, + } + + if status.Status == datastore.StatusSucceeded || status.Status == datastore.StatusFailed { + cr.Status.DrainFinishTimestamp = timestamppb.Now() + slog.Debug("Set drain finish timestamp", "eventID", eventID) + } + + if err := k.client.Status().Update(ctx, cr); err != nil { + return fmt.Errorf("failed to update pod eviction status for %s: %w", eventID, err) + } + + slog.Info("Updated pod eviction status", "eventID", eventID, "status", status.Status) + + return nil +} + +// UpdateRemediationStatus updates the faultRemediated field on a CR's status. +func (k *KubernetesHealthEventStore) UpdateRemediationStatus( + ctx context.Context, eventID string, status interface{}, +) error { + slog.Debug("UpdateRemediationStatus called", "eventID", eventID, "statusType", fmt.Sprintf("%T", status)) + + var faultRemediated bool + + switch v := status.(type) { + case bool: + faultRemediated = v + case *bool: + if v == nil { + slog.Debug("UpdateRemediationStatus skipped: nil bool pointer", "eventID", eventID) + return nil + } + + faultRemediated = *v + default: + return fmt.Errorf("invalid remediation status type: %T", status) + } + + cr := &model.HealthEventResourceCRD{} + key := crclient.ObjectKey{Name: eventID, Namespace: k.namespace} + + if err := k.client.Get(ctx, key, cr); err != nil { + return fmt.Errorf("failed to get HealthEventResource %s: %w", eventID, err) + } + + if cr.Status == nil { + cr.Status = &protos.HealthEventStatus{} + } + + cr.Status.FaultRemediated = wrapperspb.Bool(faultRemediated) + cr.Status.LastRemediationTimestamp = timestamppb.Now() + + if err := k.client.Status().Update(ctx, cr); err != nil { + return fmt.Errorf("failed to update remediation status for %s: %w", eventID, err) + } + + slog.Info("Updated remediation status", "eventID", eventID, "faultRemediated", faultRemediated) + + return nil +} + +// CheckIfNodeAlreadyDrained checks if any CR for the given node has eviction status Succeeded. +func (k *KubernetesHealthEventStore) CheckIfNodeAlreadyDrained( + ctx context.Context, nodeName string, +) (bool, error) { + slog.Debug("CheckIfNodeAlreadyDrained called", "node", nodeName) + + list := &model.HealthEventResourceCRDList{} + if err := k.client.List(ctx, list, + crclient.InNamespace(k.namespace), + crclient.MatchingLabels{labelNodeName: nodeName}, + ); err != nil { + return false, datastore.NewQueryError(datastore.ProviderKubernetes, "failed to list CRs for drain check", err). + WithMetadata("nodeName", nodeName) + } + + for i := range list.Items { + if list.Items[i].Status != nil && + list.Items[i].Status.UserPodsEvictionStatus != nil && + list.Items[i].Status.UserPodsEvictionStatus.Status == string(datastore.StatusSucceeded) { + slog.Debug("Node already drained", "node", nodeName, "crName", list.Items[i].Name) + return true, nil + } + } + + slog.Debug("Node not yet drained", "node", nodeName, "crsChecked", len(list.Items)) + + return false, nil +} + +// FindLatestEventForNode returns the most recently created CR for a given node. +func (k *KubernetesHealthEventStore) FindLatestEventForNode( + ctx context.Context, nodeName string, +) (*datastore.HealthEventWithStatus, error) { + slog.Debug("FindLatestEventForNode called", "node", nodeName) + + list := &model.HealthEventResourceCRDList{} + if err := k.client.List(ctx, list, + crclient.InNamespace(k.namespace), + crclient.MatchingLabels{labelNodeName: nodeName}, + ); err != nil { + return nil, datastore.NewQueryError(datastore.ProviderKubernetes, "failed to list CRs for latest event", err). + WithMetadata("nodeName", nodeName) + } + + if len(list.Items) == 0 { + slog.Debug("No events found for node", "node", nodeName) + return nil, nil + } + + sort.Slice(list.Items, func(i, j int) bool { + return list.Items[i].CreationTimestamp.Time.After(list.Items[j].CreationTimestamp.Time) + }) + + latest := crToHealthEvent(&list.Items[0]) + slog.Debug("FindLatestEventForNode completed", + "node", nodeName, "totalEvents", len(list.Items), "latestCR", list.Items[0].Name) + + return latest, nil +} + +// --- helpers --- + +// datastoreStatusToProto converts datastore.HealthEventStatus to the protobuf status +// used by the CRD. +func datastoreStatusToProto(status datastore.HealthEventStatus) *protos.HealthEventStatus { + ps := &protos.HealthEventStatus{ + QuarantineFinishTimestamp: status.QuarantineFinishTimestamp, + DrainFinishTimestamp: status.DrainFinishTimestamp, + LastRemediationTimestamp: status.LastRemediationTimestamp, + } + + if status.NodeQuarantined != nil { + ps.NodeQuarantined = string(*status.NodeQuarantined) + } + + ps.UserPodsEvictionStatus = &protos.OperationStatus{ + Status: string(status.UserPodsEvictionStatus.Status), + Message: status.UserPodsEvictionStatus.Message, + } + + if status.FaultRemediated != nil { + ps.FaultRemediated = wrapperspb.Bool(*status.FaultRemediated) + } + + slog.Debug("Converted datastore status to proto", + "nodeQuarantined", ps.NodeQuarantined, + "evictionStatus", ps.UserPodsEvictionStatus.GetStatus(), + "faultRemediated", ps.FaultRemediated.GetValue()) + + return ps +} + +// protoStatusToDatastore converts the protobuf CRD status to the datastore abstraction type. +func protoStatusToDatastore(ps *protos.HealthEventStatus) datastore.HealthEventStatus { + if ps == nil { + slog.Debug("Proto status is nil, returning empty datastore status") + return datastore.HealthEventStatus{} + } + + ds := datastore.HealthEventStatus{ + QuarantineFinishTimestamp: ps.QuarantineFinishTimestamp, + DrainFinishTimestamp: ps.DrainFinishTimestamp, + LastRemediationTimestamp: ps.LastRemediationTimestamp, + } + + if ps.NodeQuarantined != "" { + s := datastore.Status(ps.NodeQuarantined) + ds.NodeQuarantined = &s + } + + if ps.UserPodsEvictionStatus != nil { + ds.UserPodsEvictionStatus = datastore.OperationStatus{ + Status: datastore.Status(ps.UserPodsEvictionStatus.Status), + Message: ps.UserPodsEvictionStatus.Message, + } + } + + if ps.FaultRemediated != nil { + b := ps.FaultRemediated.GetValue() + ds.FaultRemediated = &b + } + + slog.Debug("Converted proto status to datastore", + "nodeQuarantined", ps.NodeQuarantined, + "evictionStatus", ps.UserPodsEvictionStatus.GetStatus(), + "faultRemediated", ps.FaultRemediated.GetValue()) + + return ds +} + +// crListToHealthEvents converts a typed CRD list to datastore.HealthEventWithStatus slice. +func crListToHealthEvents(list *model.HealthEventResourceCRDList) []datastore.HealthEventWithStatus { + events := make([]datastore.HealthEventWithStatus, 0, len(list.Items)) + + for i := range list.Items { + ev := crToHealthEvent(&list.Items[i]) + events = append(events, *ev) + } + + slog.Debug("Converted CR list to health events", "count", len(events)) + + return events +} + +// crToHealthEvent converts a single typed CR to a datastore.HealthEventWithStatus. +func crToHealthEvent(cr *model.HealthEventResourceCRD) *datastore.HealthEventWithStatus { + createdAt := cr.CreationTimestamp.Time + if ann := cr.GetAnnotations(); ann != nil { + if ts, ok := ann[originalHealthEventTimestamp]; ok { + if t, err := parseTime(ts); err == nil { + createdAt = t + } + } + } + + nodeName := "" + if cr.Spec != nil { + nodeName = cr.Spec.NodeName + } + + slog.Debug("Converted CR to health event", "crName", cr.Name, "node", nodeName, "createdAt", createdAt) + + return &datastore.HealthEventWithStatus{ + CreatedAt: createdAt, + HealthEvent: cr.Spec, + HealthEventStatus: protoStatusToDatastore(cr.Status), + RawEvent: datastore.Event{"crName": cr.Name}, + } +} + +// buildMatchingLabels builds a label map from recognized filter keys. +func buildMatchingLabels(filter map[string]interface{}) map[string]string { + labels := map[string]string{} + + if v, ok := filter["healthevent.nodename"]; ok { + labels[labelNodeName] = fmt.Sprintf("%v", v) + } + + if v, ok := filter["healthevent.checkname"]; ok { + labels[labelCheckName] = fmt.Sprintf("%v", v) + } + + if v, ok := filter["healthevent.agent"]; ok { + labels[labelAgent] = fmt.Sprintf("%v", v) + } + + return labels +} + +// filterInMemory applies non-label filter keys by checking the deserialized events. +func filterInMemory(events []datastore.HealthEventWithStatus, filter map[string]interface{}) []datastore.HealthEventWithStatus { + knownLabelKeys := map[string]bool{ + "healthevent.nodename": true, + "healthevent.checkname": true, + "healthevent.agent": true, + } + + hasInMemoryFilters := false + + for key := range filter { + if !knownLabelKeys[key] { + hasInMemoryFilters = true + break + } + } + + if !hasInMemoryFilters { + return events + } + + slog.Debug("Applying in-memory filters", "inputCount", len(events), "filter", filter) + + var result []datastore.HealthEventWithStatus + + for _, ev := range events { + if matchesFilter(ev, filter) { + result = append(result, ev) + } + } + + slog.Debug("In-memory filtering completed", "inputCount", len(events), "outputCount", len(result)) + + return result +} + +// matchesFilter checks whether a HealthEventWithStatus matches all non-label filter criteria. +func matchesFilter(ev datastore.HealthEventWithStatus, filter map[string]interface{}) bool { + for key, expected := range filter { + switch key { + case "healtheventstatus.nodequarantined": + if ev.HealthEventStatus.NodeQuarantined == nil || string(*ev.HealthEventStatus.NodeQuarantined) != fmt.Sprintf("%v", expected) { + return false + } + case "healtheventstatus.userpodsevictionstatus.status": + if string(ev.HealthEventStatus.UserPodsEvictionStatus.Status) != fmt.Sprintf("%v", expected) { + return false + } + case "healtheventstatus.faultremediated": + if ev.HealthEventStatus.FaultRemediated == nil { + return false + } + + if fmt.Sprintf("%v", *ev.HealthEventStatus.FaultRemediated) != fmt.Sprintf("%v", expected) { + return false + } + } + } + + return true +} + +// applySetFieldsToStatus applies a map of dot-path $set fields to the CR's proto status. +func applySetFieldsToStatus(cr *model.HealthEventResourceCRD, setFields map[string]interface{}) { + if cr.Status == nil { + cr.Status = &protos.HealthEventStatus{} + } + + slog.Debug("Applying set fields to CR status", "crName", cr.Name, "fields", setFields) + + for dotPath, value := range setFields { + key := dotPath + if len(key) > len("healtheventstatus.") && key[:len("healtheventstatus.")] == "healtheventstatus." { + key = key[len("healtheventstatus."):] + } + + switch key { + case "nodequarantined": + cr.Status.NodeQuarantined = fmt.Sprintf("%v", value) + case "faultremediated": + if b, ok := value.(bool); ok { + cr.Status.FaultRemediated = wrapperspb.Bool(b) + } + case "userpodsevictionstatus.status": + if cr.Status.UserPodsEvictionStatus == nil { + cr.Status.UserPodsEvictionStatus = &protos.OperationStatus{} + } + + cr.Status.UserPodsEvictionStatus.Status = fmt.Sprintf("%v", value) + case "userpodsevictionstatus.message": + if cr.Status.UserPodsEvictionStatus == nil { + cr.Status.UserPodsEvictionStatus = &protos.OperationStatus{} + } + + cr.Status.UserPodsEvictionStatus.Message = fmt.Sprintf("%v", value) + default: + slog.Warn("Unknown set field key, skipping", "key", key, "originalPath", dotPath) + } + } +} + +var _ datastore.HealthEventStore = (*KubernetesHealthEventStore)(nil) diff --git a/store-client/pkg/datastore/providers/kubernetes/register.go b/store-client/pkg/datastore/providers/kubernetes/register.go new file mode 100644 index 000000000..1c61211c0 --- /dev/null +++ b/store-client/pkg/datastore/providers/kubernetes/register.go @@ -0,0 +1,34 @@ +// Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "context" + "log/slog" + + "github.com/nvidia/nvsentinel/store-client/pkg/datastore" +) + +// init automatically registers the Kubernetes provider with the global registry +func init() { + slog.Info("Registering Kubernetes datastore provider") + datastore.RegisterProvider(datastore.ProviderKubernetes, NewKubernetesDataStore) +} + +// NewKubernetesDataStore creates a new Kubernetes datastore instance from configuration +func NewKubernetesDataStore(ctx context.Context, config datastore.DataStoreConfig) (datastore.DataStore, error) { + // Create the Kubernetes store that implements the DataStore interface + return NewKubernetesStore(ctx, config) +} diff --git a/store-client/pkg/datastore/providers/kubernetes/utils.go b/store-client/pkg/datastore/providers/kubernetes/utils.go new file mode 100644 index 000000000..09e6167ac --- /dev/null +++ b/store-client/pkg/datastore/providers/kubernetes/utils.go @@ -0,0 +1,26 @@ +// Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import "time" + +func parseTime(s string) (time.Time, error) { + t, err := time.Parse(time.RFC3339, s) + if err != nil { + return time.Parse(time.RFC3339Nano, s) + } + + return t, nil +} diff --git a/store-client/pkg/datastore/providers/kubernetes/watcher.go b/store-client/pkg/datastore/providers/kubernetes/watcher.go new file mode 100644 index 000000000..df087f822 --- /dev/null +++ b/store-client/pkg/datastore/providers/kubernetes/watcher.go @@ -0,0 +1,280 @@ +// Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/nvidia/nvsentinel/data-models/pkg/model" + "github.com/nvidia/nvsentinel/store-client/pkg/client" + "github.com/nvidia/nvsentinel/store-client/pkg/datastore" + + "k8s.io/apimachinery/pkg/watch" + crclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +// KubernetesChangeStreamWatcher implements both datastore.ChangeStreamWatcher and +// provides Unwrap() to return a client.ChangeStreamWatcher. This dual-interface +// support is required because fault-quarantine, node-drainer, and fault-remediation +// unwrap the abstract watcher to access the legacy client.ChangeStreamWatcher.Events() channel. +// +// Under the hood, this uses a controller-runtime WithWatch client to watch typed +// HealthEventResourceCRD objects. +type KubernetesChangeStreamWatcher struct { + k8sClient crclient.WithWatch + namespace string + clientName string + + eventChan chan datastore.EventWithToken + oldEventChan chan client.Event + stopCh chan struct{} +} + +// NewKubernetesChangeStreamWatcher creates a new Kubernetes change stream watcher. +func NewKubernetesChangeStreamWatcher( + k8sClient crclient.WithWatch, namespace string, clientName string, +) *KubernetesChangeStreamWatcher { + slog.Info("Creating Kubernetes change stream watcher", "clientName", clientName, "namespace", namespace) + + return &KubernetesChangeStreamWatcher{ + k8sClient: k8sClient, + namespace: namespace, + clientName: clientName, + eventChan: make(chan datastore.EventWithToken), + oldEventChan: make(chan client.Event), + stopCh: make(chan struct{}), + } +} + +// Events returns the abstract event channel (datastore.ChangeStreamWatcher interface). +func (w *KubernetesChangeStreamWatcher) Events() <-chan datastore.EventWithToken { + return w.eventChan +} + +// Start begins watching HealthEventResource CRs via the controller-runtime typed client. +// It translates K8s watch events into the same format that MongoDB change streams +// produce, so downstream consumers (fault-quarantine EventWatcher) work unchanged. +func (w *KubernetesChangeStreamWatcher) Start(ctx context.Context) { + slog.Info("Starting Kubernetes change stream watcher", "clientName", w.clientName, "namespace", w.namespace) + + go func() { + defer close(w.oldEventChan) + defer close(w.eventChan) + + const ( + initialBackoff = 1 * time.Second + maxBackoff = 30 * time.Second + ) + backoff := initialBackoff + + for { + select { + case <-ctx.Done(): + slog.Info("Kubernetes watcher context cancelled", "clientName", w.clientName) + return + case <-w.stopCh: + slog.Info("Kubernetes watcher stop signal received", "clientName", w.clientName) + return + default: + } + + slog.Debug("Establishing Kubernetes watch", "clientName", w.clientName, "namespace", w.namespace) + + list := &model.HealthEventResourceCRDList{} + watcher, err := w.k8sClient.Watch(ctx, list, crclient.InNamespace(w.namespace)) + if err != nil { + slog.Error("Failed to start Kubernetes watch, will retry", "error", err, "clientName", w.clientName, "retryIn", backoff) + select { + case <-time.After(backoff): + case <-ctx.Done(): + return + case <-w.stopCh: + return + } + backoff = min(backoff*2, maxBackoff) + continue + } + + backoff = initialBackoff + slog.Info("Kubernetes watch established", "clientName", w.clientName) + w.processWatchEvents(ctx, watcher) + watcher.Stop() + slog.Info("Kubernetes watch ended, will re-establish", "clientName", w.clientName) + } + }() +} + +func (w *KubernetesChangeStreamWatcher) processWatchEvents( + ctx context.Context, watcher watch.Interface, +) { + for { + select { + case <-ctx.Done(): + return + case <-w.stopCh: + return + case watchEvent, ok := <-watcher.ResultChan(): + if !ok { + slog.Warn("Kubernetes watch channel closed, will re-establish", "clientName", w.clientName) + return + } + + if watchEvent.Type == watch.Error { + slog.Error("Kubernetes watch error event", "object", watchEvent.Object) + return + } + + if watchEvent.Type != watch.Added && watchEvent.Type != watch.Modified { + slog.Debug("Ignoring watch event type", "type", watchEvent.Type, "clientName", w.clientName) + continue + } + + cr, ok := watchEvent.Object.(*model.HealthEventResourceCRD) + if !ok { + slog.Error("Watch event object is not HealthEventResourceCRD", "type", fmt.Sprintf("%T", watchEvent.Object)) + continue + } + + nodeName := "" + if cr.Spec != nil { + nodeName = cr.Spec.NodeName + } + + slog.Info("Received watch event", + "type", watchEvent.Type, "crName", cr.Name, + "node", nodeName, "resourceVersion", cr.ResourceVersion, + "clientName", w.clientName) + + k8sEvent := &kubernetesEvent{cr: cr} + + select { + case w.oldEventChan <- k8sEvent: + slog.Debug("Watch event dispatched to legacy channel", "crName", cr.Name) + case <-ctx.Done(): + return + } + } + } +} + +// MarkProcessed is a no-op for Kubernetes. K8s watches are position-based via +// resourceVersion, which is tracked automatically. +func (w *KubernetesChangeStreamWatcher) MarkProcessed(_ context.Context, _ []byte) error { + return nil +} + +// Close stops the watcher and closes channels. +func (w *KubernetesChangeStreamWatcher) Close(_ context.Context) error { + slog.Info("Closing Kubernetes change stream watcher", "clientName", w.clientName) + + select { + case <-w.stopCh: + default: + close(w.stopCh) + } + + return nil +} + +// Unwrap returns a client.ChangeStreamWatcher for backward compatibility. +// Fault-quarantine, node-drainer, and fault-remediation type-assert for this method. +func (w *KubernetesChangeStreamWatcher) Unwrap() client.ChangeStreamWatcher { + slog.Debug("Unwrapping Kubernetes watcher to legacy interface", "clientName", w.clientName) + return &kubernetesLegacyWatcher{parent: w} +} + +// kubernetesLegacyWatcher adapts the Kubernetes watcher to the legacy client.ChangeStreamWatcher interface. +type kubernetesLegacyWatcher struct { + parent *KubernetesChangeStreamWatcher +} + +func (lw *kubernetesLegacyWatcher) Start(ctx context.Context) { + lw.parent.Start(ctx) +} + +func (lw *kubernetesLegacyWatcher) Events() <-chan client.Event { + return lw.parent.oldEventChan +} + +func (lw *kubernetesLegacyWatcher) MarkProcessed(ctx context.Context, token []byte) error { + return lw.parent.MarkProcessed(ctx, token) +} + +func (lw *kubernetesLegacyWatcher) Close(ctx context.Context) error { + return lw.parent.Close(ctx) +} + +// kubernetesEvent implements client.Event by wrapping a typed HealthEventResourceCRD. +type kubernetesEvent struct { + cr *model.HealthEventResourceCRD +} + +func (e *kubernetesEvent) GetDocumentID() (string, error) { + return e.cr.Name, nil +} + +func (e *kubernetesEvent) GetRecordUUID() (string, error) { + return e.cr.Name, nil +} + +func (e *kubernetesEvent) GetNodeName() (string, error) { + if label, ok := e.cr.Labels[labelNodeName]; ok && label != "" { + return label, nil + } + + if e.cr.Spec != nil && e.cr.Spec.NodeName != "" { + return e.cr.Spec.NodeName, nil + } + + slog.Warn("nodeName not found in HealthEventResource", "crName", e.cr.Name) + + return "", fmt.Errorf("nodeName not found in HealthEventResource") +} + +func (e *kubernetesEvent) GetResumeToken() []byte { + return []byte(e.cr.ResourceVersion) +} + +// UnmarshalDocument populates a model.HealthEventWithStatus directly from the typed CR. +func (e *kubernetesEvent) UnmarshalDocument(v interface{}) error { + target, ok := v.(*model.HealthEventWithStatus) + if !ok { + return fmt.Errorf("unsupported target type for UnmarshalDocument: %T", v) + } + + createdAt := e.cr.CreationTimestamp.Time + if ann := e.cr.GetAnnotations(); ann != nil { + if ts, ok := ann[originalHealthEventTimestamp]; ok { + if t, err := parseTime(ts); err == nil { + createdAt = t + } + } + } + + target.HealthEvent = e.cr.Spec + target.HealthEventStatus = e.cr.Status + target.CreatedAt = createdAt + + slog.Debug("UnmarshalDocument completed", "crName", e.cr.Name, "createdAt", createdAt) + + return nil +} + +var _ datastore.ChangeStreamWatcher = (*KubernetesChangeStreamWatcher)(nil) +var _ client.ChangeStreamWatcher = (*kubernetesLegacyWatcher)(nil) +var _ client.Event = (*kubernetesEvent)(nil) diff --git a/store-client/pkg/datastore/providers/kubernetes/watcher_factory.go b/store-client/pkg/datastore/providers/kubernetes/watcher_factory.go new file mode 100644 index 000000000..17d15e5de --- /dev/null +++ b/store-client/pkg/datastore/providers/kubernetes/watcher_factory.go @@ -0,0 +1,69 @@ +// Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "context" + "fmt" + "log/slog" + + "github.com/nvidia/nvsentinel/store-client/pkg/datastore" + "github.com/nvidia/nvsentinel/store-client/pkg/watcher" +) + +// KubernetesWatcherFactory implements WatcherFactory for the Kubernetes CRD provider. +type KubernetesWatcherFactory struct{} + +// NewKubernetesWatcherFactory creates a new Kubernetes watcher factory. +func NewKubernetesWatcherFactory() watcher.WatcherFactory { + return &KubernetesWatcherFactory{} +} + +// CreateChangeStreamWatcher creates a Kubernetes change stream watcher backed by the +// Kubernetes Watch API on HealthEventResource CRDs. +func (f *KubernetesWatcherFactory) CreateChangeStreamWatcher( + ctx context.Context, + ds datastore.DataStore, + config watcher.WatcherConfig, +) (datastore.ChangeStreamWatcher, error) { + k8sStore, ok := ds.(*KubernetesDataStore) + if !ok { + return nil, fmt.Errorf("expected Kubernetes datastore, got %T", ds) + } + + clientName := "watcher-factory" + if config.Options != nil { + if name, ok := config.Options["ClientName"].(string); ok && name != "" { + clientName = name + } + } + + slog.Info("Creating Kubernetes change stream watcher", + "clientName", clientName, + "namespace", k8sStore.namespace) + + return NewKubernetesChangeStreamWatcher( + k8sStore.k8sClient, k8sStore.namespace, clientName, + ), nil +} + +// SupportedProvider returns the provider this factory supports. +func (f *KubernetesWatcherFactory) SupportedProvider() datastore.DataStoreProvider { + return datastore.ProviderKubernetes +} + +func init() { + watcher.RegisterWatcherFactory(datastore.ProviderKubernetes, NewKubernetesWatcherFactory()) +} diff --git a/store-client/pkg/datastore/providers/providers.go b/store-client/pkg/datastore/providers/providers.go index 657b04f4a..c5c34fdb5 100644 --- a/store-client/pkg/datastore/providers/providers.go +++ b/store-client/pkg/datastore/providers/providers.go @@ -16,6 +16,7 @@ package providers // Import all providers to ensure they are registered import ( + _ "github.com/nvidia/nvsentinel/store-client/pkg/datastore/providers/kubernetes" _ "github.com/nvidia/nvsentinel/store-client/pkg/datastore/providers/mongodb" _ "github.com/nvidia/nvsentinel/store-client/pkg/datastore/providers/postgresql" ) diff --git a/store-client/pkg/datastore/types.go b/store-client/pkg/datastore/types.go index 3d4d1c6b6..50214588f 100644 --- a/store-client/pkg/datastore/types.go +++ b/store-client/pkg/datastore/types.go @@ -26,6 +26,7 @@ type DataStoreProvider string const ( ProviderMongoDB DataStoreProvider = "mongodb" ProviderPostgreSQL DataStoreProvider = "postgresql" + ProviderKubernetes DataStoreProvider = "kubernetes" ) // Event represents a database-agnostic document or event as a map. diff --git a/store-client/pkg/datastore/watcher/factory.go b/store-client/pkg/datastore/watcher/factory.go index 7f49ea11f..79eef84cd 100644 --- a/store-client/pkg/datastore/watcher/factory.go +++ b/store-client/pkg/datastore/watcher/factory.go @@ -20,6 +20,7 @@ import ( "log/slog" "github.com/nvidia/nvsentinel/store-client/pkg/datastore" + "github.com/nvidia/nvsentinel/store-client/pkg/datastore/providers/kubernetes" "github.com/nvidia/nvsentinel/store-client/pkg/datastore/providers/mongodb" "github.com/nvidia/nvsentinel/store-client/pkg/datastore/providers/postgresql" ) @@ -75,6 +76,14 @@ func CreateChangeStreamWatcher( return datastore.NewChangeStreamWatcher(ctx, postgresConfig) + case *kubernetes.KubernetesDataStore: + k8sConfig := map[string]interface{}{ + "ClientName": config.ClientName, + "Pipeline": config.Pipeline, + } + + return datastore.NewChangeStreamWatcher(ctx, k8sConfig) + default: return nil, fmt.Errorf("change stream watching not supported for datastore type: %T", datastore) } diff --git a/store-client/pkg/factory/client_factory.go b/store-client/pkg/factory/client_factory.go index 1b6945df9..f9c9e9d18 100644 --- a/store-client/pkg/factory/client_factory.go +++ b/store-client/pkg/factory/client_factory.go @@ -25,6 +25,7 @@ import ( "github.com/nvidia/nvsentinel/store-client/pkg/client" "github.com/nvidia/nvsentinel/store-client/pkg/config" "github.com/nvidia/nvsentinel/store-client/pkg/datastore" + providers_kubernetes "github.com/nvidia/nvsentinel/store-client/pkg/datastore/providers/kubernetes" providers_postgresql "github.com/nvidia/nvsentinel/store-client/pkg/datastore/providers/postgresql" ) @@ -102,6 +103,24 @@ func (f *ClientFactory) CreateDatabaseClient(ctx context.Context) (client.Databa return providers_postgresql.NewPostgreSQLDatabaseClient(db, tableName), nil + case string(datastore.ProviderKubernetes): + dsConfig, err := datastore.LoadDatastoreConfig() + if err != nil { + return nil, fmt.Errorf("failed to load Kubernetes datastore config: %w", err) + } + + ds, err := datastore.NewDataStore(ctx, *dsConfig) + if err != nil { + return nil, fmt.Errorf("failed to create Kubernetes datastore: %w", err) + } + + k8sDS, ok := ds.(*providers_kubernetes.KubernetesDataStore) + if !ok { + return nil, fmt.Errorf("unexpected datastore type for kubernetes provider: %T", ds) + } + + return k8sDS.GetDatabaseClient(), nil + case string(datastore.ProviderMongoDB), "": // Default to MongoDB for backward compatibility return client.NewMongoDBClient(ctx, f.dbConfig) @@ -111,7 +130,7 @@ func (f *ClientFactory) CreateDatabaseClient(ctx context.Context) (client.Databa datastore.DataStoreProvider(provider), "unsupported datastore provider", fmt.Errorf("provider '%s' is not supported", provider), - ).WithMetadata("supportedProviders", []string{"mongodb", "postgresql"}) + ).WithMetadata("supportedProviders", []string{"mongodb", "postgresql", "kubernetes"}) } } diff --git a/tilt/Tiltfile b/tilt/Tiltfile index a317ac064..5e4d7cc21 100755 --- a/tilt/Tiltfile +++ b/tilt/Tiltfile @@ -26,13 +26,16 @@ update_settings( num_gpu_nodes = int(os.getenv('NUM_GPU_NODES', '50')) num_kata_test_nodes = int(os.getenv('NUM_KATA_TEST_NODES', '5')) use_postgresql = os.getenv('USE_POSTGRESQL', '0') == '1' +use_kubernetes = os.getenv('USE_KUBERNETES', '0') == '1' user_values_file = os.getenv('NVSENTINEL_VALUES_FILE', '') print_values = os.getenv('PRINT_HELM_VALUES', '0') == '1' -if use_postgresql: +if use_kubernetes: + print("Using Kubernetes CRDs as datastore (USE_KUBERNETES=1)") +elif use_postgresql: print("Using PostgreSQL as datastore (USE_POSTGRESQL=1)") else: - print("Using MongoDB as datastore (default). Set USE_POSTGRESQL=1 to use PostgreSQL.") + print("Using MongoDB as datastore (default). Set USE_POSTGRESQL=1 or USE_KUBERNETES=1 to switch.") helm_repo('jetstack', 'https://charts.jetstack.io') helm_resource( @@ -137,7 +140,9 @@ values_files = [ ] # Add datastore-specific values -if use_postgresql: +if use_kubernetes: + values_files.append('../distros/kubernetes/nvsentinel/values-tilt-kubernetes.yaml') +elif use_postgresql: values_files.append('../distros/kubernetes/nvsentinel/values-tilt-postgresql.yaml') else: values_files.append('../distros/kubernetes/nvsentinel/values-tilt-mongodb.yaml') @@ -189,14 +194,18 @@ local_resource( k8s_yaml(yaml) # Check if using Percona operator (only relevant when using MongoDB) -use_percona = effective_values.get('mongodb-store', {}).get('usePerconaOperator', False) if not use_postgresql else False -if use_percona: +use_percona = effective_values.get('mongodb-store', {}).get('usePerconaOperator', False) if not (use_postgresql or use_kubernetes) else False +if use_kubernetes: + print("Using Kubernetes CRDs for datastore (no external database needed).") +elif use_percona: print("Using Percona MongoDB Operator for MongoDB installation.") else: print("Using bitnami MongoDB installation.") # Determine datastore resource name -if use_postgresql: +if use_kubernetes: + datastore_resource = 'cert-manager-resources' +elif use_postgresql: datastore_resource = 'nvsentinel-postgresql' else: datastore_resource = 'mongodb' if not use_percona else 'create-mongodb-database' @@ -214,7 +223,7 @@ if use_postgresql: 'postgresql-server-cert:certificate', 'postgresql-client-cert:certificate' ]) -else: +elif not use_kubernetes: if use_percona: cert_manager_objects.append('mongo-app-client-cert:certificate') else: @@ -254,8 +263,8 @@ if use_percona: resource_deps=['wait-for-cert-manager-crds'], ) -# MongoDB database creation (only for MongoDB, not PostgreSQL) -if not use_postgresql: +# MongoDB database creation (only for MongoDB, not PostgreSQL or Kubernetes) +if not use_postgresql and not use_kubernetes: k8s_resource( 'create-mongodb-database', resource_deps=['wait-for-cert-manager-crds', 'nvsentinel-psmdb-operator'] if use_percona else ['wait-for-cert-manager-crds'], @@ -346,10 +355,11 @@ k8s_resource( resource_deps=[datastore_resource] ) -k8s_resource( - 'csp-health-monitor', - resource_deps=[datastore_resource, 'csp-api-mock'] -) +if not use_kubernetes: + k8s_resource( + 'csp-health-monitor', + resource_deps=[datastore_resource, 'csp-api-mock'] + ) k8s_resource( 'kwok-stage-fast',