From ec72ffef39eeb2fb359117c471cb687923e08231 Mon Sep 17 00:00:00 2001 From: CAICAIIs <3360776475@qq.com> Date: Thu, 16 Apr 2026 23:06:52 +0800 Subject: [PATCH 01/12] fix: wait for cache client readiness before binding dataset Signed-off-by: CAICAIIs <3360776475@qq.com> --- pkg/ddc/cache/engine/status.go | 23 ++-- pkg/ddc/cache/engine/status_test.go | 168 ++++++++++++++++++++++++++++ 2 files changed, 183 insertions(+), 8 deletions(-) create mode 100644 pkg/ddc/cache/engine/status_test.go diff --git a/pkg/ddc/cache/engine/status.go b/pkg/ddc/cache/engine/status.go index d745a20cb01..13d6f28bfc6 100644 --- a/pkg/ddc/cache/engine/status.go +++ b/pkg/ddc/cache/engine/status.go @@ -72,26 +72,33 @@ func (e *CacheEngine) setWorkerComponentStatus(componentValue *common.CacheRunti return ready, err } -func (e *CacheEngine) setClientComponentStatus(componentValue *common.CacheRuntimeComponentValue, status *fluidapi.CacheRuntimeStatus) (err error) { +func (e *CacheEngine) setClientComponentStatus(componentValue *common.CacheRuntimeComponentValue, status *fluidapi.CacheRuntimeStatus) (ready bool, err error) { manager := component.NewComponentHelper(componentValue.WorkloadType, e.Scheme, e.Client) clientStatus, err := manager.ConstructComponentStatus(context.TODO(), componentValue) if err != nil { - return err + return false, err } - if clientStatus.DesiredReplicas > 0 { + if clientStatus.DesiredReplicas == 0 { + clientStatus.Phase = fluidapi.RuntimePhaseReady + ready = true + } else if clientStatus.ReadyReplicas > 0 { if clientStatus.DesiredReplicas == clientStatus.ReadyReplicas { clientStatus.Phase = fluidapi.RuntimePhaseReady + ready = true } else if clientStatus.ReadyReplicas >= 1 { clientStatus.Phase = fluidapi.RuntimePhasePartialReady + ready = true } + } else { + clientStatus.Phase = fluidapi.RuntimePhaseNotReady } status.Client = clientStatus - return nil + return ready, nil } func (e *CacheEngine) CheckAndUpdateRuntimeStatus(value *common.CacheRuntimeValue) (bool, error) { - var masterReady, workerReady, runtimeReady = true, true, false + var masterReady, workerReady, clientReady, runtimeReady = true, true, true, false err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { runtime, err := e.getRuntime() @@ -115,16 +122,16 @@ func (e *CacheEngine) CheckAndUpdateRuntimeStatus(value *common.CacheRuntimeValu } if value.Client.Enabled { - err = e.setClientComponentStatus(value.Client, &runtimeToUpdate.Status) + clientReady, err = e.setClientComponentStatus(value.Client, &runtimeToUpdate.Status) if err != nil { return err } } - if masterReady && workerReady { + if masterReady && workerReady && clientReady { runtimeReady = true } else { - e.Log.Info(fmt.Sprintf("MasterReady: %v, workerReady: %v", masterReady, workerReady)) + e.Log.Info(fmt.Sprintf("MasterReady: %v, workerReady: %v, clientReady: %v", masterReady, workerReady, clientReady)) } // Update the setup time diff --git a/pkg/ddc/cache/engine/status_test.go b/pkg/ddc/cache/engine/status_test.go new file mode 100644 index 00000000000..c59fcef60b1 --- /dev/null +++ b/pkg/ddc/cache/engine/status_test.go @@ -0,0 +1,168 @@ +/* + Copyright 2026 The Fluid Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package engine + +import ( + "context" + "testing" + "time" + + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" +) + +func TestCheckAndUpdateRuntimeStatusRequiresClientReady(t *testing.T) { + const ( + namespace = "default" + runtimeName = "curvine-demo" + masterName = "curvine-demo-master" + workerName = "curvine-demo-worker" + clientName = "curvine-demo-client" + ) + + runtime := &datav1alpha1.CacheRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: runtimeName, + Namespace: namespace, + CreationTimestamp: metav1.NewTime(time.Now().Add(-time.Minute)), + }, + } + + client := fake.NewFakeClientWithScheme( + datav1alpha1.UnitTestScheme, + runtime, + newStatefulSetComponent(masterName, namespace, 1, 1), + newStatefulSetComponent(workerName, namespace, 1, 1), + newDaemonSetComponent(clientName, namespace, 1, 0), + ) + + engine := &CacheEngine{ + Client: client, + Scheme: datav1alpha1.UnitTestScheme, + name: runtimeName, + namespace: namespace, + Log: fake.NullLogger(), + } + + value := &common.CacheRuntimeValue{ + Master: &common.CacheRuntimeComponentValue{ + Enabled: true, + Name: masterName, + Namespace: namespace, + WorkloadType: metav1.TypeMeta{APIVersion: "apps/v1", Kind: "StatefulSet"}, + }, + Worker: &common.CacheRuntimeComponentValue{ + Enabled: true, + Name: workerName, + Namespace: namespace, + WorkloadType: metav1.TypeMeta{APIVersion: "apps/v1", Kind: "StatefulSet"}, + }, + Client: &common.CacheRuntimeComponentValue{ + Enabled: true, + Name: clientName, + Namespace: namespace, + WorkloadType: metav1.TypeMeta{APIVersion: "apps/v1", Kind: "DaemonSet"}, + }, + } + + ready, err := engine.CheckAndUpdateRuntimeStatus(value) + if err != nil { + t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err) + } + if ready { + t.Fatalf("expected runtime to stay not ready while client is not ready") + } + + updatedRuntime := &datav1alpha1.CacheRuntime{} + if err := client.Get(context.TODO(), types.NamespacedName{Name: runtimeName, Namespace: namespace}, updatedRuntime); err != nil { + t.Fatalf("failed to get runtime after first status update: %v", err) + } + if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhaseNotReady { + t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhaseNotReady, updatedRuntime.Status.Client.Phase) + } + if updatedRuntime.Status.SetupDuration != "" { + t.Fatalf("expected setup duration to stay empty before runtime is ready, got %q", updatedRuntime.Status.SetupDuration) + } + + clientDaemonSet := &appsv1.DaemonSet{} + if err := client.Get(context.TODO(), types.NamespacedName{Name: clientName, Namespace: namespace}, clientDaemonSet); err != nil { + t.Fatalf("failed to get client daemonset: %v", err) + } + clientDaemonSet.Status.NumberReady = 1 + clientDaemonSet.Status.NumberAvailable = 1 + clientDaemonSet.Status.NumberUnavailable = 0 + if err := client.Status().Update(context.TODO(), clientDaemonSet); err != nil { + t.Fatalf("failed to update client daemonset status: %v", err) + } + + ready, err = engine.CheckAndUpdateRuntimeStatus(value) + if err != nil { + t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err) + } + if !ready { + t.Fatalf("expected runtime to become ready once client is ready") + } + + if err := client.Get(context.TODO(), types.NamespacedName{Name: runtimeName, Namespace: namespace}, updatedRuntime); err != nil { + t.Fatalf("failed to get runtime after second status update: %v", err) + } + if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhaseReady { + t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhaseReady, updatedRuntime.Status.Client.Phase) + } + if updatedRuntime.Status.SetupDuration == "" { + t.Fatalf("expected setup duration to be recorded once runtime is ready") + } +} + +func newStatefulSetComponent(name, namespace string, desiredReplicas, readyReplicas int32) *appsv1.StatefulSet { + replicas := desiredReplicas + return &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + }, + Status: appsv1.StatefulSetStatus{ + CurrentReplicas: desiredReplicas, + AvailableReplicas: readyReplicas, + ReadyReplicas: readyReplicas, + }, + } +} + +func newDaemonSetComponent(name, namespace string, desiredReplicas, readyReplicas int32) *appsv1.DaemonSet { + return &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Status: appsv1.DaemonSetStatus{ + CurrentNumberScheduled: desiredReplicas, + DesiredNumberScheduled: desiredReplicas, + NumberAvailable: readyReplicas, + NumberReady: readyReplicas, + NumberUnavailable: desiredReplicas - readyReplicas, + }, + } +} From a660717332afd3100a8888d0e9fe29e19a7da4aa Mon Sep 17 00:00:00 2001 From: CAICAIIs <3360776475@qq.com> Date: Fri, 17 Apr 2026 17:57:02 +0800 Subject: [PATCH 02/12] fix: require full cache client readiness before binding Signed-off-by: CAICAIIs <3360776475@qq.com> --- pkg/ddc/cache/engine/status.go | 1 - pkg/ddc/cache/engine/status_test.go | 103 ++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+), 1 deletion(-) diff --git a/pkg/ddc/cache/engine/status.go b/pkg/ddc/cache/engine/status.go index 13d6f28bfc6..2afa0a24081 100644 --- a/pkg/ddc/cache/engine/status.go +++ b/pkg/ddc/cache/engine/status.go @@ -88,7 +88,6 @@ func (e *CacheEngine) setClientComponentStatus(componentValue *common.CacheRunti ready = true } else if clientStatus.ReadyReplicas >= 1 { clientStatus.Phase = fluidapi.RuntimePhasePartialReady - ready = true } } else { clientStatus.Phase = fluidapi.RuntimePhaseNotReady diff --git a/pkg/ddc/cache/engine/status_test.go b/pkg/ddc/cache/engine/status_test.go index c59fcef60b1..ca8fa3b830b 100644 --- a/pkg/ddc/cache/engine/status_test.go +++ b/pkg/ddc/cache/engine/status_test.go @@ -133,6 +133,109 @@ func TestCheckAndUpdateRuntimeStatusRequiresClientReady(t *testing.T) { } } +func TestCheckAndUpdateRuntimeStatusRequiresAllClientReplicasReady(t *testing.T) { + const ( + namespace = "default" + runtimeName = "curvine-demo" + masterName = "curvine-demo-master" + workerName = "curvine-demo-worker" + clientName = "curvine-demo-client" + ) + + runtime := &datav1alpha1.CacheRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: runtimeName, + Namespace: namespace, + CreationTimestamp: metav1.NewTime(time.Now().Add(-time.Minute)), + }, + } + + client := fake.NewFakeClientWithScheme( + datav1alpha1.UnitTestScheme, + runtime, + newStatefulSetComponent(masterName, namespace, 1, 1), + newStatefulSetComponent(workerName, namespace, 1, 1), + newDaemonSetComponent(clientName, namespace, 2, 1), + ) + + engine := &CacheEngine{ + Client: client, + Scheme: datav1alpha1.UnitTestScheme, + name: runtimeName, + namespace: namespace, + Log: fake.NullLogger(), + } + + value := &common.CacheRuntimeValue{ + Master: &common.CacheRuntimeComponentValue{ + Enabled: true, + Name: masterName, + Namespace: namespace, + WorkloadType: metav1.TypeMeta{APIVersion: "apps/v1", Kind: "StatefulSet"}, + }, + Worker: &common.CacheRuntimeComponentValue{ + Enabled: true, + Name: workerName, + Namespace: namespace, + WorkloadType: metav1.TypeMeta{APIVersion: "apps/v1", Kind: "StatefulSet"}, + }, + Client: &common.CacheRuntimeComponentValue{ + Enabled: true, + Name: clientName, + Namespace: namespace, + WorkloadType: metav1.TypeMeta{APIVersion: "apps/v1", Kind: "DaemonSet"}, + }, + } + + ready, err := engine.CheckAndUpdateRuntimeStatus(value) + if err != nil { + t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err) + } + if ready { + t.Fatalf("expected runtime to stay not ready while client is only partially ready") + } + + updatedRuntime := &datav1alpha1.CacheRuntime{} + if err := client.Get(context.TODO(), types.NamespacedName{Name: runtimeName, Namespace: namespace}, updatedRuntime); err != nil { + t.Fatalf("failed to get runtime after partial-ready status update: %v", err) + } + if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhasePartialReady { + t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhasePartialReady, updatedRuntime.Status.Client.Phase) + } + if updatedRuntime.Status.SetupDuration != "" { + t.Fatalf("expected setup duration to stay empty before all client replicas are ready, got %q", updatedRuntime.Status.SetupDuration) + } + + clientDaemonSet := &appsv1.DaemonSet{} + if err := client.Get(context.TODO(), types.NamespacedName{Name: clientName, Namespace: namespace}, clientDaemonSet); err != nil { + t.Fatalf("failed to get client daemonset: %v", err) + } + clientDaemonSet.Status.NumberReady = 2 + clientDaemonSet.Status.NumberAvailable = 2 + clientDaemonSet.Status.NumberUnavailable = 0 + if err := client.Status().Update(context.TODO(), clientDaemonSet); err != nil { + t.Fatalf("failed to update client daemonset status: %v", err) + } + + ready, err = engine.CheckAndUpdateRuntimeStatus(value) + if err != nil { + t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err) + } + if !ready { + t.Fatalf("expected runtime to become ready once all client replicas are ready") + } + + if err := client.Get(context.TODO(), types.NamespacedName{Name: runtimeName, Namespace: namespace}, updatedRuntime); err != nil { + t.Fatalf("failed to get runtime after full-ready status update: %v", err) + } + if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhaseReady { + t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhaseReady, updatedRuntime.Status.Client.Phase) + } + if updatedRuntime.Status.SetupDuration == "" { + t.Fatalf("expected setup duration to be recorded once all client replicas are ready") + } +} + func newStatefulSetComponent(name, namespace string, desiredReplicas, readyReplicas int32) *appsv1.StatefulSet { replicas := desiredReplicas return &appsv1.StatefulSet{ From 47a70761fcb28fee83aa6c28326b7de98078ac4e Mon Sep 17 00:00:00 2001 From: CAICAIIs <3360776475@qq.com> Date: Fri, 17 Apr 2026 18:10:16 +0800 Subject: [PATCH 03/12] fix: simplify cache client readiness check Signed-off-by: CAICAIIs <3360776475@qq.com> --- pkg/ddc/cache/engine/status.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/pkg/ddc/cache/engine/status.go b/pkg/ddc/cache/engine/status.go index 2afa0a24081..32c0ffae325 100644 --- a/pkg/ddc/cache/engine/status.go +++ b/pkg/ddc/cache/engine/status.go @@ -79,16 +79,11 @@ func (e *CacheEngine) setClientComponentStatus(componentValue *common.CacheRunti if err != nil { return false, err } - if clientStatus.DesiredReplicas == 0 { + if clientStatus.DesiredReplicas == 0 || clientStatus.ReadyReplicas >= clientStatus.DesiredReplicas { clientStatus.Phase = fluidapi.RuntimePhaseReady ready = true } else if clientStatus.ReadyReplicas > 0 { - if clientStatus.DesiredReplicas == clientStatus.ReadyReplicas { - clientStatus.Phase = fluidapi.RuntimePhaseReady - ready = true - } else if clientStatus.ReadyReplicas >= 1 { - clientStatus.Phase = fluidapi.RuntimePhasePartialReady - } + clientStatus.Phase = fluidapi.RuntimePhasePartialReady } else { clientStatus.Phase = fluidapi.RuntimePhaseNotReady } From f79aac215adf2f896045ccc236907fba3275a375 Mon Sep 17 00:00:00 2001 From: CAICAIIs <3360776475@qq.com> Date: Sun, 19 Apr 2026 23:26:03 +0800 Subject: [PATCH 04/12] fix: restore cache runtime readiness semantics Signed-off-by: CAICAIIs <3360776475@qq.com> --- pkg/ddc/cache/engine/status.go | 11 +- pkg/ddc/cache/engine/status_test.go | 353 ++++++++++++++++------------ 2 files changed, 211 insertions(+), 153 deletions(-) diff --git a/pkg/ddc/cache/engine/status.go b/pkg/ddc/cache/engine/status.go index 32c0ffae325..7d57f30ccab 100644 --- a/pkg/ddc/cache/engine/status.go +++ b/pkg/ddc/cache/engine/status.go @@ -92,9 +92,13 @@ func (e *CacheEngine) setClientComponentStatus(componentValue *common.CacheRunti return ready, nil } func (e *CacheEngine) CheckAndUpdateRuntimeStatus(value *common.CacheRuntimeValue) (bool, error) { - var masterReady, workerReady, clientReady, runtimeReady = true, true, true, false + runtimeReady := false err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + // Reset readiness on each retry to avoid stale state after conflicts. + masterReady, workerReady, clientReady := true, true, true + runtimeReady = false + runtime, err := e.getRuntime() if err != nil { return err @@ -122,9 +126,8 @@ func (e *CacheEngine) CheckAndUpdateRuntimeStatus(value *common.CacheRuntimeValu } } - if masterReady && workerReady && clientReady { - runtimeReady = true - } else { + runtimeReady = masterReady && workerReady + if !runtimeReady { e.Log.Info(fmt.Sprintf("MasterReady: %v, workerReady: %v, clientReady: %v", masterReady, workerReady, clientReady)) } diff --git a/pkg/ddc/cache/engine/status_test.go b/pkg/ddc/cache/engine/status_test.go index ca8fa3b830b..16c42e56669 100644 --- a/pkg/ddc/cache/engine/status_test.go +++ b/pkg/ddc/cache/engine/status_test.go @@ -18,222 +18,243 @@ package engine import ( "context" + "errors" "testing" "time" appsv1 "k8s.io/api/apps/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" ) -func TestCheckAndUpdateRuntimeStatusRequiresClientReady(t *testing.T) { - const ( - namespace = "default" - runtimeName = "curvine-demo" - masterName = "curvine-demo-master" - workerName = "curvine-demo-worker" - clientName = "curvine-demo-client" - ) - - runtime := &datav1alpha1.CacheRuntime{ - ObjectMeta: metav1.ObjectMeta{ - Name: runtimeName, - Namespace: namespace, - CreationTimestamp: metav1.NewTime(time.Now().Add(-time.Minute)), - }, - } +const ( + testStatusNamespace = "default" + testStatusRuntime = "curvine-demo" + testStatusMaster = "curvine-demo-master" + testStatusWorker = "curvine-demo-worker" + testStatusClient = "curvine-demo-client" + testCacheRuntimeGR = "cacheruntimes" + testCacheRuntimeGV = "data.fluid.io" + testStatusWorkloadAP = "apps/v1" +) - client := fake.NewFakeClientWithScheme( - datav1alpha1.UnitTestScheme, - runtime, - newStatefulSetComponent(masterName, namespace, 1, 1), - newStatefulSetComponent(workerName, namespace, 1, 1), - newDaemonSetComponent(clientName, namespace, 1, 0), +func TestCheckAndUpdateRuntimeStatusClientNotReadyDoesNotBlockRuntimeReady(t *testing.T) { + engine, client := newStatusTestEngineWithClient( + t, + fake.NewFakeClientWithScheme( + datav1alpha1.UnitTestScheme, + newStatusTestRuntime(), + newStatefulSetComponent(testStatusMaster, testStatusNamespace, 1, 1), + newStatefulSetComponent(testStatusWorker, testStatusNamespace, 1, 1), + newDaemonSetComponent(testStatusClient, testStatusNamespace, 1, 0), + ), ) - engine := &CacheEngine{ - Client: client, - Scheme: datav1alpha1.UnitTestScheme, - name: runtimeName, - namespace: namespace, - Log: fake.NullLogger(), - } - - value := &common.CacheRuntimeValue{ - Master: &common.CacheRuntimeComponentValue{ - Enabled: true, - Name: masterName, - Namespace: namespace, - WorkloadType: metav1.TypeMeta{APIVersion: "apps/v1", Kind: "StatefulSet"}, - }, - Worker: &common.CacheRuntimeComponentValue{ - Enabled: true, - Name: workerName, - Namespace: namespace, - WorkloadType: metav1.TypeMeta{APIVersion: "apps/v1", Kind: "StatefulSet"}, - }, - Client: &common.CacheRuntimeComponentValue{ - Enabled: true, - Name: clientName, - Namespace: namespace, - WorkloadType: metav1.TypeMeta{APIVersion: "apps/v1", Kind: "DaemonSet"}, - }, - } - - ready, err := engine.CheckAndUpdateRuntimeStatus(value) + ready, err := engine.CheckAndUpdateRuntimeStatus(newStatusTestRuntimeValue(true)) if err != nil { t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err) } - if ready { - t.Fatalf("expected runtime to stay not ready while client is not ready") + if !ready { + t.Fatalf("expected runtime to become ready once master and worker are ready") } - updatedRuntime := &datav1alpha1.CacheRuntime{} - if err := client.Get(context.TODO(), types.NamespacedName{Name: runtimeName, Namespace: namespace}, updatedRuntime); err != nil { - t.Fatalf("failed to get runtime after first status update: %v", err) - } + updatedRuntime := getUpdatedRuntime(t, client) if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhaseNotReady { t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhaseNotReady, updatedRuntime.Status.Client.Phase) } - if updatedRuntime.Status.SetupDuration != "" { - t.Fatalf("expected setup duration to stay empty before runtime is ready, got %q", updatedRuntime.Status.SetupDuration) + if updatedRuntime.Status.SetupDuration == "" { + t.Fatalf("expected setup duration to be recorded once runtime is ready") + } +} + +func TestCheckAndUpdateRuntimeStatusClientPartialReadyDoesNotBlockRuntimeReady(t *testing.T) { + engine, client := newStatusTestEngineWithClient( + t, + fake.NewFakeClientWithScheme( + datav1alpha1.UnitTestScheme, + newStatusTestRuntime(), + newStatefulSetComponent(testStatusMaster, testStatusNamespace, 1, 1), + newStatefulSetComponent(testStatusWorker, testStatusNamespace, 1, 1), + newDaemonSetComponent(testStatusClient, testStatusNamespace, 2, 1), + ), + ) + + ready, err := engine.CheckAndUpdateRuntimeStatus(newStatusTestRuntimeValue(true)) + if err != nil { + t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err) + } + if !ready { + t.Fatalf("expected runtime to become ready once master and worker are ready") } - clientDaemonSet := &appsv1.DaemonSet{} - if err := client.Get(context.TODO(), types.NamespacedName{Name: clientName, Namespace: namespace}, clientDaemonSet); err != nil { - t.Fatalf("failed to get client daemonset: %v", err) + updatedRuntime := getUpdatedRuntime(t, client) + if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhasePartialReady { + t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhasePartialReady, updatedRuntime.Status.Client.Phase) } - clientDaemonSet.Status.NumberReady = 1 - clientDaemonSet.Status.NumberAvailable = 1 - clientDaemonSet.Status.NumberUnavailable = 0 - if err := client.Status().Update(context.TODO(), clientDaemonSet); err != nil { - t.Fatalf("failed to update client daemonset status: %v", err) + if updatedRuntime.Status.SetupDuration == "" { + t.Fatalf("expected setup duration to be recorded once runtime is ready") } +} - ready, err = engine.CheckAndUpdateRuntimeStatus(value) +func TestCheckAndUpdateRuntimeStatusClientZeroDesiredReplicasReportsReady(t *testing.T) { + engine, client := newStatusTestEngineWithClient( + t, + fake.NewFakeClientWithScheme( + datav1alpha1.UnitTestScheme, + newStatusTestRuntime(), + newStatefulSetComponent(testStatusMaster, testStatusNamespace, 1, 1), + newStatefulSetComponent(testStatusWorker, testStatusNamespace, 1, 1), + newDaemonSetComponent(testStatusClient, testStatusNamespace, 0, 0), + ), + ) + + ready, err := engine.CheckAndUpdateRuntimeStatus(newStatusTestRuntimeValue(true)) if err != nil { t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err) } if !ready { - t.Fatalf("expected runtime to become ready once client is ready") + t.Fatalf("expected runtime to stay ready when client desires zero replicas") } - if err := client.Get(context.TODO(), types.NamespacedName{Name: runtimeName, Namespace: namespace}, updatedRuntime); err != nil { - t.Fatalf("failed to get runtime after second status update: %v", err) - } + updatedRuntime := getUpdatedRuntime(t, client) if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhaseReady { t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhaseReady, updatedRuntime.Status.Client.Phase) } - if updatedRuntime.Status.SetupDuration == "" { - t.Fatalf("expected setup duration to be recorded once runtime is ready") + if updatedRuntime.Status.Client.DesiredReplicas != 0 { + t.Fatalf("expected desired replicas to stay 0, got %d", updatedRuntime.Status.Client.DesiredReplicas) } } -func TestCheckAndUpdateRuntimeStatusRequiresAllClientReplicasReady(t *testing.T) { - const ( - namespace = "default" - runtimeName = "curvine-demo" - masterName = "curvine-demo-master" - workerName = "curvine-demo-worker" - clientName = "curvine-demo-client" +func TestCheckAndUpdateRuntimeStatusClientFullyReadyReportsReady(t *testing.T) { + engine, client := newStatusTestEngineWithClient( + t, + fake.NewFakeClientWithScheme( + datav1alpha1.UnitTestScheme, + newStatusTestRuntime(), + newStatefulSetComponent(testStatusMaster, testStatusNamespace, 1, 1), + newStatefulSetComponent(testStatusWorker, testStatusNamespace, 1, 1), + newDaemonSetComponent(testStatusClient, testStatusNamespace, 2, 2), + ), ) - runtime := &datav1alpha1.CacheRuntime{ - ObjectMeta: metav1.ObjectMeta{ - Name: runtimeName, - Namespace: namespace, - CreationTimestamp: metav1.NewTime(time.Now().Add(-time.Minute)), - }, + ready, err := engine.CheckAndUpdateRuntimeStatus(newStatusTestRuntimeValue(true)) + if err != nil { + t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err) + } + if !ready { + t.Fatalf("expected runtime to stay ready when client is fully ready") + } + + updatedRuntime := getUpdatedRuntime(t, client) + if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhaseReady { + t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhaseReady, updatedRuntime.Status.Client.Phase) } + if updatedRuntime.Status.Client.ReadyReplicas != updatedRuntime.Status.Client.DesiredReplicas { + t.Fatalf("expected ready replicas to match desired replicas, got %d/%d", updatedRuntime.Status.Client.ReadyReplicas, updatedRuntime.Status.Client.DesiredReplicas) + } +} - client := fake.NewFakeClientWithScheme( +func TestCheckAndUpdateRuntimeStatusRecomputesRuntimeReadyOnRetry(t *testing.T) { + baseClient := fake.NewFakeClientWithScheme( datav1alpha1.UnitTestScheme, - runtime, - newStatefulSetComponent(masterName, namespace, 1, 1), - newStatefulSetComponent(workerName, namespace, 1, 1), - newDaemonSetComponent(clientName, namespace, 2, 1), + newStatusTestRuntime(), + newStatefulSetComponent(testStatusMaster, testStatusNamespace, 1, 1), + newStatefulSetComponent(testStatusWorker, testStatusNamespace, 1, 1), ) - engine := &CacheEngine{ - Client: client, - Scheme: datav1alpha1.UnitTestScheme, - name: runtimeName, - namespace: namespace, - Log: fake.NullLogger(), - } + client := &conflictOnceClient{ + Client: baseClient, + statusWriter: &conflictOnceStatusWriter{ + StatusWriter: baseClient.Status(), + beforeConflict: func(ctx context.Context) error { + worker := &appsv1.StatefulSet{} + if err := baseClient.Get(ctx, types.NamespacedName{Name: testStatusWorker, Namespace: testStatusNamespace}, worker); err != nil { + return err + } - value := &common.CacheRuntimeValue{ - Master: &common.CacheRuntimeComponentValue{ - Enabled: true, - Name: masterName, - Namespace: namespace, - WorkloadType: metav1.TypeMeta{APIVersion: "apps/v1", Kind: "StatefulSet"}, - }, - Worker: &common.CacheRuntimeComponentValue{ - Enabled: true, - Name: workerName, - Namespace: namespace, - WorkloadType: metav1.TypeMeta{APIVersion: "apps/v1", Kind: "StatefulSet"}, - }, - Client: &common.CacheRuntimeComponentValue{ - Enabled: true, - Name: clientName, - Namespace: namespace, - WorkloadType: metav1.TypeMeta{APIVersion: "apps/v1", Kind: "DaemonSet"}, + worker.Status.ReadyReplicas = 0 + worker.Status.AvailableReplicas = 0 + return baseClient.Status().Update(ctx, worker) + }, }, } - ready, err := engine.CheckAndUpdateRuntimeStatus(value) + engine, _ := newStatusTestEngineWithClient(t, client) + ready, err := engine.CheckAndUpdateRuntimeStatus(newStatusTestRuntimeValue(false)) if err != nil { t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err) } if ready { - t.Fatalf("expected runtime to stay not ready while client is only partially ready") + t.Fatalf("expected runtime to be not ready after retry sees worker become not ready") } - updatedRuntime := &datav1alpha1.CacheRuntime{} - if err := client.Get(context.TODO(), types.NamespacedName{Name: runtimeName, Namespace: namespace}, updatedRuntime); err != nil { - t.Fatalf("failed to get runtime after partial-ready status update: %v", err) - } - if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhasePartialReady { - t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhasePartialReady, updatedRuntime.Status.Client.Phase) + updatedRuntime := getUpdatedRuntime(t, client) + if updatedRuntime.Status.Worker.Phase != datav1alpha1.RuntimePhaseNotReady { + t.Fatalf("expected worker phase %q, got %q", datav1alpha1.RuntimePhaseNotReady, updatedRuntime.Status.Worker.Phase) } if updatedRuntime.Status.SetupDuration != "" { - t.Fatalf("expected setup duration to stay empty before all client replicas are ready, got %q", updatedRuntime.Status.SetupDuration) + t.Fatalf("expected setup duration to stay empty when final runtime status is not ready, got %q", updatedRuntime.Status.SetupDuration) } +} - clientDaemonSet := &appsv1.DaemonSet{} - if err := client.Get(context.TODO(), types.NamespacedName{Name: clientName, Namespace: namespace}, clientDaemonSet); err != nil { - t.Fatalf("failed to get client daemonset: %v", err) - } - clientDaemonSet.Status.NumberReady = 2 - clientDaemonSet.Status.NumberAvailable = 2 - clientDaemonSet.Status.NumberUnavailable = 0 - if err := client.Status().Update(context.TODO(), clientDaemonSet); err != nil { - t.Fatalf("failed to update client daemonset status: %v", err) - } +func newStatusTestEngineWithClient(t *testing.T, client ctrlclient.Client) (*CacheEngine, ctrlclient.Client) { + t.Helper() - ready, err = engine.CheckAndUpdateRuntimeStatus(value) - if err != nil { - t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err) - } - if !ready { - t.Fatalf("expected runtime to become ready once all client replicas are ready") + return &CacheEngine{ + Client: client, + Scheme: datav1alpha1.UnitTestScheme, + name: testStatusRuntime, + namespace: testStatusNamespace, + Log: fake.NullLogger(), + }, client +} + +func newStatusTestRuntimeValue(enableClient bool) *common.CacheRuntimeValue { + value := &common.CacheRuntimeValue{ + Master: newStatusTestComponentValue(testStatusMaster, "StatefulSet"), + Worker: newStatusTestComponentValue(testStatusWorker, "StatefulSet"), + Client: newStatusTestComponentValue(testStatusClient, "DaemonSet"), } + value.Client.Enabled = enableClient + + return value +} - if err := client.Get(context.TODO(), types.NamespacedName{Name: runtimeName, Namespace: namespace}, updatedRuntime); err != nil { - t.Fatalf("failed to get runtime after full-ready status update: %v", err) +func newStatusTestComponentValue(name, kind string) *common.CacheRuntimeComponentValue { + return &common.CacheRuntimeComponentValue{ + Enabled: true, + Name: name, + Namespace: testStatusNamespace, + WorkloadType: metav1.TypeMeta{APIVersion: testStatusWorkloadAP, Kind: kind}, } - if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhaseReady { - t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhaseReady, updatedRuntime.Status.Client.Phase) +} + +func newStatusTestRuntime() *datav1alpha1.CacheRuntime { + return &datav1alpha1.CacheRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: testStatusRuntime, + Namespace: testStatusNamespace, + CreationTimestamp: metav1.NewTime(time.Now().Add(-time.Minute)), + }, } - if updatedRuntime.Status.SetupDuration == "" { - t.Fatalf("expected setup duration to be recorded once all client replicas are ready") +} + +func getUpdatedRuntime(t *testing.T, client ctrlclient.Client) *datav1alpha1.CacheRuntime { + t.Helper() + + updatedRuntime := &datav1alpha1.CacheRuntime{} + if err := client.Get(context.TODO(), types.NamespacedName{Name: testStatusRuntime, Namespace: testStatusNamespace}, updatedRuntime); err != nil { + t.Fatalf("failed to get updated runtime: %v", err) } + + return updatedRuntime } func newStatefulSetComponent(name, namespace string, desiredReplicas, readyReplicas int32) *appsv1.StatefulSet { @@ -269,3 +290,37 @@ func newDaemonSetComponent(name, namespace string, desiredReplicas, readyReplica }, } } + +type conflictOnceClient struct { + ctrlclient.Client + statusWriter ctrlclient.StatusWriter +} + +func (c *conflictOnceClient) Status() ctrlclient.StatusWriter { + return c.statusWriter +} + +type conflictOnceStatusWriter struct { + ctrlclient.StatusWriter + beforeConflict func(ctx context.Context) error + conflicted bool +} + +func (w *conflictOnceStatusWriter) Update(ctx context.Context, obj ctrlclient.Object, opts ...ctrlclient.SubResourceUpdateOption) error { + if !w.conflicted { + w.conflicted = true + if w.beforeConflict != nil { + if err := w.beforeConflict(ctx); err != nil { + return err + } + } + + return apierrors.NewConflict( + schema.GroupResource{Group: testCacheRuntimeGV, Resource: testCacheRuntimeGR}, + obj.GetName(), + errors.New("injected conflict"), + ) + } + + return w.StatusWriter.Update(ctx, obj, opts...) +} From c2488135a974b37b408a057e3cc2fce2ea18550f Mon Sep 17 00:00:00 2001 From: CAICAIIs <3360776475@qq.com> Date: Sun, 19 Apr 2026 23:59:38 +0800 Subject: [PATCH 05/12] fix: treat zero desired cache client as not ready Signed-off-by: CAICAIIs <3360776475@qq.com> --- pkg/ddc/cache/engine/status.go | 2 +- pkg/ddc/cache/engine/status_test.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/ddc/cache/engine/status.go b/pkg/ddc/cache/engine/status.go index 7d57f30ccab..e619091d232 100644 --- a/pkg/ddc/cache/engine/status.go +++ b/pkg/ddc/cache/engine/status.go @@ -79,7 +79,7 @@ func (e *CacheEngine) setClientComponentStatus(componentValue *common.CacheRunti if err != nil { return false, err } - if clientStatus.DesiredReplicas == 0 || clientStatus.ReadyReplicas >= clientStatus.DesiredReplicas { + if clientStatus.DesiredReplicas > 0 && clientStatus.ReadyReplicas >= clientStatus.DesiredReplicas { clientStatus.Phase = fluidapi.RuntimePhaseReady ready = true } else if clientStatus.ReadyReplicas > 0 { diff --git a/pkg/ddc/cache/engine/status_test.go b/pkg/ddc/cache/engine/status_test.go index 16c42e56669..f04febd6af7 100644 --- a/pkg/ddc/cache/engine/status_test.go +++ b/pkg/ddc/cache/engine/status_test.go @@ -103,7 +103,7 @@ func TestCheckAndUpdateRuntimeStatusClientPartialReadyDoesNotBlockRuntimeReady(t } } -func TestCheckAndUpdateRuntimeStatusClientZeroDesiredReplicasReportsReady(t *testing.T) { +func TestCheckAndUpdateRuntimeStatusClientZeroDesiredReplicasStayNotReady(t *testing.T) { engine, client := newStatusTestEngineWithClient( t, fake.NewFakeClientWithScheme( @@ -124,8 +124,8 @@ func TestCheckAndUpdateRuntimeStatusClientZeroDesiredReplicasReportsReady(t *tes } updatedRuntime := getUpdatedRuntime(t, client) - if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhaseReady { - t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhaseReady, updatedRuntime.Status.Client.Phase) + if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhaseNotReady { + t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhaseNotReady, updatedRuntime.Status.Client.Phase) } if updatedRuntime.Status.Client.DesiredReplicas != 0 { t.Fatalf("expected desired replicas to stay 0, got %d", updatedRuntime.Status.Client.DesiredReplicas) From d4d500497f327e4236b76e0356a4f80fbcb65f3a Mon Sep 17 00:00:00 2001 From: CAICAIIs <3360776475@qq.com> Date: Sat, 25 Apr 2026 13:41:20 +0800 Subject: [PATCH 06/12] fix(cache): clarify client readiness logging Signed-off-by: CAICAIIs <3360776475@qq.com> --- pkg/ddc/cache/engine/status.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/ddc/cache/engine/status.go b/pkg/ddc/cache/engine/status.go index e619091d232..5762785f038 100644 --- a/pkg/ddc/cache/engine/status.go +++ b/pkg/ddc/cache/engine/status.go @@ -96,7 +96,7 @@ func (e *CacheEngine) CheckAndUpdateRuntimeStatus(value *common.CacheRuntimeValu err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { // Reset readiness on each retry to avoid stale state after conflicts. - masterReady, workerReady, clientReady := true, true, true + masterReady, workerReady, clientFullyReady := true, true, true runtimeReady = false runtime, err := e.getRuntime() @@ -120,7 +120,7 @@ func (e *CacheEngine) CheckAndUpdateRuntimeStatus(value *common.CacheRuntimeValu } if value.Client.Enabled { - clientReady, err = e.setClientComponentStatus(value.Client, &runtimeToUpdate.Status) + clientFullyReady, err = e.setClientComponentStatus(value.Client, &runtimeToUpdate.Status) if err != nil { return err } @@ -128,7 +128,13 @@ func (e *CacheEngine) CheckAndUpdateRuntimeStatus(value *common.CacheRuntimeValu runtimeReady = masterReady && workerReady if !runtimeReady { - e.Log.Info(fmt.Sprintf("MasterReady: %v, workerReady: %v, clientReady: %v", masterReady, workerReady, clientReady)) + e.Log.Info(fmt.Sprintf( + "MasterReady: %v, workerReady: %v, clientFullyReady: %v, clientPhase: %s", + masterReady, + workerReady, + clientFullyReady, + runtimeToUpdate.Status.Client.Phase, + )) } // Update the setup time From cef6b282f9063105eb0411f1ec82c6c20c85a638 Mon Sep 17 00:00:00 2001 From: CAICAIIs <3360776475@qq.com> Date: Mon, 27 Apr 2026 20:46:47 +0800 Subject: [PATCH 07/12] chore: address cache client review feedback Signed-off-by: CAICAIIs <3360776475@qq.com> --- pkg/ddc/cache/engine/status.go | 8 ++++---- pkg/ddc/cache/engine/status_test.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/ddc/cache/engine/status.go b/pkg/ddc/cache/engine/status.go index 5762785f038..c1849fd01e7 100644 --- a/pkg/ddc/cache/engine/status.go +++ b/pkg/ddc/cache/engine/status.go @@ -72,7 +72,7 @@ func (e *CacheEngine) setWorkerComponentStatus(componentValue *common.CacheRunti return ready, err } -func (e *CacheEngine) setClientComponentStatus(componentValue *common.CacheRuntimeComponentValue, status *fluidapi.CacheRuntimeStatus) (ready bool, err error) { +func (e *CacheEngine) setClientComponentStatus(componentValue *common.CacheRuntimeComponentValue, status *fluidapi.CacheRuntimeStatus) (fullyReady bool, err error) { manager := component.NewComponentHelper(componentValue.WorkloadType, e.Scheme, e.Client) clientStatus, err := manager.ConstructComponentStatus(context.TODO(), componentValue) @@ -81,7 +81,7 @@ func (e *CacheEngine) setClientComponentStatus(componentValue *common.CacheRunti } if clientStatus.DesiredReplicas > 0 && clientStatus.ReadyReplicas >= clientStatus.DesiredReplicas { clientStatus.Phase = fluidapi.RuntimePhaseReady - ready = true + fullyReady = true } else if clientStatus.ReadyReplicas > 0 { clientStatus.Phase = fluidapi.RuntimePhasePartialReady } else { @@ -89,14 +89,14 @@ func (e *CacheEngine) setClientComponentStatus(componentValue *common.CacheRunti } status.Client = clientStatus - return ready, nil + return fullyReady, nil } func (e *CacheEngine) CheckAndUpdateRuntimeStatus(value *common.CacheRuntimeValue) (bool, error) { runtimeReady := false err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { // Reset readiness on each retry to avoid stale state after conflicts. - masterReady, workerReady, clientFullyReady := true, true, true + masterReady, workerReady, clientFullyReady := true, true, false runtimeReady = false runtime, err := e.getRuntime() diff --git a/pkg/ddc/cache/engine/status_test.go b/pkg/ddc/cache/engine/status_test.go index f04febd6af7..773a87fd5cb 100644 --- a/pkg/ddc/cache/engine/status_test.go +++ b/pkg/ddc/cache/engine/status_test.go @@ -103,7 +103,7 @@ func TestCheckAndUpdateRuntimeStatusClientPartialReadyDoesNotBlockRuntimeReady(t } } -func TestCheckAndUpdateRuntimeStatusClientZeroDesiredReplicasStayNotReady(t *testing.T) { +func TestCheckAndUpdateRuntimeStatusClientZeroDesiredReplicasReportsNotReady(t *testing.T) { engine, client := newStatusTestEngineWithClient( t, fake.NewFakeClientWithScheme( From ae40f0814c6772e1636d0920a5ee6b963d4d7146 Mon Sep 17 00:00:00 2001 From: CAICAIIs <3360776475@qq.com> Date: Mon, 27 Apr 2026 21:41:49 +0800 Subject: [PATCH 08/12] test: wait for curvine client readiness in e2e Signed-off-by: CAICAIIs <3360776475@qq.com> --- test/gha-e2e/curvine/test.sh | 39 ++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/test/gha-e2e/curvine/test.sh b/test/gha-e2e/curvine/test.sh index 6ea23144083..682c3ef8ada 100644 --- a/test/gha-e2e/curvine/test.sh +++ b/test/gha-e2e/curvine/test.sh @@ -71,6 +71,39 @@ function wait_dataset_bound() { syslog "Found dataset $dataset_name status.phase==Bound" } +function wait_cache_client_ready() { + local deadline=180 # 3 minutes + local last_phase="" + local ready_replicas="" + local desired_replicas="" + local log_interval=0 + local log_times=0 + + while true; do + last_phase=$(kubectl get cacheruntime "$dataset_name" -ojsonpath='{@.status.client.phase}') + ready_replicas=$(kubectl get cacheruntime "$dataset_name" -ojsonpath='{@.status.client.readyReplicas}') + desired_replicas=$(kubectl get cacheruntime "$dataset_name" -ojsonpath='{@.status.client.desiredReplicas}') + + if [[ $log_interval -eq 3 ]]; then + log_times=$((log_times + 1)) + syslog "checking cacheruntime.status.client.phase==Ready (already $((log_times * log_interval * 5))s, last phase: ${last_phase:-}, readyReplicas: ${ready_replicas:-}, desiredReplicas: ${desired_replicas:-})" + if [[ $((log_times * log_interval * 5)) -ge $deadline ]]; then + panic "timeout waiting for cache client ready after ${deadline}s" + fi + log_interval=0 + fi + + if [[ "$last_phase" == "Ready" ]]; then + break + fi + + log_interval=$((log_interval + 1)) + sleep 5 + done + + syslog "Found cacheruntime $dataset_name status.client.phase==Ready" +} + function create_job() { local job_file=$1 local job_name=$2 @@ -141,13 +174,11 @@ function main() { setup create_dataset wait_dataset_bound - create_job test/gha-e2e/curvine/write_job.yaml $write_job_name wait_job_completed $write_job_name - - create_dataload + create_dataload wait_dataload_completed "curvine-dataload" - + wait_cache_client_ready create_job test/gha-e2e/curvine/read_job.yaml $read_job_name wait_job_completed $read_job_name From c99da76bb46ef2c15d97be67317bd12fb1413627 Mon Sep 17 00:00:00 2001 From: CAICAIIs <3360776475@qq.com> Date: Mon, 27 Apr 2026 22:10:10 +0800 Subject: [PATCH 09/12] test: merge curvine e2e conflict resolution Signed-off-by: CAICAIIs <3360776475@qq.com> --- test/gha-e2e/curvine/test.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/test/gha-e2e/curvine/test.sh b/test/gha-e2e/curvine/test.sh index 682c3ef8ada..c56135b01d2 100644 --- a/test/gha-e2e/curvine/test.sh +++ b/test/gha-e2e/curvine/test.sh @@ -167,7 +167,6 @@ function wait_dataload_completed() { done syslog "Found succeeded dataload_name $dataload_name" } - function main() { syslog "[TESTCASE $testname STARTS AT $(date)]" trap dump_env_and_clean_up EXIT From 09732442ccb6070927a2e543589bddf59dd88a86 Mon Sep 17 00:00:00 2001 From: CAICAIIs <3360776475@qq.com> Date: Mon, 27 Apr 2026 23:56:02 +0800 Subject: [PATCH 10/12] test: wait for curvine client pod readiness Signed-off-by: CAICAIIs <3360776475@qq.com> --- test/gha-e2e/curvine/test.sh | 43 ++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/test/gha-e2e/curvine/test.sh b/test/gha-e2e/curvine/test.sh index c56135b01d2..3e2e1bbb0d4 100644 --- a/test/gha-e2e/curvine/test.sh +++ b/test/gha-e2e/curvine/test.sh @@ -73,27 +73,36 @@ function wait_dataset_bound() { function wait_cache_client_ready() { local deadline=180 # 3 minutes + local client_component_name="${dataset_name}-client" + local client_selector="cacheruntime.data.fluid.io/component-name=${client_component_name}" local last_phase="" - local ready_replicas="" - local desired_replicas="" + local runtime_ready_replicas="" + local runtime_desired_replicas="" + local ds_ready_replicas="" + local ds_desired_replicas="" + local pod_states="" local log_interval=0 local log_times=0 while true; do last_phase=$(kubectl get cacheruntime "$dataset_name" -ojsonpath='{@.status.client.phase}') - ready_replicas=$(kubectl get cacheruntime "$dataset_name" -ojsonpath='{@.status.client.readyReplicas}') - desired_replicas=$(kubectl get cacheruntime "$dataset_name" -ojsonpath='{@.status.client.desiredReplicas}') + runtime_ready_replicas=$(kubectl get cacheruntime "$dataset_name" -ojsonpath='{@.status.client.readyReplicas}') + runtime_desired_replicas=$(kubectl get cacheruntime "$dataset_name" -ojsonpath='{@.status.client.desiredReplicas}') + ds_ready_replicas=$(kubectl get daemonset "$client_component_name" -ojsonpath='{@.status.numberReady}' 2>/dev/null) + ds_desired_replicas=$(kubectl get daemonset "$client_component_name" -ojsonpath='{@.status.desiredNumberScheduled}' 2>/dev/null) + pod_states=$(kubectl get pod -l "$client_selector" -ojsonpath='{range .items[*]}{.metadata.name}:{range .status.containerStatuses[*]}{.ready}{end}:{.status.phase}{" "}{end}' 2>/dev/null) if [[ $log_interval -eq 3 ]]; then log_times=$((log_times + 1)) - syslog "checking cacheruntime.status.client.phase==Ready (already $((log_times * log_interval * 5))s, last phase: ${last_phase:-}, readyReplicas: ${ready_replicas:-}, desiredReplicas: ${desired_replicas:-})" + syslog "checking cache client readiness (already $((log_times * log_interval * 5))s, runtime phase: ${last_phase:-}, runtime ready/desired: ${runtime_ready_replicas:-}/${runtime_desired_replicas:-}, ds ready/desired: ${ds_ready_replicas:-}/${ds_desired_replicas:-}, pods: ${pod_states:-})" if [[ $((log_times * log_interval * 5)) -ge $deadline ]]; then - panic "timeout waiting for cache client ready after ${deadline}s" + panic "timeout waiting for cache client pod ready after ${deadline}s" fi log_interval=0 fi - if [[ "$last_phase" == "Ready" ]]; then + if kubectl rollout status daemonset/"$client_component_name" --timeout=5s >/dev/null 2>&1 && \ + kubectl wait --for=condition=Ready --timeout=5s pod -l "$client_selector" >/dev/null 2>&1; then break fi @@ -101,7 +110,7 @@ function wait_cache_client_ready() { sleep 5 done - syslog "Found cacheruntime $dataset_name status.client.phase==Ready" + syslog "Found ready cache client pod for $dataset_name" } function create_job() { @@ -133,15 +142,15 @@ function wait_job_completed() { function dump_env_and_clean_up() { bash tools/diagnose-fluid-curvine.sh collect --name $dataset_name --namespace default --collect-path ./e2e-tmp/testcase-curvine.tgz syslog "Cleaning up resources for testcase $testname" - kubectl delete -f test/gha-e2e/curvine/read_job.yaml - kubectl delete -f test/gha-e2e/curvine/write_job.yaml - kubectl delete -f test/gha-e2e/curvine/dataload.yaml - kubectl delete -f test/gha-e2e/curvine/dataset.yaml - kubectl delete -f test/gha-e2e/curvine/cacheruntime.yaml - kubectl delete -f test/gha-e2e/curvine/cacheruntimeclass.yaml - kubectl delete -f test/gha-e2e/curvine/minio.yaml - kubectl delete -f test/gha-e2e/curvine/mount.yaml - kubectl delete -f test/gha-e2e/curvine/minio_create_bucket.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/curvine/read_job.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/curvine/write_job.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/curvine/dataload.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/curvine/dataset.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/curvine/cacheruntime.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/curvine/cacheruntimeclass.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/curvine/minio.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/curvine/mount.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/curvine/minio_create_bucket.yaml } function create_dataload() { From 2ece6ef8dae6b23b98cd23753441f57f259e675d Mon Sep 17 00:00:00 2001 From: CAICAIIs <3360776475@qq.com> Date: Tue, 28 Apr 2026 00:33:14 +0800 Subject: [PATCH 11/12] test: fix curvine client selector Signed-off-by: CAICAIIs <3360776475@qq.com> --- test/gha-e2e/curvine/test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/gha-e2e/curvine/test.sh b/test/gha-e2e/curvine/test.sh index 3e2e1bbb0d4..309ce7d366b 100644 --- a/test/gha-e2e/curvine/test.sh +++ b/test/gha-e2e/curvine/test.sh @@ -74,7 +74,7 @@ function wait_dataset_bound() { function wait_cache_client_ready() { local deadline=180 # 3 minutes local client_component_name="${dataset_name}-client" - local client_selector="cacheruntime.data.fluid.io/component-name=${client_component_name}" + local client_selector="cacheruntime.fluid.io/component-name=${client_component_name}" local last_phase="" local runtime_ready_replicas="" local runtime_desired_replicas="" From e986f63d04382462a5ec72ad5ff4f59c0bddd1f2 Mon Sep 17 00:00:00 2001 From: CAICAIIs <3360776475@qq.com> Date: Tue, 28 Apr 2026 00:59:45 +0800 Subject: [PATCH 12/12] test: harden alluxio e2e wait Signed-off-by: CAICAIIs <3360776475@qq.com> --- test/gha-e2e/alluxio/test.sh | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/gha-e2e/alluxio/test.sh b/test/gha-e2e/alluxio/test.sh index 5dfe3134770..da64c6b29bd 100644 --- a/test/gha-e2e/alluxio/test.sh +++ b/test/gha-e2e/alluxio/test.sh @@ -28,7 +28,7 @@ function create_dataset() { } function wait_dataset_bound() { - deadline=180 # 3 minutes + deadline=300 # 5 minutes last_state="" log_interval=0 log_times=0 @@ -37,6 +37,8 @@ function wait_dataset_bound() { if [[ $log_interval -eq 3 ]]; then log_times=$(expr $log_times + 1) syslog "checking dataset.status.phase==Bound (already $(expr $log_times \* $log_interval \* 5)s, last state: $last_state)" + runtime_pods=$(kubectl get pods --no-headers 2>/dev/null | awk -v prefix="${dataset_name}-" '$1 ~ "^" prefix {print $1 ":" $2 ":" $3}' | xargs) + syslog "runtime pods: ${runtime_pods:-}" if [[ "$(expr $log_times \* $log_interval \* 5)" -ge "$deadline" ]]; then panic "timeout for ${deadline}s!" fi @@ -78,7 +80,7 @@ function wait_job_completed() { function dump_env_and_clean_up() { bash tools/diagnose-fluid-alluxio.sh collect --name $dataset_name --namespace default --collect-path ./e2e-tmp/testcase-alluxio.tgz syslog "Cleaning up resources for testcase $testname" - kubectl delete -f test/gha-e2e/alluxio/ + kubectl delete --ignore-not-found -f test/gha-e2e/alluxio/ } function main() {