diff --git a/pkg/ddc/cache/engine/status.go b/pkg/ddc/cache/engine/status.go
index d745a20cb01..c1849fd01e7 100644
--- a/pkg/ddc/cache/engine/status.go
+++ b/pkg/ddc/cache/engine/status.go
@@ -72,28 +72,33 @@ func (e *CacheEngine) setWorkerComponentStatus(componentValue *common.CacheRunti
 	return ready, err
 }
 
-func (e *CacheEngine) setClientComponentStatus(componentValue *common.CacheRuntimeComponentValue, status *fluidapi.CacheRuntimeStatus) (err error) {
+func (e *CacheEngine) setClientComponentStatus(componentValue *common.CacheRuntimeComponentValue, status *fluidapi.CacheRuntimeStatus) (fullyReady bool, err error) {
 	manager := component.NewComponentHelper(componentValue.WorkloadType, e.Scheme, e.Client)
 	clientStatus, err := manager.ConstructComponentStatus(context.TODO(), componentValue)
 	if err != nil {
-		return err
+		return false, err
 	}
 
-	if clientStatus.DesiredReplicas > 0 {
-		if clientStatus.DesiredReplicas == clientStatus.ReadyReplicas {
-			clientStatus.Phase = fluidapi.RuntimePhaseReady
-		} else if clientStatus.ReadyReplicas >= 1 {
-			clientStatus.Phase = fluidapi.RuntimePhasePartialReady
-		}
+	if clientStatus.DesiredReplicas > 0 && clientStatus.ReadyReplicas >= clientStatus.DesiredReplicas {
+		clientStatus.Phase = fluidapi.RuntimePhaseReady
+		fullyReady = true
+	} else if clientStatus.ReadyReplicas > 0 {
+		clientStatus.Phase = fluidapi.RuntimePhasePartialReady
+	} else {
+		clientStatus.Phase = fluidapi.RuntimePhaseNotReady
 	}
 
 	status.Client = clientStatus
-	return nil
+	return fullyReady, nil
 }
 
 func (e *CacheEngine) CheckAndUpdateRuntimeStatus(value *common.CacheRuntimeValue) (bool, error) {
-	var masterReady, workerReady, runtimeReady = true, true, false
+	runtimeReady := false
 	err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+		// Reset readiness on each retry to avoid stale state after conflicts.
+		masterReady, workerReady, clientFullyReady := true, true, false
+		runtimeReady = false
+
 		runtime, err := e.getRuntime()
 		if err != nil {
 			return err
@@ -115,16 +120,21 @@ func (e *CacheEngine) CheckAndUpdateRuntimeStatus(value *common.CacheRuntimeValu
 	}
 
 	if value.Client.Enabled {
-		err = e.setClientComponentStatus(value.Client, &runtimeToUpdate.Status)
+		clientFullyReady, err = e.setClientComponentStatus(value.Client, &runtimeToUpdate.Status)
 		if err != nil {
 			return err
 		}
 	}
 
-	if masterReady && workerReady {
-		runtimeReady = true
-	} else {
-		e.Log.Info(fmt.Sprintf("MasterReady: %v, workerReady: %v", masterReady, workerReady))
+	runtimeReady = masterReady && workerReady
+	if !runtimeReady {
+		e.Log.Info(fmt.Sprintf(
+			"masterReady: %v, workerReady: %v, clientFullyReady: %v, clientPhase: %s",
+			masterReady,
+			workerReady,
+			clientFullyReady,
+			runtimeToUpdate.Status.Client.Phase,
+		))
 	}
 
 	// Update the setup time
diff --git a/pkg/ddc/cache/engine/status_test.go b/pkg/ddc/cache/engine/status_test.go
new file mode 100644
index 00000000000..773a87fd5cb
--- /dev/null
+++ b/pkg/ddc/cache/engine/status_test.go
@@ -0,0 +1,326 @@
+/*
+  Copyright 2026 The Fluid Authors.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+package engine
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	appsv1 "k8s.io/api/apps/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
+	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
+
+	datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
+	"github.com/fluid-cloudnative/fluid/pkg/common"
+	"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
+)
+
+const (
+	testStatusNamespace          = "default"
+	testStatusRuntime            = "curvine-demo"
+	testStatusMaster             = "curvine-demo-master"
+	testStatusWorker             = "curvine-demo-worker"
+	testStatusClient             = "curvine-demo-client"
+	testCacheRuntimeResource     = "cacheruntimes"
+	testCacheRuntimeGroup        = "data.fluid.io"
+	testStatusWorkloadAPIVersion = "apps/v1"
+)
+
+func TestCheckAndUpdateRuntimeStatusClientNotReadyDoesNotBlockRuntimeReady(t *testing.T) {
+	engine, client := newStatusTestEngineWithClient(
+		t,
+		fake.NewFakeClientWithScheme(
+			datav1alpha1.UnitTestScheme,
+			newStatusTestRuntime(),
+			newStatefulSetComponent(testStatusMaster, testStatusNamespace, 1, 1),
+			newStatefulSetComponent(testStatusWorker, testStatusNamespace, 1, 1),
+			newDaemonSetComponent(testStatusClient, testStatusNamespace, 1, 0),
+		),
+	)
+
+	ready, err := engine.CheckAndUpdateRuntimeStatus(newStatusTestRuntimeValue(true))
+	if err != nil {
+		t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err)
+	}
+	if !ready {
+		t.Fatalf("expected runtime to become ready once master and worker are ready")
+	}
+
+	updatedRuntime := getUpdatedRuntime(t, client)
+	if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhaseNotReady {
+		t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhaseNotReady, updatedRuntime.Status.Client.Phase)
+	}
+	if updatedRuntime.Status.SetupDuration == "" {
+		t.Fatalf("expected setup duration to be recorded once runtime is ready")
+	}
+}
+
+func TestCheckAndUpdateRuntimeStatusClientPartialReadyDoesNotBlockRuntimeReady(t *testing.T) {
+	engine, client := newStatusTestEngineWithClient(
+		t,
+		fake.NewFakeClientWithScheme(
+			datav1alpha1.UnitTestScheme,
+			newStatusTestRuntime(),
+			newStatefulSetComponent(testStatusMaster, testStatusNamespace, 1, 1),
+			newStatefulSetComponent(testStatusWorker, testStatusNamespace, 1, 1),
+			newDaemonSetComponent(testStatusClient, testStatusNamespace, 2, 1),
+		),
+	)
+
+	ready, err := engine.CheckAndUpdateRuntimeStatus(newStatusTestRuntimeValue(true))
+	if err != nil {
+		t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err)
+	}
+	if !ready {
+		t.Fatalf("expected runtime to become ready once master and worker are ready")
+	}
+
+	updatedRuntime := getUpdatedRuntime(t, client)
+	if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhasePartialReady {
+		t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhasePartialReady, updatedRuntime.Status.Client.Phase)
+	}
+	if updatedRuntime.Status.SetupDuration == "" {
+		t.Fatalf("expected setup duration to be recorded once runtime is ready")
+	}
+}
+
+func TestCheckAndUpdateRuntimeStatusClientZeroDesiredReplicasReportsNotReady(t *testing.T) {
+	engine, client := newStatusTestEngineWithClient(
+		t,
+		fake.NewFakeClientWithScheme(
+			datav1alpha1.UnitTestScheme,
+			newStatusTestRuntime(),
+			newStatefulSetComponent(testStatusMaster, testStatusNamespace, 1, 1),
+			newStatefulSetComponent(testStatusWorker, testStatusNamespace, 1, 1),
+			newDaemonSetComponent(testStatusClient, testStatusNamespace, 0, 0),
+		),
+	)
+
+	ready, err := engine.CheckAndUpdateRuntimeStatus(newStatusTestRuntimeValue(true))
+	if err != nil {
+		t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err)
+	}
+	if !ready {
+		t.Fatalf("expected runtime to stay ready when client desires zero replicas")
+	}
+
+	updatedRuntime := getUpdatedRuntime(t, client)
+	if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhaseNotReady {
+		t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhaseNotReady, updatedRuntime.Status.Client.Phase)
+	}
+	if updatedRuntime.Status.Client.DesiredReplicas != 0 {
+		t.Fatalf("expected desired replicas to stay 0, got %d", updatedRuntime.Status.Client.DesiredReplicas)
+	}
+}
+
+func TestCheckAndUpdateRuntimeStatusClientFullyReadyReportsReady(t *testing.T) {
+	engine, client := newStatusTestEngineWithClient(
+		t,
+		fake.NewFakeClientWithScheme(
+			datav1alpha1.UnitTestScheme,
+			newStatusTestRuntime(),
+			newStatefulSetComponent(testStatusMaster, testStatusNamespace, 1, 1),
+			newStatefulSetComponent(testStatusWorker, testStatusNamespace, 1, 1),
+			newDaemonSetComponent(testStatusClient, testStatusNamespace, 2, 2),
+		),
+	)
+
+	ready, err := engine.CheckAndUpdateRuntimeStatus(newStatusTestRuntimeValue(true))
+	if err != nil {
+		t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err)
+	}
+	if !ready {
+		t.Fatalf("expected runtime to stay ready when client is fully ready")
+	}
+
+	updatedRuntime := getUpdatedRuntime(t, client)
+	if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhaseReady {
+		t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhaseReady, updatedRuntime.Status.Client.Phase)
+	}
+	if updatedRuntime.Status.Client.ReadyReplicas != updatedRuntime.Status.Client.DesiredReplicas {
+		t.Fatalf("expected ready replicas to match desired replicas, got %d/%d", updatedRuntime.Status.Client.ReadyReplicas, updatedRuntime.Status.Client.DesiredReplicas)
+	}
+}
+
+func TestCheckAndUpdateRuntimeStatusRecomputesRuntimeReadyOnRetry(t *testing.T) {
+	baseClient := fake.NewFakeClientWithScheme(
+		datav1alpha1.UnitTestScheme,
+		newStatusTestRuntime(),
+		newStatefulSetComponent(testStatusMaster, testStatusNamespace, 1, 1),
+		newStatefulSetComponent(testStatusWorker, testStatusNamespace, 1, 1),
+	)
+
+	client := &conflictOnceClient{
+		Client: baseClient,
+		statusWriter: &conflictOnceStatusWriter{
+			StatusWriter: baseClient.Status(),
+			beforeConflict: func(ctx context.Context) error {
+				worker := &appsv1.StatefulSet{}
+				if err := baseClient.Get(ctx, types.NamespacedName{Name: testStatusWorker, Namespace: testStatusNamespace}, worker); err != nil {
+					return err
+				}
+
+				worker.Status.ReadyReplicas = 0
+				worker.Status.AvailableReplicas = 0
+				return baseClient.Status().Update(ctx, worker)
+			},
+		},
+	}
+
+	engine, _ := newStatusTestEngineWithClient(t, client)
+	ready, err := engine.CheckAndUpdateRuntimeStatus(newStatusTestRuntimeValue(false))
+	if err != nil {
+		t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err)
+	}
+	if ready {
+		t.Fatalf("expected runtime to be not ready after retry sees worker become not ready")
+	}
+
+	updatedRuntime := getUpdatedRuntime(t, client)
+	if updatedRuntime.Status.Worker.Phase != datav1alpha1.RuntimePhaseNotReady {
+		t.Fatalf("expected worker phase %q, got %q", datav1alpha1.RuntimePhaseNotReady, updatedRuntime.Status.Worker.Phase)
+	}
+	if updatedRuntime.Status.SetupDuration != "" {
+		t.Fatalf("expected setup duration to stay empty when final runtime status is not ready, got %q", updatedRuntime.Status.SetupDuration)
+	}
+}
+
+func newStatusTestEngineWithClient(t *testing.T, client ctrlclient.Client) (*CacheEngine, ctrlclient.Client) {
+	t.Helper()
+
+	return &CacheEngine{
+		Client:    client,
+		Scheme:    datav1alpha1.UnitTestScheme,
+		name:      testStatusRuntime,
+		namespace: testStatusNamespace,
+		Log:       fake.NullLogger(),
+	}, client
+}
+
+func newStatusTestRuntimeValue(enableClient bool) *common.CacheRuntimeValue {
+	value := &common.CacheRuntimeValue{
+		Master: newStatusTestComponentValue(testStatusMaster, "StatefulSet"),
+		Worker: newStatusTestComponentValue(testStatusWorker, "StatefulSet"),
+		Client: newStatusTestComponentValue(testStatusClient, "DaemonSet"),
+	}
+	value.Client.Enabled = enableClient
+
+	return value
+}
+
+func newStatusTestComponentValue(name, kind string) *common.CacheRuntimeComponentValue {
+	return &common.CacheRuntimeComponentValue{
+		Enabled:      true,
+		Name:         name,
+		Namespace:    testStatusNamespace,
+		WorkloadType: metav1.TypeMeta{APIVersion: testStatusWorkloadAPIVersion, Kind: kind},
+	}
+}
+
+func newStatusTestRuntime() *datav1alpha1.CacheRuntime {
+	return &datav1alpha1.CacheRuntime{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:              testStatusRuntime,
+			Namespace:         testStatusNamespace,
+			CreationTimestamp: metav1.NewTime(time.Now().Add(-time.Minute)),
+		},
+	}
+}
+
+func getUpdatedRuntime(t *testing.T, client ctrlclient.Client) *datav1alpha1.CacheRuntime {
+	t.Helper()
+
+	updatedRuntime := &datav1alpha1.CacheRuntime{}
+	if err := client.Get(context.TODO(), types.NamespacedName{Name: testStatusRuntime, Namespace: testStatusNamespace}, updatedRuntime); err != nil {
+		t.Fatalf("failed to get updated runtime: %v", err)
+	}
+
+	return updatedRuntime
+}
+
+func newStatefulSetComponent(name, namespace string, desiredReplicas, readyReplicas int32) *appsv1.StatefulSet {
+	replicas := desiredReplicas
+	return &appsv1.StatefulSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: namespace,
+		},
+		Spec: appsv1.StatefulSetSpec{
+			Replicas: &replicas,
+		},
+		Status: appsv1.StatefulSetStatus{
+			CurrentReplicas:   desiredReplicas,
+			AvailableReplicas: readyReplicas,
+			ReadyReplicas:     readyReplicas,
+		},
+	}
+}
+
+func newDaemonSetComponent(name, namespace string, desiredReplicas, readyReplicas int32) *appsv1.DaemonSet {
+	return &appsv1.DaemonSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: namespace,
+		},
+		Status: appsv1.DaemonSetStatus{
+			CurrentNumberScheduled: desiredReplicas,
+			DesiredNumberScheduled: desiredReplicas,
+			NumberAvailable:        readyReplicas,
+			NumberReady:            readyReplicas,
+			NumberUnavailable:      desiredReplicas - readyReplicas,
+		},
+	}
+}
+
+type conflictOnceClient struct {
+	ctrlclient.Client
+	statusWriter ctrlclient.StatusWriter
+}
+
+func (c *conflictOnceClient) Status() ctrlclient.StatusWriter {
+	return c.statusWriter
+}
+
+type conflictOnceStatusWriter struct {
+	ctrlclient.StatusWriter
+	beforeConflict func(ctx context.Context) error
+	conflicted     bool
+}
+
+func (w *conflictOnceStatusWriter) Update(ctx context.Context, obj ctrlclient.Object, opts ...ctrlclient.SubResourceUpdateOption) error {
+	if !w.conflicted {
+		w.conflicted = true
+		if w.beforeConflict != nil {
+			if err := w.beforeConflict(ctx); err != nil {
+				return err
+			}
+		}
+
+		return apierrors.NewConflict(
+			schema.GroupResource{Group: testCacheRuntimeGroup, Resource: testCacheRuntimeResource},
+			obj.GetName(),
+			errors.New("injected conflict"),
+		)
+	}
+
+	return w.StatusWriter.Update(ctx, obj, opts...)
+}
diff --git a/test/gha-e2e/alluxio/test.sh b/test/gha-e2e/alluxio/test.sh
index 5dfe3134770..da64c6b29bd 100644
--- a/test/gha-e2e/alluxio/test.sh
+++ b/test/gha-e2e/alluxio/test.sh
@@ -28,7 +28,7 @@ function create_dataset() {
 }
 
 function wait_dataset_bound() {
-    deadline=180 # 3 minutes
+    deadline=300 # 5 minutes
     last_state=""
     log_interval=0
     log_times=0
@@ -37,6 +37,8 @@
         if [[ $log_interval -eq 3 ]]; then
            log_times=$(expr $log_times + 1)
            syslog "checking dataset.status.phase==Bound (already $(expr $log_times \* $log_interval \* 5)s, last state: $last_state)"
+           runtime_pods=$(kubectl get pods --no-headers 2>/dev/null | awk -v prefix="${dataset_name}-" '$1 ~ "^" prefix {print $1 ":" $2 ":" $3}' | xargs)
+           syslog "runtime pods: ${runtime_pods:-}"
            if [[ "$(expr $log_times \* $log_interval \* 5)" -ge "$deadline" ]]; then
                panic "timeout for ${deadline}s!"
            fi
@@ -78,7 +80,7 @@ function wait_job_completed() {
 function dump_env_and_clean_up() {
     bash tools/diagnose-fluid-alluxio.sh collect --name $dataset_name --namespace default --collect-path ./e2e-tmp/testcase-alluxio.tgz
     syslog "Cleaning up resources for testcase $testname"
-    kubectl delete -f test/gha-e2e/alluxio/
+    kubectl delete --ignore-not-found -f test/gha-e2e/alluxio/
 }
 
 function main() {
diff --git a/test/gha-e2e/curvine/test.sh b/test/gha-e2e/curvine/test.sh
index 6ea23144083..309ce7d366b 100644
--- a/test/gha-e2e/curvine/test.sh
+++ b/test/gha-e2e/curvine/test.sh
@@ -71,6 +71,48 @@
     syslog "Found dataset $dataset_name status.phase==Bound"
 }
 
+function wait_cache_client_ready() {
+    local deadline=180 # 3 minutes
+    local client_component_name="${dataset_name}-client"
+    local client_selector="cacheruntime.fluid.io/component-name=${client_component_name}"
+    local last_phase=""
+    local runtime_ready_replicas=""
+    local runtime_desired_replicas=""
+    local ds_ready_replicas=""
+    local ds_desired_replicas=""
+    local pod_states=""
+    local log_interval=0
+    local log_times=0
+
+    while true; do
+        last_phase=$(kubectl get cacheruntime "$dataset_name" -ojsonpath='{@.status.client.phase}')
+        runtime_ready_replicas=$(kubectl get cacheruntime "$dataset_name" -ojsonpath='{@.status.client.readyReplicas}')
+        runtime_desired_replicas=$(kubectl get cacheruntime "$dataset_name" -ojsonpath='{@.status.client.desiredReplicas}')
+        ds_ready_replicas=$(kubectl get daemonset "$client_component_name" -ojsonpath='{@.status.numberReady}' 2>/dev/null)
+        ds_desired_replicas=$(kubectl get daemonset "$client_component_name" -ojsonpath='{@.status.desiredNumberScheduled}' 2>/dev/null)
+        pod_states=$(kubectl get pod -l "$client_selector" -ojsonpath='{range .items[*]}{.metadata.name}:{range .status.containerStatuses[*]}{.ready}{end}:{.status.phase}{" "}{end}' 2>/dev/null)
+
+        if [[ $log_interval -eq 3 ]]; then
+            log_times=$((log_times + 1))
+            syslog "checking cache client readiness (already $((log_times * log_interval * 5))s, runtime phase: ${last_phase:-}, runtime ready/desired: ${runtime_ready_replicas:-}/${runtime_desired_replicas:-}, ds ready/desired: ${ds_ready_replicas:-}/${ds_desired_replicas:-}, pods: ${pod_states:-})"
+            if [[ $((log_times * log_interval * 5)) -ge $deadline ]]; then
+                panic "timeout waiting for cache client pod ready after ${deadline}s"
+            fi
+            log_interval=0
+        fi
+
+        if kubectl rollout status daemonset/"$client_component_name" --timeout=5s >/dev/null 2>&1 && \
+           kubectl wait --for=condition=Ready --timeout=5s pod -l "$client_selector" >/dev/null 2>&1; then
"$client_selector" >/dev/null 2>&1; then + break + fi + + log_interval=$((log_interval + 1)) + sleep 5 + done + + syslog "Found ready cache client pod for $dataset_name" +} + function create_job() { local job_file=$1 local job_name=$2 @@ -100,15 +142,15 @@ function wait_job_completed() { function dump_env_and_clean_up() { bash tools/diagnose-fluid-curvine.sh collect --name $dataset_name --namespace default --collect-path ./e2e-tmp/testcase-curvine.tgz syslog "Cleaning up resources for testcase $testname" - kubectl delete -f test/gha-e2e/curvine/read_job.yaml - kubectl delete -f test/gha-e2e/curvine/write_job.yaml - kubectl delete -f test/gha-e2e/curvine/dataload.yaml - kubectl delete -f test/gha-e2e/curvine/dataset.yaml - kubectl delete -f test/gha-e2e/curvine/cacheruntime.yaml - kubectl delete -f test/gha-e2e/curvine/cacheruntimeclass.yaml - kubectl delete -f test/gha-e2e/curvine/minio.yaml - kubectl delete -f test/gha-e2e/curvine/mount.yaml - kubectl delete -f test/gha-e2e/curvine/minio_create_bucket.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/curvine/read_job.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/curvine/write_job.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/curvine/dataload.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/curvine/dataset.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/curvine/cacheruntime.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/curvine/cacheruntimeclass.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/curvine/minio.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/curvine/mount.yaml + kubectl delete --ignore-not-found -f test/gha-e2e/curvine/minio_create_bucket.yaml } function create_dataload() { @@ -134,20 +176,17 @@ function wait_dataload_completed() { done syslog "Found succeeded dataload_name $dataload_name" } - function main() { syslog "[TESTCASE $testname STARTS AT $(date)]" trap dump_env_and_clean_up EXIT setup create_dataset wait_dataset_bound - create_job test/gha-e2e/curvine/write_job.yaml $write_job_name wait_job_completed $write_job_name - - create_dataload + create_dataload wait_dataload_completed "curvine-dataload" - + wait_cache_client_ready create_job test/gha-e2e/curvine/read_job.yaml $read_job_name wait_job_completed $read_job_name