From c41e5cd70a3e0cf083b0b166ad5d147c6ccad46f Mon Sep 17 00:00:00 2001 From: xliuqq Date: Mon, 27 Apr 2026 09:01:39 +0800 Subject: [PATCH 1/4] remove schema for component and add test for component Signed-off-by: xliuqq --- pkg/ddc/cache/component/component_manager.go | 10 +- .../cache/component/component_suite_test.go | 29 ++ pkg/ddc/cache/component/component_test.go | 437 ++++++++++++++++++ pkg/ddc/cache/component/daemonset_manager.go | 4 +- .../cache/component/statefulset_manager.go | 14 +- pkg/ddc/cache/engine/client.go | 5 +- pkg/ddc/cache/engine/engine.go | 18 +- pkg/ddc/cache/engine/master.go | 5 +- pkg/ddc/cache/engine/status.go | 11 +- pkg/ddc/cache/engine/worker.go | 5 +- 10 files changed, 508 insertions(+), 30 deletions(-) create mode 100644 pkg/ddc/cache/component/component_suite_test.go create mode 100644 pkg/ddc/cache/component/component_test.go diff --git a/pkg/ddc/cache/component/component_manager.go b/pkg/ddc/cache/component/component_manager.go index 77282ce730d..958b2fb651c 100644 --- a/pkg/ddc/cache/component/component_manager.go +++ b/pkg/ddc/cache/component/component_manager.go @@ -18,10 +18,10 @@ package component import ( "context" + "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -30,16 +30,16 @@ type ComponentManager interface { ConstructComponentStatus(todo context.Context, value *common.CacheRuntimeComponentValue) (v1alpha1.RuntimeComponentStatus, error) } -func NewComponentHelper(workloadType metav1.TypeMeta, scheme *runtime.Scheme, client client.Client) ComponentManager { +func NewComponentHelper(workloadType metav1.TypeMeta, client client.Client) ComponentManager { if workloadType.APIVersion == "apps/v1" { if workloadType.Kind == "StatefulSet" { - return newStatefulSetManager(scheme, client) + return newStatefulSetManager(client) } else if workloadType.Kind == "DaemonSet" { - return newDaemonSetManager(scheme, client) + return newDaemonSetManager(client) } } - return newStatefulSetManager(scheme, client) + return newStatefulSetManager(client) } // getCommonLabelsFromComponent returns the common labels for component used for stateful diff --git a/pkg/ddc/cache/component/component_suite_test.go b/pkg/ddc/cache/component/component_suite_test.go new file mode 100644 index 00000000000..f32d30c6dd4 --- /dev/null +++ b/pkg/ddc/cache/component/component_suite_test.go @@ -0,0 +1,29 @@ +/* + Copyright 2026 The Fluid Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package component + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestCacheComponent(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Cache Component Suite", Label("cache-component")) +} diff --git a/pkg/ddc/cache/component/component_test.go b/pkg/ddc/cache/component/component_test.go new file mode 100644 index 00000000000..b3fa237276a --- /dev/null +++ b/pkg/ddc/cache/component/component_test.go @@ -0,0 +1,437 @@ +/* + Copyright 2026 The Fluid Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package component + +import ( + "context" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" +) + +var _ = Describe("ComponentManager", func() { + Describe("NewComponentHelper", func() { + It("should return StatefulSetManager for StatefulSet workload", func() { + workloadType := metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "StatefulSet", + } + scheme := runtime.NewScheme() + client := fake.NewFakeClientWithScheme(scheme) + + manager := NewComponentHelper(workloadType, client) + Expect(manager).NotTo(BeNil()) + _, ok := manager.(*StatefulSetManager) + Expect(ok).To(BeTrue()) + }) + + It("should return DaemonSetManager for DaemonSet workload", func() { + workloadType := metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "DaemonSet", + } + scheme := runtime.NewScheme() + client := fake.NewFakeClientWithScheme(scheme) + + manager := NewComponentHelper(workloadType, client) + Expect(manager).NotTo(BeNil()) + _, ok := manager.(*DaemonSetManager) + Expect(ok).To(BeTrue()) + }) + + It("should return StatefulSetManager as default for unknown kind", func() { + workloadType := metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Unknown", + } + scheme := runtime.NewScheme() + client := fake.NewFakeClientWithScheme(scheme) + + manager := NewComponentHelper(workloadType, client) + Expect(manager).NotTo(BeNil()) + _, ok := manager.(*StatefulSetManager) + Expect(ok).To(BeTrue()) + }) + + It("should return StatefulSetManager as default for wrong APIVersion", func() { + workloadType := metav1.TypeMeta{ + APIVersion: "v1", + Kind: "StatefulSet", + } + scheme := runtime.NewScheme() + client := fake.NewFakeClientWithScheme(scheme) + + manager := NewComponentHelper(workloadType, client) + Expect(manager).NotTo(BeNil()) + _, ok := manager.(*StatefulSetManager) + Expect(ok).To(BeTrue()) + }) + }) + + Describe("getCommonLabelsFromComponent", func() { + It("should generate correct labels with runtime name and component name", func() { + component := &common.CacheRuntimeComponentValue{ + Name: "test-runtime-master", + Owner: &common.OwnerReference{ + Name: "test-runtime", + }, + } + + labels := getCommonLabelsFromComponent(component) + Expect(labels).To(HaveKey(common.LabelCacheRuntimeName)) + Expect(labels[common.LabelCacheRuntimeName]).To(Equal("test-runtime")) + Expect(labels).To(HaveKey(common.LabelCacheRuntimeComponentName)) + Expect(labels[common.LabelCacheRuntimeComponentName]).To(Equal("test-runtime-master")) + Expect(len(labels)).To(Equal(2)) + }) + }) +}) + +var _ = Describe("StatefulSetManager", func() { + var ( + manager *StatefulSetManager + ctx context.Context + component *common.CacheRuntimeComponentValue + ) + + BeforeEach(func() { + scheme := runtime.NewScheme() + _ = appsv1.AddToScheme(scheme) + _ = corev1.AddToScheme(scheme) + client := fake.NewFakeClientWithScheme(scheme) + manager = newStatefulSetManager(client) + ctx = context.Background() + + replicas := int32(3) + component = &common.CacheRuntimeComponentValue{ + Name: "test-runtime-master", + Namespace: "fluid", + Replicas: replicas, + Owner: &common.OwnerReference{ + APIVersion: "data.fluid.io/v1alpha1", + Kind: "CacheRuntime", + Name: "test-runtime", + UID: "test-uid", + }, + PodTemplateSpec: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "master", + Image: "test-image:latest", + }, + }, + }, + }, + Service: &common.CacheRuntimeComponentServiceConfig{ + Name: "test-runtime-master-svc", + }, + WorkloadType: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "StatefulSet", + }, + } + }) + + Describe("Reconciler", func() { + It("should create StatefulSet and Service successfully", func() { + err := manager.Reconciler(ctx, component) + Expect(err).NotTo(HaveOccurred()) + + sts := &appsv1.StatefulSet{} + err = manager.client.Get(ctx, types.NamespacedName{ + Name: "test-runtime-master", + Namespace: "fluid", + }, sts) + Expect(err).NotTo(HaveOccurred()) + Expect(sts.Name).To(Equal("test-runtime-master")) + Expect(*sts.Spec.Replicas).To(Equal(int32(3))) + Expect(sts.Spec.PodManagementPolicy).To(Equal(appsv1.ParallelPodManagement)) + Expect(sts.Spec.ServiceName).To(Equal("test-runtime-master-svc")) + + svc := &corev1.Service{} + err = manager.client.Get(ctx, types.NamespacedName{ + Name: "test-runtime-master-svc", + Namespace: "fluid", + }, svc) + Expect(err).NotTo(HaveOccurred()) + Expect(svc.Name).To(Equal("test-runtime-master-svc")) + Expect(svc.Spec.ClusterIP).To(Equal("None")) + Expect(svc.Spec.PublishNotReadyAddresses).To(BeTrue()) + }) + + It("should not recreate if StatefulSet already exists", func() { + // First reconciliation + err := manager.Reconciler(ctx, component) + Expect(err).NotTo(HaveOccurred()) + + // Second reconciliation should not fail + err = manager.Reconciler(ctx, component) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should handle nil service gracefully", func() { + component.Service = nil + err := manager.Reconciler(ctx, component) + Expect(err).NotTo(HaveOccurred()) + + // Verify StatefulSet was created without ServiceName + sts := &appsv1.StatefulSet{} + err = manager.client.Get(ctx, types.NamespacedName{ + Name: "test-runtime-master", + Namespace: "fluid", + }, sts) + Expect(err).NotTo(HaveOccurred()) + Expect(sts.Spec.ServiceName).To(BeEmpty()) + }) + }) + + Describe("ConstructComponentStatus", func() { + It("should return Ready phase when all replicas are ready", func() { + replicas := int32(3) + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-master", + Namespace: "fluid", + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + }, + Status: appsv1.StatefulSetStatus{ + ReadyReplicas: 3, + CurrentReplicas: 3, + AvailableReplicas: 3, + }, + } + Expect(manager.client.Create(ctx, sts)).To(Succeed()) + + status, err := manager.ConstructComponentStatus(ctx, component) + Expect(err).NotTo(HaveOccurred()) + Expect(status.DesiredReplicas).To(Equal(int32(3))) + Expect(status.ReadyReplicas).To(Equal(int32(3))) + Expect(status.CurrentReplicas).To(Equal(int32(3))) + Expect(status.AvailableReplicas).To(Equal(int32(3))) + Expect(status.UnavailableReplicas).To(Equal(int32(0))) + Expect(status.Phase).To(Equal(datav1alpha1.RuntimePhaseReady)) + }) + + It("should return NotReady phase when replicas are partially ready", func() { + replicas := int32(3) + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-master", + Namespace: "fluid", + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + }, + Status: appsv1.StatefulSetStatus{ + ReadyReplicas: 2, + CurrentReplicas: 3, + AvailableReplicas: 2, + }, + } + Expect(manager.client.Create(ctx, sts)).To(Succeed()) + + status, err := manager.ConstructComponentStatus(ctx, component) + Expect(err).NotTo(HaveOccurred()) + Expect(status.DesiredReplicas).To(Equal(int32(3))) + Expect(status.ReadyReplicas).To(Equal(int32(2))) + Expect(status.UnavailableReplicas).To(Equal(int32(1))) + Expect(status.Phase).To(Equal(datav1alpha1.RuntimePhaseNotReady)) + }) + + It("should return NotReady phase when no replicas are ready", func() { + replicas := int32(3) + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-master", + Namespace: "fluid", + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + }, + Status: appsv1.StatefulSetStatus{ + ReadyReplicas: 0, + CurrentReplicas: 3, + AvailableReplicas: 0, + }, + } + Expect(manager.client.Create(ctx, sts)).To(Succeed()) + + status, err := manager.ConstructComponentStatus(ctx, component) + Expect(err).NotTo(HaveOccurred()) + Expect(status.ReadyReplicas).To(Equal(int32(0))) + Expect(status.Phase).To(Equal(datav1alpha1.RuntimePhaseNotReady)) + }) + + It("should return error when StatefulSet doesn't exist", func() { + _, err := manager.ConstructComponentStatus(ctx, component) + Expect(err).To(HaveOccurred()) + }) + }) +}) + +var _ = Describe("DaemonSetManager", func() { + var ( + manager *DaemonSetManager + ctx context.Context + component *common.CacheRuntimeComponentValue + ) + + BeforeEach(func() { + scheme := runtime.NewScheme() + _ = appsv1.AddToScheme(scheme) + _ = corev1.AddToScheme(scheme) + client := fake.NewFakeClientWithScheme(scheme) + manager = newDaemonSetManager(client) + ctx = context.Background() + + component = &common.CacheRuntimeComponentValue{ + Name: "test-runtime-worker", + Namespace: "fluid", + Owner: &common.OwnerReference{ + APIVersion: "data.fluid.io/v1alpha1", + Kind: "CacheRuntime", + Name: "test-runtime", + UID: "test-uid", + }, + PodTemplateSpec: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "worker", + Image: "test-image:latest", + }, + }, + }, + }, + Service: &common.CacheRuntimeComponentServiceConfig{ + Name: "test-runtime-worker-svc", + }, + WorkloadType: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "DaemonSet", + }, + } + }) + + Describe("Reconciler", func() { + It("should create DaemonSet and Service successfully", func() { + err := manager.Reconciler(ctx, component) + Expect(err).NotTo(HaveOccurred()) + + ds := &appsv1.DaemonSet{} + err = manager.client.Get(ctx, types.NamespacedName{ + Name: "test-runtime-worker", + Namespace: "fluid", + }, ds) + Expect(err).NotTo(HaveOccurred()) + Expect(ds.Name).To(Equal("test-runtime-worker")) + + svc := &corev1.Service{} + err = manager.client.Get(ctx, types.NamespacedName{ + Name: "test-runtime-worker-svc", + Namespace: "fluid", + }, svc) + Expect(err).NotTo(HaveOccurred()) + Expect(svc.Name).To(Equal("test-runtime-worker-svc")) + Expect(svc.Spec.ClusterIP).To(Equal("None")) + Expect(svc.Spec.PublishNotReadyAddresses).To(BeTrue()) + }) + + It("should not recreate if DaemonSet already exists", func() { + err := manager.Reconciler(ctx, component) + Expect(err).NotTo(HaveOccurred()) + + err = manager.Reconciler(ctx, component) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should handle nil service gracefully", func() { + component.Service = nil + err := manager.Reconciler(ctx, component) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Describe("ConstructComponentStatus", func() { + It("should return correct status when all nodes are ready", func() { + ds := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-worker", + Namespace: "fluid", + }, + Status: appsv1.DaemonSetStatus{ + DesiredNumberScheduled: 3, + CurrentNumberScheduled: 3, + NumberAvailable: 3, + NumberUnavailable: 0, + NumberReady: 3, + }, + } + Expect(manager.client.Create(ctx, ds)).To(Succeed()) + + status, err := manager.ConstructComponentStatus(ctx, component) + Expect(err).NotTo(HaveOccurred()) + Expect(status.DesiredReplicas).To(Equal(int32(3))) + Expect(status.ReadyReplicas).To(Equal(int32(3))) + Expect(status.AvailableReplicas).To(Equal(int32(3))) + Expect(status.UnavailableReplicas).To(Equal(int32(0))) + Expect(status.Phase).To(Equal(datav1alpha1.RuntimePhaseReady)) + }) + + It("should return Ready phase even when some nodes are unavailable", func() { + ds := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-worker", + Namespace: "fluid", + }, + Status: appsv1.DaemonSetStatus{ + DesiredNumberScheduled: 3, + CurrentNumberScheduled: 3, + NumberAvailable: 2, + NumberUnavailable: 1, + NumberReady: 2, + }, + } + Expect(manager.client.Create(ctx, ds)).To(Succeed()) + + status, err := manager.ConstructComponentStatus(ctx, component) + Expect(err).NotTo(HaveOccurred()) + Expect(status.DesiredReplicas).To(Equal(int32(3))) + Expect(status.ReadyReplicas).To(Equal(int32(2))) + Expect(status.AvailableReplicas).To(Equal(int32(2))) + Expect(status.UnavailableReplicas).To(Equal(int32(1))) + // DaemonSet always returns Ready phase regardless of actual status + Expect(status.Phase).To(Equal(datav1alpha1.RuntimePhaseReady)) + }) + + It("should return error when DaemonSet doesn't exist", func() { + _, err := manager.ConstructComponentStatus(ctx, component) + Expect(err).To(HaveOccurred()) + }) + }) +}) diff --git a/pkg/ddc/cache/component/daemonset_manager.go b/pkg/ddc/cache/component/daemonset_manager.go index e83aeaaaf05..d11aae95999 100644 --- a/pkg/ddc/cache/component/daemonset_manager.go +++ b/pkg/ddc/cache/component/daemonset_manager.go @@ -37,8 +37,8 @@ type DaemonSetManager struct { client client.Client } -func newDaemonSetManager(scheme *runtime.Scheme, client client.Client) *DaemonSetManager { - return &DaemonSetManager{scheme: scheme, client: client} +func newDaemonSetManager(client client.Client) *DaemonSetManager { + return &DaemonSetManager{client: client} } func (s *DaemonSetManager) Reconciler(ctx context.Context, component *common.CacheRuntimeComponentValue) error { diff --git a/pkg/ddc/cache/component/statefulset_manager.go b/pkg/ddc/cache/component/statefulset_manager.go index 2738b4e4838..f403b29d098 100644 --- a/pkg/ddc/cache/component/statefulset_manager.go +++ b/pkg/ddc/cache/component/statefulset_manager.go @@ -19,6 +19,7 @@ package component import ( "context" "fmt" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/utils" @@ -26,19 +27,17 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" ) type StatefulSetManager struct { - scheme *runtime.Scheme client client.Client } -func newStatefulSetManager(scheme *runtime.Scheme, client client.Client) *StatefulSetManager { - return &StatefulSetManager{scheme: scheme, client: client} +func newStatefulSetManager(client client.Client) *StatefulSetManager { + return &StatefulSetManager{client: client} } func (s *StatefulSetManager) Reconciler(ctx context.Context, component *common.CacheRuntimeComponentValue) error { @@ -95,7 +94,6 @@ func (s *StatefulSetManager) constructStatefulSet(component *common.CacheRuntime }, }, Spec: appsv1.StatefulSetSpec{ - ServiceName: component.Service.Name, Replicas: &component.Replicas, Template: podTemplateSpec, PodManagementPolicy: appsv1.ParallelPodManagement, @@ -104,6 +102,12 @@ func (s *StatefulSetManager) constructStatefulSet(component *common.CacheRuntime }, }, } + + // Set ServiceName if service is configured + if component.Service != nil { + sts.Spec.ServiceName = component.Service.Name + } + return sts } func (s *StatefulSetManager) reconcileService(ctx context.Context, component *common.CacheRuntimeComponentValue) error { diff --git a/pkg/ddc/cache/engine/client.go b/pkg/ddc/cache/engine/client.go index f572c171e6b..82942af53ed 100644 --- a/pkg/ddc/cache/engine/client.go +++ b/pkg/ddc/cache/engine/client.go @@ -18,13 +18,14 @@ package engine import ( "context" + "reflect" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/cache/component" "github.com/fluid-cloudnative/fluid/pkg/utils" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/util/retry" - "reflect" ) func (e *CacheEngine) SetupClientComponent(clientValue *common.CacheRuntimeComponentValue) (bool, error) { @@ -57,7 +58,7 @@ func (e *CacheEngine) ShouldSetupClient() (bool, error) { func (e *CacheEngine) SetupClientInternal(clientValue *common.CacheRuntimeComponentValue) error { // 1. reconcile to create client workload - manager := component.NewComponentHelper(clientValue.WorkloadType, e.Scheme, e.Client) + manager := component.NewComponentHelper(clientValue.WorkloadType, e.Client) err := manager.Reconciler(context.TODO(), clientValue) if err != nil { return err diff --git a/pkg/ddc/cache/engine/engine.go b/pkg/ddc/cache/engine/engine.go index b861c49e5be..8d818f25284 100644 --- a/pkg/ddc/cache/engine/engine.go +++ b/pkg/ddc/cache/engine/engine.go @@ -21,10 +21,10 @@ import ( datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/utils" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "github.com/go-logr/logr" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" @@ -45,10 +45,10 @@ var _ base.Engine = (*CacheEngine)(nil) // and use `physical` dataset/runtime to represent the dataset/runtime is mounted by virtual dataset. type CacheEngine struct { client.Client - Scheme *runtime.Scheme Log logr.Logger Recorder record.EventRecorder + Scheme *runtime.Scheme Id string name string @@ -73,17 +73,21 @@ func (e *CacheEngine) ID() string { } func Build(id string, ctx cruntime.ReconcileRequestContext) (base.Engine, error) { + // Get scheme from client + scheme := ctx.Client.Scheme() + engine := &CacheEngine{ + Client: ctx.Client, + Log: ctx.Log, + Recorder: ctx.Recorder, + Scheme: scheme, Id: id, name: ctx.Name, namespace: ctx.Namespace, + retryShutdown: 0, + gracefulShutdownLimits: 5, runtimeType: ctx.RuntimeType, engineImpl: ctx.EngineImpl, - Client: ctx.Client, - Recorder: ctx.Recorder, - Log: ctx.Log, - gracefulShutdownLimits: 5, - retryShutdown: 0, } engine.Log = ctx.Log.WithValues("cache engine", ctx.RuntimeType).WithValues("id", id) diff --git a/pkg/ddc/cache/engine/master.go b/pkg/ddc/cache/engine/master.go index 6c1996d72e2..06ebd754e86 100644 --- a/pkg/ddc/cache/engine/master.go +++ b/pkg/ddc/cache/engine/master.go @@ -19,13 +19,14 @@ package engine import ( "context" "errors" + "reflect" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/cache/component" "github.com/fluid-cloudnative/fluid/pkg/utils" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/util/retry" - "reflect" ) func (e *CacheEngine) SetupMasterComponent(masterValue *common.CacheRuntimeComponentValue) (bool, error) { @@ -57,7 +58,7 @@ func (e *CacheEngine) shouldSetupMaster() (bool, error) { } func (e *CacheEngine) setupMasterInternal(masterValue *common.CacheRuntimeComponentValue) error { - manager := component.NewComponentHelper(masterValue.WorkloadType, e.Scheme, e.Client) + manager := component.NewComponentHelper(masterValue.WorkloadType, e.Client) err := manager.Reconciler(context.TODO(), masterValue) if err != nil { return err diff --git a/pkg/ddc/cache/engine/status.go b/pkg/ddc/cache/engine/status.go index d745a20cb01..0b71e8327a3 100644 --- a/pkg/ddc/cache/engine/status.go +++ b/pkg/ddc/cache/engine/status.go @@ -19,18 +19,19 @@ package engine import ( "context" "fmt" + "reflect" + "time" + fluidapi "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/cache/component" "github.com/fluid-cloudnative/fluid/pkg/utils" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" - "reflect" - "time" ) func (e *CacheEngine) setMasterComponentStatus(componentValue *common.CacheRuntimeComponentValue, status *fluidapi.CacheRuntimeStatus) (ready bool, err error) { - manager := component.NewComponentHelper(componentValue.WorkloadType, e.Scheme, e.Client) + manager := component.NewComponentHelper(componentValue.WorkloadType, e.Client) masterStatus, err := manager.ConstructComponentStatus(context.TODO(), componentValue) if err != nil { @@ -47,7 +48,7 @@ func (e *CacheEngine) setMasterComponentStatus(componentValue *common.CacheRunti return ready, err } func (e *CacheEngine) setWorkerComponentStatus(componentValue *common.CacheRuntimeComponentValue, status *fluidapi.CacheRuntimeStatus) (ready bool, err error) { - manager := component.NewComponentHelper(componentValue.WorkloadType, e.Scheme, e.Client) + manager := component.NewComponentHelper(componentValue.WorkloadType, e.Client) workerStatus, err := manager.ConstructComponentStatus(context.TODO(), componentValue) if err != nil { @@ -73,7 +74,7 @@ func (e *CacheEngine) setWorkerComponentStatus(componentValue *common.CacheRunti return ready, err } func (e *CacheEngine) setClientComponentStatus(componentValue *common.CacheRuntimeComponentValue, status *fluidapi.CacheRuntimeStatus) (err error) { - manager := component.NewComponentHelper(componentValue.WorkloadType, e.Scheme, e.Client) + manager := component.NewComponentHelper(componentValue.WorkloadType, e.Client) clientStatus, err := manager.ConstructComponentStatus(context.TODO(), componentValue) if err != nil { diff --git a/pkg/ddc/cache/engine/worker.go b/pkg/ddc/cache/engine/worker.go index b9ef2f881fa..e9d9e667ac5 100644 --- a/pkg/ddc/cache/engine/worker.go +++ b/pkg/ddc/cache/engine/worker.go @@ -18,13 +18,14 @@ package engine import ( "context" + "reflect" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/cache/component" "github.com/fluid-cloudnative/fluid/pkg/utils" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/util/retry" - "reflect" ) func (e *CacheEngine) SetupWorkerComponent(workerValue *common.CacheRuntimeComponentValue) (bool, error) { @@ -55,7 +56,7 @@ func (e *CacheEngine) ShouldSetupWorker() (bool, error) { } func (e *CacheEngine) SetupWorkerInternal(workerValue *common.CacheRuntimeComponentValue) error { - manager := component.NewComponentHelper(workerValue.WorkloadType, e.Scheme, e.Client) + manager := component.NewComponentHelper(workerValue.WorkloadType, e.Client) err := manager.Reconciler(context.TODO(), workerValue) if err != nil { return err From adad79b3cfd7e3330c9f1c92db67a168e814d119 Mon Sep 17 00:00:00 2001 From: xliuqq Date: Mon, 27 Apr 2026 09:15:00 +0800 Subject: [PATCH 2/4] fix fmt Signed-off-by: xliuqq --- pkg/ddc/cache/component/daemonset_manager.go | 2 -- pkg/ddc/cache/component/statefulset_manager.go | 4 ++-- pkg/ddc/cache/engine/engine.go | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/pkg/ddc/cache/component/daemonset_manager.go b/pkg/ddc/cache/component/daemonset_manager.go index d11aae95999..c5ce62a09e2 100644 --- a/pkg/ddc/cache/component/daemonset_manager.go +++ b/pkg/ddc/cache/component/daemonset_manager.go @@ -26,14 +26,12 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" ) type DaemonSetManager struct { - scheme *runtime.Scheme client client.Client } diff --git a/pkg/ddc/cache/component/statefulset_manager.go b/pkg/ddc/cache/component/statefulset_manager.go index f403b29d098..fbb348ac0e7 100644 --- a/pkg/ddc/cache/component/statefulset_manager.go +++ b/pkg/ddc/cache/component/statefulset_manager.go @@ -102,12 +102,12 @@ func (s *StatefulSetManager) constructStatefulSet(component *common.CacheRuntime }, }, } - + // Set ServiceName if service is configured if component.Service != nil { sts.Spec.ServiceName = component.Service.Name } - + return sts } func (s *StatefulSetManager) reconcileService(ctx context.Context, component *common.CacheRuntimeComponentValue) error { diff --git a/pkg/ddc/cache/engine/engine.go b/pkg/ddc/cache/engine/engine.go index 8d818f25284..5f3b98ad6a8 100644 --- a/pkg/ddc/cache/engine/engine.go +++ b/pkg/ddc/cache/engine/engine.go @@ -75,7 +75,7 @@ func (e *CacheEngine) ID() string { func Build(id string, ctx cruntime.ReconcileRequestContext) (base.Engine, error) { // Get scheme from client scheme := ctx.Client.Scheme() - + engine := &CacheEngine{ Client: ctx.Client, Log: ctx.Log, From 2dc2382dd27f96e3574ab7d45dcffb1f6098a576 Mon Sep 17 00:00:00 2001 From: xliuqq Date: Mon, 27 Apr 2026 21:11:38 +0800 Subject: [PATCH 3/4] fix review issues Signed-off-by: xliuqq --- pkg/ddc/cache/component/component_test.go | 48 ++++++++--- pkg/ddc/cache/component/daemonset_manager.go | 77 ++++------------- pkg/ddc/cache/component/service.go | 85 +++++++++++++++++++ .../cache/component/statefulset_manager.go | 63 ++------------ pkg/ddc/cache/engine/engine.go | 6 -- 5 files changed, 146 insertions(+), 133 deletions(-) create mode 100644 pkg/ddc/cache/component/service.go diff --git a/pkg/ddc/cache/component/component_test.go b/pkg/ddc/cache/component/component_test.go index b3fa237276a..f26089e53f5 100644 --- a/pkg/ddc/cache/component/component_test.go +++ b/pkg/ddc/cache/component/component_test.go @@ -29,6 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" ) var _ = Describe("ComponentManager", func() { @@ -109,6 +110,14 @@ var _ = Describe("ComponentManager", func() { }) }) +// setupTestClient creates a fake client with the necessary schemes registered +func setupTestClient() client.Client { + scheme := runtime.NewScheme() + _ = appsv1.AddToScheme(scheme) + _ = corev1.AddToScheme(scheme) + return fake.NewFakeClientWithScheme(scheme) +} + var _ = Describe("StatefulSetManager", func() { var ( manager *StatefulSetManager @@ -117,10 +126,7 @@ var _ = Describe("StatefulSetManager", func() { ) BeforeEach(func() { - scheme := runtime.NewScheme() - _ = appsv1.AddToScheme(scheme) - _ = corev1.AddToScheme(scheme) - client := fake.NewFakeClientWithScheme(scheme) + client := setupTestClient() manager = newStatefulSetManager(client) ctx = context.Background() @@ -302,10 +308,7 @@ var _ = Describe("DaemonSetManager", func() { ) BeforeEach(func() { - scheme := runtime.NewScheme() - _ = appsv1.AddToScheme(scheme) - _ = corev1.AddToScheme(scheme) - client := fake.NewFakeClientWithScheme(scheme) + client := setupTestClient() manager = newDaemonSetManager(client) ctx = context.Background() @@ -425,8 +428,33 @@ var _ = Describe("DaemonSetManager", func() { Expect(status.ReadyReplicas).To(Equal(int32(2))) Expect(status.AvailableReplicas).To(Equal(int32(2))) Expect(status.UnavailableReplicas).To(Equal(int32(1))) - // DaemonSet always returns Ready phase regardless of actual status - Expect(status.Phase).To(Equal(datav1alpha1.RuntimePhaseReady)) + // DaemonSet should return NotReady when not all replicas are ready + Expect(status.Phase).To(Equal(datav1alpha1.RuntimePhaseNotReady)) + }) + + It("should return NotReady phase when no nodes are ready", func() { + ds := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-worker", + Namespace: "fluid", + }, + Status: appsv1.DaemonSetStatus{ + DesiredNumberScheduled: 3, + CurrentNumberScheduled: 3, + NumberAvailable: 0, + NumberUnavailable: 3, + NumberReady: 0, + }, + } + Expect(manager.client.Create(ctx, ds)).To(Succeed()) + + status, err := manager.ConstructComponentStatus(ctx, component) + Expect(err).NotTo(HaveOccurred()) + Expect(status.DesiredReplicas).To(Equal(int32(3))) + Expect(status.ReadyReplicas).To(Equal(int32(0))) + Expect(status.AvailableReplicas).To(Equal(int32(0))) + Expect(status.UnavailableReplicas).To(Equal(int32(3))) + Expect(status.Phase).To(Equal(datav1alpha1.RuntimePhaseNotReady)) }) It("should return error when DaemonSet doesn't exist", func() { diff --git a/pkg/ddc/cache/component/daemonset_manager.go b/pkg/ddc/cache/component/daemonset_manager.go index c5ce62a09e2..9ce5d9e25f4 100644 --- a/pkg/ddc/cache/component/daemonset_manager.go +++ b/pkg/ddc/cache/component/daemonset_manager.go @@ -19,11 +19,11 @@ package component import ( "context" "fmt" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/utils" appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -44,12 +44,12 @@ func (s *DaemonSetManager) Reconciler(ctx context.Context, component *common.Cac return err } - return s.reconcileService(ctx, component) + return reconcileService(ctx, s.client, component) } func (s *DaemonSetManager) reconcileDaemonSet(ctx context.Context, component *common.CacheRuntimeComponentValue) error { logger := log.FromContext(ctx) - logger.Info("start to reconciling dst workload") + logger.Info("start to reconciling ds workload") ds := &appsv1.DaemonSet{} err := s.client.Get(ctx, types.NamespacedName{Name: component.Name, Namespace: component.Namespace}, ds) @@ -60,13 +60,13 @@ func (s *DaemonSetManager) reconcileDaemonSet(ctx context.Context, component *co if err == nil { return nil } - // create the stateful set + // create the daemonset ds = s.constructDaemonSet(component) err = s.client.Create(ctx, ds) if err != nil { return err } - logger.Info("create sts workload succeed") + logger.Info("create ds workload succeed") return nil } func (s *DaemonSetManager) constructDaemonSet(component *common.CacheRuntimeComponentValue) *appsv1.DaemonSet { @@ -101,59 +101,6 @@ func (s *DaemonSetManager) constructDaemonSet(component *common.CacheRuntimeComp } return ds } -func (s *DaemonSetManager) reconcileService(ctx context.Context, component *common.CacheRuntimeComponentValue) error { - if component.Service == nil { - return nil - } - logger := log.FromContext(ctx) - logger.Info("start to reconciling headless service") - - svc := &corev1.Service{} - err := s.client.Get(ctx, types.NamespacedName{Name: component.Service.Name, Namespace: component.Namespace}, svc) - if err != nil && !apierrors.IsNotFound(err) { - return err - } - // return if already created - if err == nil { - return nil - } - svc = s.constructService(component) - err = s.client.Create(ctx, svc) - if err != nil { - return err - } - logger.Info("create headless service succeed") - return nil -} - -func (s *DaemonSetManager) constructService(component *common.CacheRuntimeComponentValue) *corev1.Service { - matchLabels := getCommonLabelsFromComponent(component) - - trueVar := true - svc := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: component.Service.Name, - Namespace: component.Namespace, - Labels: matchLabels, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: component.Owner.APIVersion, - Kind: component.Owner.Kind, - Name: component.Owner.Name, - UID: types.UID(component.Owner.UID), - BlockOwnerDeletion: &trueVar, - Controller: &trueVar, - }, - }, - }, - Spec: corev1.ServiceSpec{ - ClusterIP: "None", - Selector: matchLabels, - PublishNotReadyAddresses: true, - }, - } - return svc -} func (s *DaemonSetManager) ConstructComponentStatus(ctx context.Context, component *common.CacheRuntimeComponentValue) (datav1alpha1.RuntimeComponentStatus, error) { logger := log.FromContext(ctx) @@ -166,12 +113,20 @@ func (s *DaemonSetManager) ConstructComponentStatus(ctx context.Context, compone return datav1alpha1.RuntimeComponentStatus{}, err } + desiredReplicas := ds.Status.DesiredNumberScheduled + readyReplicas := ds.Status.NumberReady + + runtimePhase := datav1alpha1.RuntimePhaseNotReady + if desiredReplicas == readyReplicas { + runtimePhase = datav1alpha1.RuntimePhaseReady + } + return datav1alpha1.RuntimeComponentStatus{ - Phase: datav1alpha1.RuntimePhaseReady, - DesiredReplicas: ds.Status.DesiredNumberScheduled, + Phase: runtimePhase, + DesiredReplicas: desiredReplicas, CurrentReplicas: ds.Status.CurrentNumberScheduled, AvailableReplicas: ds.Status.NumberAvailable, UnavailableReplicas: ds.Status.NumberUnavailable, - ReadyReplicas: ds.Status.NumberReady, + ReadyReplicas: readyReplicas, }, nil } diff --git a/pkg/ddc/cache/component/service.go b/pkg/ddc/cache/component/service.go new file mode 100644 index 00000000000..b445c2d0420 --- /dev/null +++ b/pkg/ddc/cache/component/service.go @@ -0,0 +1,85 @@ +/* + Copyright 2026 The Fluid Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package component + +import ( + "context" + + "github.com/fluid-cloudnative/fluid/pkg/common" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +// reconcileService reconciles the headless service for a component +func reconcileService(ctx context.Context, c client.Client, component *common.CacheRuntimeComponentValue) error { + if component.Service == nil { + return nil + } + logger := log.FromContext(ctx) + logger.Info("start to reconciling headless service") + + svc := &corev1.Service{} + err := c.Get(ctx, types.NamespacedName{Name: component.Service.Name, Namespace: component.Namespace}, svc) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + // return if already created + if err == nil { + return nil + } + svc = constructService(component) + err = c.Create(ctx, svc) + if err != nil { + return err + } + logger.Info("create headless service succeed") + return nil +} + +// constructService constructs a headless service for a component +func constructService(component *common.CacheRuntimeComponentValue) *corev1.Service { + matchLabels := getCommonLabelsFromComponent(component) + + trueVar := true + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: component.Service.Name, + Namespace: component.Namespace, + Labels: matchLabels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: component.Owner.APIVersion, + Kind: component.Owner.Kind, + Name: component.Owner.Name, + UID: types.UID(component.Owner.UID), + BlockOwnerDeletion: &trueVar, + Controller: &trueVar, + }, + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "None", + Selector: matchLabels, + PublishNotReadyAddresses: true, + }, + } + return svc +} diff --git a/pkg/ddc/cache/component/statefulset_manager.go b/pkg/ddc/cache/component/statefulset_manager.go index fbb348ac0e7..01fb38f3eed 100644 --- a/pkg/ddc/cache/component/statefulset_manager.go +++ b/pkg/ddc/cache/component/statefulset_manager.go @@ -24,7 +24,6 @@ import ( "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/utils" appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -45,7 +44,7 @@ func (s *StatefulSetManager) Reconciler(ctx context.Context, component *common.C return err } - return s.reconcileService(ctx, component) + return reconcileService(ctx, s.client, component) } func (s *StatefulSetManager) reconcileStatefulSet(ctx context.Context, component *common.CacheRuntimeComponentValue) error { @@ -110,59 +109,6 @@ func (s *StatefulSetManager) constructStatefulSet(component *common.CacheRuntime return sts } -func (s *StatefulSetManager) reconcileService(ctx context.Context, component *common.CacheRuntimeComponentValue) error { - if component.Service == nil { - return nil - } - logger := log.FromContext(ctx) - logger.Info("start to reconciling headless service") - - svc := &corev1.Service{} - err := s.client.Get(ctx, types.NamespacedName{Name: component.Service.Name, Namespace: component.Namespace}, svc) - if err != nil && !apierrors.IsNotFound(err) { - return err - } - // return if already created - if err == nil { - return nil - } - svc = s.constructService(component) - err = s.client.Create(ctx, svc) - if err != nil { - return err - } - logger.Info("create headless service succeed") - return nil -} - -func (s *StatefulSetManager) constructService(component *common.CacheRuntimeComponentValue) *corev1.Service { - matchLabels := getCommonLabelsFromComponent(component) - - trueVar := true - svc := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: component.Service.Name, - Namespace: component.Namespace, - Labels: matchLabels, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: component.Owner.APIVersion, - Kind: component.Owner.Kind, - Name: component.Owner.Name, - UID: types.UID(component.Owner.UID), - BlockOwnerDeletion: &trueVar, - Controller: &trueVar, - }, - }, - }, - Spec: corev1.ServiceSpec{ - ClusterIP: "None", - Selector: matchLabels, - PublishNotReadyAddresses: true, - }, - } - return svc -} func (s *StatefulSetManager) ConstructComponentStatus(ctx context.Context, component *common.CacheRuntimeComponentValue) (datav1alpha1.RuntimeComponentStatus, error) { logger := log.FromContext(ctx) @@ -183,12 +129,17 @@ func (s *StatefulSetManager) ConstructComponentStatus(ctx context.Context, compo runtimePhase = datav1alpha1.RuntimePhaseReady } + unavailableReplicas := sts.Status.CurrentReplicas - sts.Status.AvailableReplicas + if unavailableReplicas < 0 { + unavailableReplicas = 0 + } + return datav1alpha1.RuntimeComponentStatus{ Phase: runtimePhase, DesiredReplicas: desiredReplicas, CurrentReplicas: sts.Status.CurrentReplicas, AvailableReplicas: sts.Status.AvailableReplicas, - UnavailableReplicas: sts.Status.CurrentReplicas - sts.Status.AvailableReplicas, + UnavailableReplicas: unavailableReplicas, ReadyReplicas: readyReplicas, }, nil } diff --git a/pkg/ddc/cache/engine/engine.go b/pkg/ddc/cache/engine/engine.go index 5f3b98ad6a8..c81314b2c01 100644 --- a/pkg/ddc/cache/engine/engine.go +++ b/pkg/ddc/cache/engine/engine.go @@ -21,7 +21,6 @@ import ( datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/utils" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "github.com/go-logr/logr" @@ -48,7 +47,6 @@ type CacheEngine struct { Log logr.Logger Recorder record.EventRecorder - Scheme *runtime.Scheme Id string name string @@ -73,14 +71,10 @@ func (e *CacheEngine) ID() string { } func Build(id string, ctx cruntime.ReconcileRequestContext) (base.Engine, error) { - // Get scheme from client - scheme := ctx.Client.Scheme() - engine := &CacheEngine{ Client: ctx.Client, Log: ctx.Log, Recorder: ctx.Recorder, - Scheme: scheme, Id: id, name: ctx.Name, namespace: ctx.Namespace, From 68c1e81d3dfbb8b250bdf0a6003b0c3eef080858 Mon Sep 17 00:00:00 2001 From: xliuqq Date: Tue, 28 Apr 2026 20:58:18 +0800 Subject: [PATCH 4/4] fix test names Signed-off-by: xliuqq --- pkg/ddc/cache/component/component_test.go | 2 +- pkg/ddc/cache/component/statefulset_manager.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/ddc/cache/component/component_test.go b/pkg/ddc/cache/component/component_test.go index f26089e53f5..6b666e31ba9 100644 --- a/pkg/ddc/cache/component/component_test.go +++ b/pkg/ddc/cache/component/component_test.go @@ -406,7 +406,7 @@ var _ = Describe("DaemonSetManager", func() { Expect(status.Phase).To(Equal(datav1alpha1.RuntimePhaseReady)) }) - It("should return Ready phase even when some nodes are unavailable", func() { + It("should return NotReady phase when not all nodes are ready", func() { ds := &appsv1.DaemonSet{ ObjectMeta: metav1.ObjectMeta{ Name: "test-runtime-worker", diff --git a/pkg/ddc/cache/component/statefulset_manager.go b/pkg/ddc/cache/component/statefulset_manager.go index 01fb38f3eed..9bfa1cfce61 100644 --- a/pkg/ddc/cache/component/statefulset_manager.go +++ b/pkg/ddc/cache/component/statefulset_manager.go @@ -129,6 +129,7 @@ func (s *StatefulSetManager) ConstructComponentStatus(ctx context.Context, compo runtimePhase = datav1alpha1.RuntimePhaseReady } + // AvailableReplicas can be greater than CurrentReplicas (Kubernetes API allows this) unavailableReplicas := sts.Status.CurrentReplicas - sts.Status.AvailableReplicas if unavailableReplicas < 0 { unavailableReplicas = 0