diff --git a/pkg/ddc/cache/component/component_manager.go b/pkg/ddc/cache/component/component_manager.go index 77282ce730d..958b2fb651c 100644 --- a/pkg/ddc/cache/component/component_manager.go +++ b/pkg/ddc/cache/component/component_manager.go @@ -18,10 +18,10 @@ package component import ( "context" + "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -30,16 +30,16 @@ type ComponentManager interface { ConstructComponentStatus(todo context.Context, value *common.CacheRuntimeComponentValue) (v1alpha1.RuntimeComponentStatus, error) } -func NewComponentHelper(workloadType metav1.TypeMeta, scheme *runtime.Scheme, client client.Client) ComponentManager { +func NewComponentHelper(workloadType metav1.TypeMeta, client client.Client) ComponentManager { if workloadType.APIVersion == "apps/v1" { if workloadType.Kind == "StatefulSet" { - return newStatefulSetManager(scheme, client) + return newStatefulSetManager(client) } else if workloadType.Kind == "DaemonSet" { - return newDaemonSetManager(scheme, client) + return newDaemonSetManager(client) } } - return newStatefulSetManager(scheme, client) + return newStatefulSetManager(client) } // getCommonLabelsFromComponent returns the common labels for component used for stateful diff --git a/pkg/ddc/cache/component/component_suite_test.go b/pkg/ddc/cache/component/component_suite_test.go new file mode 100644 index 00000000000..f32d30c6dd4 --- /dev/null +++ b/pkg/ddc/cache/component/component_suite_test.go @@ -0,0 +1,29 @@ +/* + Copyright 2026 The Fluid Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package component + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestCacheComponent(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Cache Component Suite", Label("cache-component")) +} diff --git a/pkg/ddc/cache/component/component_test.go b/pkg/ddc/cache/component/component_test.go new file mode 100644 index 00000000000..6b666e31ba9 --- /dev/null +++ b/pkg/ddc/cache/component/component_test.go @@ -0,0 +1,465 @@ +/* + Copyright 2026 The Fluid Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package component + +import ( + "context" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var _ = Describe("ComponentManager", func() { + Describe("NewComponentHelper", func() { + It("should return StatefulSetManager for StatefulSet workload", func() { + workloadType := metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "StatefulSet", + } + scheme := runtime.NewScheme() + client := fake.NewFakeClientWithScheme(scheme) + + manager := NewComponentHelper(workloadType, client) + Expect(manager).NotTo(BeNil()) + _, ok := manager.(*StatefulSetManager) + Expect(ok).To(BeTrue()) + }) + + It("should return DaemonSetManager for DaemonSet workload", func() { + workloadType := metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "DaemonSet", + } + scheme := runtime.NewScheme() + client := fake.NewFakeClientWithScheme(scheme) + + manager := NewComponentHelper(workloadType, client) + Expect(manager).NotTo(BeNil()) + _, ok := manager.(*DaemonSetManager) + Expect(ok).To(BeTrue()) + }) + + It("should return StatefulSetManager as default for unknown kind", func() { + workloadType := metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Unknown", + } + scheme := runtime.NewScheme() + client := fake.NewFakeClientWithScheme(scheme) + + manager := NewComponentHelper(workloadType, client) + Expect(manager).NotTo(BeNil()) + _, ok := manager.(*StatefulSetManager) + Expect(ok).To(BeTrue()) + }) + + It("should return StatefulSetManager as default for wrong APIVersion", func() { + workloadType := metav1.TypeMeta{ + APIVersion: "v1", + Kind: "StatefulSet", + } + scheme := runtime.NewScheme() + client := fake.NewFakeClientWithScheme(scheme) + + manager := NewComponentHelper(workloadType, client) + Expect(manager).NotTo(BeNil()) + _, ok := manager.(*StatefulSetManager) + Expect(ok).To(BeTrue()) + }) + }) + + Describe("getCommonLabelsFromComponent", func() { + It("should generate correct labels with runtime name and component name", func() { + component := &common.CacheRuntimeComponentValue{ + Name: "test-runtime-master", + Owner: &common.OwnerReference{ + Name: "test-runtime", + }, + } + + labels := getCommonLabelsFromComponent(component) + Expect(labels).To(HaveKey(common.LabelCacheRuntimeName)) + Expect(labels[common.LabelCacheRuntimeName]).To(Equal("test-runtime")) + Expect(labels).To(HaveKey(common.LabelCacheRuntimeComponentName)) + Expect(labels[common.LabelCacheRuntimeComponentName]).To(Equal("test-runtime-master")) + Expect(len(labels)).To(Equal(2)) + }) + }) +}) + +// setupTestClient creates a fake client with the necessary schemes registered +func setupTestClient() client.Client { + scheme := runtime.NewScheme() + _ = appsv1.AddToScheme(scheme) + _ = corev1.AddToScheme(scheme) + return fake.NewFakeClientWithScheme(scheme) +} + +var _ = Describe("StatefulSetManager", func() { + var ( + manager *StatefulSetManager + ctx context.Context + component *common.CacheRuntimeComponentValue + ) + + BeforeEach(func() { + client := setupTestClient() + manager = newStatefulSetManager(client) + ctx = context.Background() + + replicas := int32(3) + component = &common.CacheRuntimeComponentValue{ + Name: "test-runtime-master", + Namespace: "fluid", + Replicas: replicas, + Owner: &common.OwnerReference{ + APIVersion: "data.fluid.io/v1alpha1", + Kind: "CacheRuntime", + Name: "test-runtime", + UID: "test-uid", + }, + PodTemplateSpec: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "master", + Image: "test-image:latest", + }, + }, + }, + }, + Service: &common.CacheRuntimeComponentServiceConfig{ + Name: "test-runtime-master-svc", + }, + WorkloadType: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "StatefulSet", + }, + } + }) + + Describe("Reconciler", func() { + It("should create StatefulSet and Service successfully", func() { + err := manager.Reconciler(ctx, component) + Expect(err).NotTo(HaveOccurred()) + + sts := &appsv1.StatefulSet{} + err = manager.client.Get(ctx, types.NamespacedName{ + Name: "test-runtime-master", + Namespace: "fluid", + }, sts) + Expect(err).NotTo(HaveOccurred()) + Expect(sts.Name).To(Equal("test-runtime-master")) + Expect(*sts.Spec.Replicas).To(Equal(int32(3))) + Expect(sts.Spec.PodManagementPolicy).To(Equal(appsv1.ParallelPodManagement)) + Expect(sts.Spec.ServiceName).To(Equal("test-runtime-master-svc")) + + svc := &corev1.Service{} + err = manager.client.Get(ctx, types.NamespacedName{ + Name: "test-runtime-master-svc", + Namespace: "fluid", + }, svc) + Expect(err).NotTo(HaveOccurred()) + Expect(svc.Name).To(Equal("test-runtime-master-svc")) + Expect(svc.Spec.ClusterIP).To(Equal("None")) + Expect(svc.Spec.PublishNotReadyAddresses).To(BeTrue()) + }) + + It("should not recreate if StatefulSet already exists", func() { + // First reconciliation + err := manager.Reconciler(ctx, component) + Expect(err).NotTo(HaveOccurred()) + + // Second reconciliation should not fail + err = manager.Reconciler(ctx, component) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should handle nil service gracefully", func() { + component.Service = nil + err := manager.Reconciler(ctx, component) + Expect(err).NotTo(HaveOccurred()) + + // Verify StatefulSet was created without ServiceName + sts := &appsv1.StatefulSet{} + err = manager.client.Get(ctx, types.NamespacedName{ + Name: "test-runtime-master", + Namespace: "fluid", + }, sts) + Expect(err).NotTo(HaveOccurred()) + Expect(sts.Spec.ServiceName).To(BeEmpty()) + }) + }) + + Describe("ConstructComponentStatus", func() { + It("should return Ready phase when all replicas are ready", func() { + replicas := int32(3) + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-master", + Namespace: "fluid", + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + }, + Status: appsv1.StatefulSetStatus{ + ReadyReplicas: 3, + CurrentReplicas: 3, + AvailableReplicas: 3, + }, + } + Expect(manager.client.Create(ctx, sts)).To(Succeed()) + + status, err := manager.ConstructComponentStatus(ctx, component) + Expect(err).NotTo(HaveOccurred()) + Expect(status.DesiredReplicas).To(Equal(int32(3))) + Expect(status.ReadyReplicas).To(Equal(int32(3))) + Expect(status.CurrentReplicas).To(Equal(int32(3))) + Expect(status.AvailableReplicas).To(Equal(int32(3))) + Expect(status.UnavailableReplicas).To(Equal(int32(0))) + Expect(status.Phase).To(Equal(datav1alpha1.RuntimePhaseReady)) + }) + + It("should return NotReady phase when replicas are partially ready", func() { + replicas := int32(3) + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-master", + Namespace: "fluid", + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + }, + Status: appsv1.StatefulSetStatus{ + ReadyReplicas: 2, + CurrentReplicas: 3, + AvailableReplicas: 2, + }, + } + Expect(manager.client.Create(ctx, sts)).To(Succeed()) + + status, err := manager.ConstructComponentStatus(ctx, component) + Expect(err).NotTo(HaveOccurred()) + Expect(status.DesiredReplicas).To(Equal(int32(3))) + Expect(status.ReadyReplicas).To(Equal(int32(2))) + Expect(status.UnavailableReplicas).To(Equal(int32(1))) + Expect(status.Phase).To(Equal(datav1alpha1.RuntimePhaseNotReady)) + }) + + It("should return NotReady phase when no replicas are ready", func() { + replicas := int32(3) + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-master", + Namespace: "fluid", + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + }, + Status: appsv1.StatefulSetStatus{ + ReadyReplicas: 0, + CurrentReplicas: 3, + AvailableReplicas: 0, + }, + } + Expect(manager.client.Create(ctx, sts)).To(Succeed()) + + status, err := manager.ConstructComponentStatus(ctx, component) + Expect(err).NotTo(HaveOccurred()) + Expect(status.ReadyReplicas).To(Equal(int32(0))) + Expect(status.Phase).To(Equal(datav1alpha1.RuntimePhaseNotReady)) + }) + + It("should return error when StatefulSet doesn't exist", func() { + _, err := manager.ConstructComponentStatus(ctx, component) + Expect(err).To(HaveOccurred()) + }) + }) +}) + +var _ = Describe("DaemonSetManager", func() { + var ( + manager *DaemonSetManager + ctx context.Context + component *common.CacheRuntimeComponentValue + ) + + BeforeEach(func() { + client := setupTestClient() + manager = newDaemonSetManager(client) + ctx = context.Background() + + component = &common.CacheRuntimeComponentValue{ + Name: "test-runtime-worker", + Namespace: "fluid", + Owner: &common.OwnerReference{ + APIVersion: "data.fluid.io/v1alpha1", + Kind: "CacheRuntime", + Name: "test-runtime", + UID: "test-uid", + }, + PodTemplateSpec: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "worker", + Image: "test-image:latest", + }, + }, + }, + }, + Service: &common.CacheRuntimeComponentServiceConfig{ + Name: "test-runtime-worker-svc", + }, + WorkloadType: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "DaemonSet", + }, + } + }) + + Describe("Reconciler", func() { + It("should create DaemonSet and Service successfully", func() { + err := manager.Reconciler(ctx, component) + Expect(err).NotTo(HaveOccurred()) + + ds := &appsv1.DaemonSet{} + err = manager.client.Get(ctx, types.NamespacedName{ + Name: "test-runtime-worker", + Namespace: "fluid", + }, ds) + Expect(err).NotTo(HaveOccurred()) + Expect(ds.Name).To(Equal("test-runtime-worker")) + + svc := &corev1.Service{} + err = manager.client.Get(ctx, types.NamespacedName{ + Name: "test-runtime-worker-svc", + Namespace: "fluid", + }, svc) + Expect(err).NotTo(HaveOccurred()) + Expect(svc.Name).To(Equal("test-runtime-worker-svc")) + Expect(svc.Spec.ClusterIP).To(Equal("None")) + Expect(svc.Spec.PublishNotReadyAddresses).To(BeTrue()) + }) + + It("should not recreate if DaemonSet already exists", func() { + err := manager.Reconciler(ctx, component) + Expect(err).NotTo(HaveOccurred()) + + err = manager.Reconciler(ctx, component) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should handle nil service gracefully", func() { + component.Service = nil + err := manager.Reconciler(ctx, component) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Describe("ConstructComponentStatus", func() { + It("should return correct status when all nodes are ready", func() { + ds := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-worker", + Namespace: "fluid", + }, + Status: appsv1.DaemonSetStatus{ + DesiredNumberScheduled: 3, + CurrentNumberScheduled: 3, + NumberAvailable: 3, + NumberUnavailable: 0, + NumberReady: 3, + }, + } + Expect(manager.client.Create(ctx, ds)).To(Succeed()) + + status, err := manager.ConstructComponentStatus(ctx, component) + Expect(err).NotTo(HaveOccurred()) + Expect(status.DesiredReplicas).To(Equal(int32(3))) + Expect(status.ReadyReplicas).To(Equal(int32(3))) + Expect(status.AvailableReplicas).To(Equal(int32(3))) + Expect(status.UnavailableReplicas).To(Equal(int32(0))) + Expect(status.Phase).To(Equal(datav1alpha1.RuntimePhaseReady)) + }) + + It("should return NotReady phase when not all nodes are ready", func() { + ds := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-worker", + Namespace: "fluid", + }, + Status: appsv1.DaemonSetStatus{ + DesiredNumberScheduled: 3, + CurrentNumberScheduled: 3, + NumberAvailable: 2, + NumberUnavailable: 1, + NumberReady: 2, + }, + } + Expect(manager.client.Create(ctx, ds)).To(Succeed()) + + status, err := manager.ConstructComponentStatus(ctx, component) + Expect(err).NotTo(HaveOccurred()) + Expect(status.DesiredReplicas).To(Equal(int32(3))) + Expect(status.ReadyReplicas).To(Equal(int32(2))) + Expect(status.AvailableReplicas).To(Equal(int32(2))) + Expect(status.UnavailableReplicas).To(Equal(int32(1))) + // DaemonSet should return NotReady when not all replicas are ready + Expect(status.Phase).To(Equal(datav1alpha1.RuntimePhaseNotReady)) + }) + + It("should return NotReady phase when no nodes are ready", func() { + ds := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-worker", + Namespace: "fluid", + }, + Status: appsv1.DaemonSetStatus{ + DesiredNumberScheduled: 3, + CurrentNumberScheduled: 3, + NumberAvailable: 0, + NumberUnavailable: 3, + NumberReady: 0, + }, + } + Expect(manager.client.Create(ctx, ds)).To(Succeed()) + + status, err := manager.ConstructComponentStatus(ctx, component) + Expect(err).NotTo(HaveOccurred()) + Expect(status.DesiredReplicas).To(Equal(int32(3))) + Expect(status.ReadyReplicas).To(Equal(int32(0))) + Expect(status.AvailableReplicas).To(Equal(int32(0))) + Expect(status.UnavailableReplicas).To(Equal(int32(3))) + Expect(status.Phase).To(Equal(datav1alpha1.RuntimePhaseNotReady)) + }) + + It("should return error when DaemonSet doesn't exist", func() { + _, err := manager.ConstructComponentStatus(ctx, component) + Expect(err).To(HaveOccurred()) + }) + }) +}) diff --git a/pkg/ddc/cache/component/daemonset_manager.go b/pkg/ddc/cache/component/daemonset_manager.go index e83aeaaaf05..9ce5d9e25f4 100644 --- a/pkg/ddc/cache/component/daemonset_manager.go +++ b/pkg/ddc/cache/component/daemonset_manager.go @@ -19,26 +19,24 @@ package component import ( "context" "fmt" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/utils" appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" ) type DaemonSetManager struct { - scheme *runtime.Scheme client client.Client } -func newDaemonSetManager(scheme *runtime.Scheme, client client.Client) *DaemonSetManager { - return &DaemonSetManager{scheme: scheme, client: client} +func newDaemonSetManager(client client.Client) *DaemonSetManager { + return &DaemonSetManager{client: client} } func (s *DaemonSetManager) Reconciler(ctx context.Context, component *common.CacheRuntimeComponentValue) error { @@ -46,12 +44,12 @@ func (s *DaemonSetManager) Reconciler(ctx context.Context, component *common.Cac return err } - return s.reconcileService(ctx, component) + return reconcileService(ctx, s.client, component) } func (s *DaemonSetManager) reconcileDaemonSet(ctx context.Context, component *common.CacheRuntimeComponentValue) error { logger := log.FromContext(ctx) - logger.Info("start to reconciling dst workload") + logger.Info("start to reconciling ds workload") ds := &appsv1.DaemonSet{} err := s.client.Get(ctx, types.NamespacedName{Name: component.Name, Namespace: component.Namespace}, ds) @@ -62,13 +60,13 @@ func (s *DaemonSetManager) reconcileDaemonSet(ctx context.Context, component *co if err == nil { return nil } - // create the stateful set + // create the daemonset ds = s.constructDaemonSet(component) err = s.client.Create(ctx, ds) if err != nil { return err } - logger.Info("create sts workload succeed") + logger.Info("create ds workload succeed") return nil } func (s *DaemonSetManager) constructDaemonSet(component *common.CacheRuntimeComponentValue) *appsv1.DaemonSet { @@ -103,59 +101,6 @@ func (s *DaemonSetManager) constructDaemonSet(component *common.CacheRuntimeComp } return ds } -func (s *DaemonSetManager) reconcileService(ctx context.Context, component *common.CacheRuntimeComponentValue) error { - if component.Service == nil { - return nil - } - logger := log.FromContext(ctx) - logger.Info("start to reconciling headless service") - - svc := &corev1.Service{} - err := s.client.Get(ctx, types.NamespacedName{Name: component.Service.Name, Namespace: component.Namespace}, svc) - if err != nil && !apierrors.IsNotFound(err) { - return err - } - // return if already created - if err == nil { - return nil - } - svc = s.constructService(component) - err = s.client.Create(ctx, svc) - if err != nil { - return err - } - logger.Info("create headless service succeed") - return nil -} - -func (s *DaemonSetManager) constructService(component *common.CacheRuntimeComponentValue) *corev1.Service { - matchLabels := getCommonLabelsFromComponent(component) - - trueVar := true - svc := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: component.Service.Name, - Namespace: component.Namespace, - Labels: matchLabels, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: component.Owner.APIVersion, - Kind: component.Owner.Kind, - Name: component.Owner.Name, - UID: types.UID(component.Owner.UID), - BlockOwnerDeletion: &trueVar, - Controller: &trueVar, - }, - }, - }, - Spec: corev1.ServiceSpec{ - ClusterIP: "None", - Selector: matchLabels, - PublishNotReadyAddresses: true, - }, - } - return svc -} func (s *DaemonSetManager) ConstructComponentStatus(ctx context.Context, component *common.CacheRuntimeComponentValue) (datav1alpha1.RuntimeComponentStatus, error) { logger := log.FromContext(ctx) @@ -168,12 +113,20 @@ func (s *DaemonSetManager) ConstructComponentStatus(ctx context.Context, compone return datav1alpha1.RuntimeComponentStatus{}, err } + desiredReplicas := ds.Status.DesiredNumberScheduled + readyReplicas := ds.Status.NumberReady + + runtimePhase := datav1alpha1.RuntimePhaseNotReady + if desiredReplicas == readyReplicas { + runtimePhase = datav1alpha1.RuntimePhaseReady + } + return datav1alpha1.RuntimeComponentStatus{ - Phase: datav1alpha1.RuntimePhaseReady, - DesiredReplicas: ds.Status.DesiredNumberScheduled, + Phase: runtimePhase, + DesiredReplicas: desiredReplicas, CurrentReplicas: ds.Status.CurrentNumberScheduled, AvailableReplicas: ds.Status.NumberAvailable, UnavailableReplicas: ds.Status.NumberUnavailable, - ReadyReplicas: ds.Status.NumberReady, + ReadyReplicas: readyReplicas, }, nil } diff --git a/pkg/ddc/cache/component/service.go b/pkg/ddc/cache/component/service.go new file mode 100644 index 00000000000..b445c2d0420 --- /dev/null +++ b/pkg/ddc/cache/component/service.go @@ -0,0 +1,85 @@ +/* + Copyright 2026 The Fluid Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package component + +import ( + "context" + + "github.com/fluid-cloudnative/fluid/pkg/common" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +// reconcileService reconciles the headless service for a component +func reconcileService(ctx context.Context, c client.Client, component *common.CacheRuntimeComponentValue) error { + if component.Service == nil { + return nil + } + logger := log.FromContext(ctx) + logger.Info("start to reconciling headless service") + + svc := &corev1.Service{} + err := c.Get(ctx, types.NamespacedName{Name: component.Service.Name, Namespace: component.Namespace}, svc) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + // return if already created + if err == nil { + return nil + } + svc = constructService(component) + err = c.Create(ctx, svc) + if err != nil { + return err + } + logger.Info("create headless service succeed") + return nil +} + +// constructService constructs a headless service for a component +func constructService(component *common.CacheRuntimeComponentValue) *corev1.Service { + matchLabels := getCommonLabelsFromComponent(component) + + trueVar := true + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: component.Service.Name, + Namespace: component.Namespace, + Labels: matchLabels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: component.Owner.APIVersion, + Kind: component.Owner.Kind, + Name: component.Owner.Name, + UID: types.UID(component.Owner.UID), + BlockOwnerDeletion: &trueVar, + Controller: &trueVar, + }, + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "None", + Selector: matchLabels, + PublishNotReadyAddresses: true, + }, + } + return svc +} diff --git a/pkg/ddc/cache/component/statefulset_manager.go b/pkg/ddc/cache/component/statefulset_manager.go index 2738b4e4838..9bfa1cfce61 100644 --- a/pkg/ddc/cache/component/statefulset_manager.go +++ b/pkg/ddc/cache/component/statefulset_manager.go @@ -19,26 +19,24 @@ package component import ( "context" "fmt" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/utils" appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" ) type StatefulSetManager struct { - scheme *runtime.Scheme client client.Client } -func newStatefulSetManager(scheme *runtime.Scheme, client client.Client) *StatefulSetManager { - return &StatefulSetManager{scheme: scheme, client: client} +func newStatefulSetManager(client client.Client) *StatefulSetManager { + return &StatefulSetManager{client: client} } func (s *StatefulSetManager) Reconciler(ctx context.Context, component *common.CacheRuntimeComponentValue) error { @@ -46,7 +44,7 @@ func (s *StatefulSetManager) Reconciler(ctx context.Context, component *common.C return err } - return s.reconcileService(ctx, component) + return reconcileService(ctx, s.client, component) } func (s *StatefulSetManager) reconcileStatefulSet(ctx context.Context, component *common.CacheRuntimeComponentValue) error { @@ -95,7 +93,6 @@ func (s *StatefulSetManager) constructStatefulSet(component *common.CacheRuntime }, }, Spec: appsv1.StatefulSetSpec{ - ServiceName: component.Service.Name, Replicas: &component.Replicas, Template: podTemplateSpec, PodManagementPolicy: appsv1.ParallelPodManagement, @@ -104,60 +101,13 @@ func (s *StatefulSetManager) constructStatefulSet(component *common.CacheRuntime }, }, } - return sts -} -func (s *StatefulSetManager) reconcileService(ctx context.Context, component *common.CacheRuntimeComponentValue) error { - if component.Service == nil { - return nil - } - logger := log.FromContext(ctx) - logger.Info("start to reconciling headless service") - svc := &corev1.Service{} - err := s.client.Get(ctx, types.NamespacedName{Name: component.Service.Name, Namespace: component.Namespace}, svc) - if err != nil && !apierrors.IsNotFound(err) { - return err - } - // return if already created - if err == nil { - return nil - } - svc = s.constructService(component) - err = s.client.Create(ctx, svc) - if err != nil { - return err + // Set ServiceName if service is configured + if component.Service != nil { + sts.Spec.ServiceName = component.Service.Name } - logger.Info("create headless service succeed") - return nil -} -func (s *StatefulSetManager) constructService(component *common.CacheRuntimeComponentValue) *corev1.Service { - matchLabels := getCommonLabelsFromComponent(component) - - trueVar := true - svc := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: component.Service.Name, - Namespace: component.Namespace, - Labels: matchLabels, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: component.Owner.APIVersion, - Kind: component.Owner.Kind, - Name: component.Owner.Name, - UID: types.UID(component.Owner.UID), - BlockOwnerDeletion: &trueVar, - Controller: &trueVar, - }, - }, - }, - Spec: corev1.ServiceSpec{ - ClusterIP: "None", - Selector: matchLabels, - PublishNotReadyAddresses: true, - }, - } - return svc + return sts } func (s *StatefulSetManager) ConstructComponentStatus(ctx context.Context, component *common.CacheRuntimeComponentValue) (datav1alpha1.RuntimeComponentStatus, error) { @@ -179,12 +129,18 @@ func (s *StatefulSetManager) ConstructComponentStatus(ctx context.Context, compo runtimePhase = datav1alpha1.RuntimePhaseReady } + // AvailableReplicas can be greater than CurrentReplicas (Kubernetes API allows this) + unavailableReplicas := sts.Status.CurrentReplicas - sts.Status.AvailableReplicas + if unavailableReplicas < 0 { + unavailableReplicas = 0 + } + return datav1alpha1.RuntimeComponentStatus{ Phase: runtimePhase, DesiredReplicas: desiredReplicas, CurrentReplicas: sts.Status.CurrentReplicas, AvailableReplicas: sts.Status.AvailableReplicas, - UnavailableReplicas: sts.Status.CurrentReplicas - sts.Status.AvailableReplicas, + UnavailableReplicas: unavailableReplicas, ReadyReplicas: readyReplicas, }, nil } diff --git a/pkg/ddc/cache/engine/client.go b/pkg/ddc/cache/engine/client.go index f572c171e6b..82942af53ed 100644 --- a/pkg/ddc/cache/engine/client.go +++ b/pkg/ddc/cache/engine/client.go @@ -18,13 +18,14 @@ package engine import ( "context" + "reflect" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/cache/component" "github.com/fluid-cloudnative/fluid/pkg/utils" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/util/retry" - "reflect" ) func (e *CacheEngine) SetupClientComponent(clientValue *common.CacheRuntimeComponentValue) (bool, error) { @@ -57,7 +58,7 @@ func (e *CacheEngine) ShouldSetupClient() (bool, error) { func (e *CacheEngine) SetupClientInternal(clientValue *common.CacheRuntimeComponentValue) error { // 1. reconcile to create client workload - manager := component.NewComponentHelper(clientValue.WorkloadType, e.Scheme, e.Client) + manager := component.NewComponentHelper(clientValue.WorkloadType, e.Client) err := manager.Reconciler(context.TODO(), clientValue) if err != nil { return err diff --git a/pkg/ddc/cache/engine/engine.go b/pkg/ddc/cache/engine/engine.go index b861c49e5be..c81314b2c01 100644 --- a/pkg/ddc/cache/engine/engine.go +++ b/pkg/ddc/cache/engine/engine.go @@ -24,7 +24,6 @@ import ( "k8s.io/apimachinery/pkg/types" "github.com/go-logr/logr" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" @@ -45,7 +44,6 @@ var _ base.Engine = (*CacheEngine)(nil) // and use `physical` dataset/runtime to represent the dataset/runtime is mounted by virtual dataset. type CacheEngine struct { client.Client - Scheme *runtime.Scheme Log logr.Logger Recorder record.EventRecorder @@ -74,16 +72,16 @@ func (e *CacheEngine) ID() string { func Build(id string, ctx cruntime.ReconcileRequestContext) (base.Engine, error) { engine := &CacheEngine{ + Client: ctx.Client, + Log: ctx.Log, + Recorder: ctx.Recorder, Id: id, name: ctx.Name, namespace: ctx.Namespace, + retryShutdown: 0, + gracefulShutdownLimits: 5, runtimeType: ctx.RuntimeType, engineImpl: ctx.EngineImpl, - Client: ctx.Client, - Recorder: ctx.Recorder, - Log: ctx.Log, - gracefulShutdownLimits: 5, - retryShutdown: 0, } engine.Log = ctx.Log.WithValues("cache engine", ctx.RuntimeType).WithValues("id", id) diff --git a/pkg/ddc/cache/engine/master.go b/pkg/ddc/cache/engine/master.go index 6c1996d72e2..06ebd754e86 100644 --- a/pkg/ddc/cache/engine/master.go +++ b/pkg/ddc/cache/engine/master.go @@ -19,13 +19,14 @@ package engine import ( "context" "errors" + "reflect" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/cache/component" "github.com/fluid-cloudnative/fluid/pkg/utils" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/util/retry" - "reflect" ) func (e *CacheEngine) SetupMasterComponent(masterValue *common.CacheRuntimeComponentValue) (bool, error) { @@ -57,7 +58,7 @@ func (e *CacheEngine) shouldSetupMaster() (bool, error) { } func (e *CacheEngine) setupMasterInternal(masterValue *common.CacheRuntimeComponentValue) error { - manager := component.NewComponentHelper(masterValue.WorkloadType, e.Scheme, e.Client) + manager := component.NewComponentHelper(masterValue.WorkloadType, e.Client) err := manager.Reconciler(context.TODO(), masterValue) if err != nil { return err diff --git a/pkg/ddc/cache/engine/status.go b/pkg/ddc/cache/engine/status.go index d745a20cb01..0b71e8327a3 100644 --- a/pkg/ddc/cache/engine/status.go +++ b/pkg/ddc/cache/engine/status.go @@ -19,18 +19,19 @@ package engine import ( "context" "fmt" + "reflect" + "time" + fluidapi "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/cache/component" "github.com/fluid-cloudnative/fluid/pkg/utils" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" - "reflect" - "time" ) func (e *CacheEngine) setMasterComponentStatus(componentValue *common.CacheRuntimeComponentValue, status *fluidapi.CacheRuntimeStatus) (ready bool, err error) { - manager := component.NewComponentHelper(componentValue.WorkloadType, e.Scheme, e.Client) + manager := component.NewComponentHelper(componentValue.WorkloadType, e.Client) masterStatus, err := manager.ConstructComponentStatus(context.TODO(), componentValue) if err != nil { @@ -47,7 +48,7 @@ func (e *CacheEngine) setMasterComponentStatus(componentValue *common.CacheRunti return ready, err } func (e *CacheEngine) setWorkerComponentStatus(componentValue *common.CacheRuntimeComponentValue, status *fluidapi.CacheRuntimeStatus) (ready bool, err error) { - manager := component.NewComponentHelper(componentValue.WorkloadType, e.Scheme, e.Client) + manager := component.NewComponentHelper(componentValue.WorkloadType, e.Client) workerStatus, err := manager.ConstructComponentStatus(context.TODO(), componentValue) if err != nil { @@ -73,7 +74,7 @@ func (e *CacheEngine) setWorkerComponentStatus(componentValue *common.CacheRunti return ready, err } func (e *CacheEngine) setClientComponentStatus(componentValue *common.CacheRuntimeComponentValue, status *fluidapi.CacheRuntimeStatus) (err error) { - manager := component.NewComponentHelper(componentValue.WorkloadType, e.Scheme, e.Client) + manager := component.NewComponentHelper(componentValue.WorkloadType, e.Client) clientStatus, err := manager.ConstructComponentStatus(context.TODO(), componentValue) if err != nil { diff --git a/pkg/ddc/cache/engine/worker.go b/pkg/ddc/cache/engine/worker.go index b9ef2f881fa..e9d9e667ac5 100644 --- a/pkg/ddc/cache/engine/worker.go +++ b/pkg/ddc/cache/engine/worker.go @@ -18,13 +18,14 @@ package engine import ( "context" + "reflect" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/cache/component" "github.com/fluid-cloudnative/fluid/pkg/utils" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/util/retry" - "reflect" ) func (e *CacheEngine) SetupWorkerComponent(workerValue *common.CacheRuntimeComponentValue) (bool, error) { @@ -55,7 +56,7 @@ func (e *CacheEngine) ShouldSetupWorker() (bool, error) { } func (e *CacheEngine) SetupWorkerInternal(workerValue *common.CacheRuntimeComponentValue) error { - manager := component.NewComponentHelper(workerValue.WorkloadType, e.Scheme, e.Client) + manager := component.NewComponentHelper(workerValue.WorkloadType, e.Client) err := manager.Reconciler(context.TODO(), workerValue) if err != nil { return err