From 1e1beeab66ae39d5addb7a1d6b8c28ad350c8e9c Mon Sep 17 00:00:00 2001 From: CAICAIIs <3360776475@qq.com> Date: Fri, 24 Apr 2026 12:30:58 +0800 Subject: [PATCH 1/2] fix: propagate context through configmap follow-up Signed-off-by: CAICAIIs <3360776475@qq.com> --- pkg/ddc/cache/engine/cm.go | 20 ++- pkg/ddc/cache/engine/cm_test.go | 215 +++++++++++++++++++++++ pkg/ddc/cache/engine/setup.go | 2 +- pkg/ddc/cache/engine/sync.go | 11 +- pkg/ddc/thin/referencedataset/cm.go | 18 +- pkg/ddc/thin/referencedataset/cm_test.go | 127 +++++++++++-- pkg/ddc/thin/referencedataset/engine.go | 4 +- pkg/utils/kubeclient/configmap.go | 6 +- pkg/utils/kubeclient/configmap_test.go | 13 ++ 9 files changed, 377 insertions(+), 39 deletions(-) create mode 100644 pkg/ddc/cache/engine/cm_test.go diff --git a/pkg/ddc/cache/engine/cm.go b/pkg/ddc/cache/engine/cm.go index 980996defb7..5253c64f4b6 100644 --- a/pkg/ddc/cache/engine/cm.go +++ b/pkg/ddc/cache/engine/cm.go @@ -17,7 +17,9 @@ package engine import ( + "context" "encoding/json" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/utils" @@ -27,7 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func (e *CacheEngine) createRuntimeConfigMaps(runtimeClass *datav1alpha1.CacheRuntimeClass) error { +func (e *CacheEngine) createRuntimeConfigMaps(ctx context.Context, runtimeClass *datav1alpha1.CacheRuntimeClass) error { runtime, err := e.getRuntime() if err != nil { return err @@ -46,29 +48,29 @@ func (e *CacheEngine) createRuntimeConfigMaps(runtimeClass *datav1alpha1.CacheRu } // create the config map defined in CacheRuntimeClass ExtraResources, runtimeClass PodTemplate can use these. - err = e.createConfigMapInRuntimeClass(&runtimeClass.ExtraResources, owner) + err = e.createConfigMapInRuntimeClass(ctx, &runtimeClass.ExtraResources, owner) if err != nil { return err } // create the config map generated by fluid, automatically mounted in component pods. - err = e.createRuntimeValueConfigMap(runtime, owner) + err = e.createRuntimeValueConfigMap(ctx, runtime, owner) return err } // create if not exists -func (e *CacheEngine) createConfigMapInRuntimeClass(extraResources *datav1alpha1.RuntimeExtraResources, owner []metav1.OwnerReference) error { +func (e *CacheEngine) createConfigMapInRuntimeClass(ctx context.Context, extraResources *datav1alpha1.RuntimeExtraResources, owner []metav1.OwnerReference) error { if extraResources.ConfigMaps == nil { return nil } for _, configMap := range extraResources.ConfigMaps { - cm, err := kubeclient.GetConfigmapByName(e.Client, configMap.Name, e.namespace) + cm, err := kubeclient.GetConfigmapByNameWithContext(ctx, e.Client, configMap.Name, e.namespace) if err != nil { return err } if cm == nil { - err = kubeclient.CreateConfigMapWithOwner(e.Client, configMap.Name, e.namespace, configMap.Data, owner) + err = kubeclient.CreateConfigMapWithOwnerWithContext(ctx, e.Client, configMap.Name, e.namespace, configMap.Data, owner) if err != nil { return err } @@ -79,8 +81,8 @@ func (e *CacheEngine) createConfigMapInRuntimeClass(extraResources *datav1alpha1 } // create if not exists -func (e *CacheEngine) createRuntimeValueConfigMap(runtime *datav1alpha1.CacheRuntime, owner []metav1.OwnerReference) error { - configMap, err := kubeclient.GetConfigmapByName(e.Client, e.getRuntimeConfigConfigMapName(), e.namespace) +func (e *CacheEngine) createRuntimeValueConfigMap(ctx context.Context, runtime *datav1alpha1.CacheRuntime, owner []metav1.OwnerReference) error { + configMap, err := kubeclient.GetConfigmapByNameWithContext(ctx, e.Client, e.getRuntimeConfigConfigMapName(), e.namespace) if err != nil { return err } @@ -93,7 +95,7 @@ func (e *CacheEngine) createRuntimeValueConfigMap(runtime *datav1alpha1.CacheRun return errors.Wrap(err, "failed to generate runtime config") } - return kubeclient.CreateConfigMapWithOwner(e.Client, e.getRuntimeConfigConfigMapName(), e.namespace, data, owner) + return kubeclient.CreateConfigMapWithOwnerWithContext(ctx, e.Client, e.getRuntimeConfigConfigMapName(), e.namespace, data, owner) } // generateRuntimeConfigData generate the data in the config map for runtime config diff --git a/pkg/ddc/cache/engine/cm_test.go b/pkg/ddc/cache/engine/cm_test.go new file mode 100644 index 00000000000..23ea7586be0 --- /dev/null +++ b/pkg/ddc/cache/engine/cm_test.go @@ -0,0 +1,215 @@ +/* + Copyright 2026 The Fluid Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package engine + +import ( + "context" + "testing" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type contextAwareCacheClient struct { + client.Client + updateCount int +} + +func (c *contextAwareCacheClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + if err := ctx.Err(); err != nil { + return err + } + return c.Client.Get(ctx, key, obj, opts...) +} + +func (c *contextAwareCacheClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + if err := ctx.Err(); err != nil { + return err + } + return c.Client.Create(ctx, obj, opts...) +} + +func (c *contextAwareCacheClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + if err := ctx.Err(); err != nil { + return err + } + c.updateCount++ + return c.Client.Update(ctx, obj, opts...) +} + +func TestCreateConfigMapInRuntimeClassWithCanceledContext(t *testing.T) { + scheme := newCacheEngineTestScheme(t) + baseClient := fake.NewFakeClientWithScheme(scheme) + testClient := &contextAwareCacheClient{Client: baseClient} + engine := &CacheEngine{Client: testClient, name: "demo", namespace: "default"} + resources := &datav1alpha1.RuntimeExtraResources{ + ConfigMaps: []datav1alpha1.ConfigMapRuntimeExtraResource{{Name: "extra", Data: map[string]string{"key": "value"}}}, + } + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + if err := engine.createConfigMapInRuntimeClass(ctx, resources, nil); err != context.Canceled { + t.Fatalf("expected context.Canceled, got %v", err) + } +} + +func TestCreateConfigMapInRuntimeClassCreatesMissingConfigMap(t *testing.T) { + scheme := newCacheEngineTestScheme(t) + baseClient := fake.NewFakeClientWithScheme(scheme) + engine := &CacheEngine{Client: baseClient, name: "demo", namespace: "default"} + resources := &datav1alpha1.RuntimeExtraResources{ + ConfigMaps: []datav1alpha1.ConfigMapRuntimeExtraResource{{Name: "extra", Data: map[string]string{"key": "value"}}}, + } + + if err := engine.createConfigMapInRuntimeClass(context.Background(), resources, nil); err != nil { + t.Fatalf("expected no error, got %v", err) + } + + created := &corev1.ConfigMap{} + if err := baseClient.Get(context.Background(), types.NamespacedName{Name: "extra", Namespace: "default"}, created); err != nil { + t.Fatalf("expected configmap to be created, got %v", err) + } + if got := created.Data["key"]; got != "value" { + t.Fatalf("expected copied data value, got %q", got) + } +} + +func TestCreateRuntimeValueConfigMapCreatesMissingConfigMap(t *testing.T) { + scheme := newCacheEngineTestScheme(t) + runtimeObj := newCacheRuntimeForConfigMapTest() + runtimeClass := newCacheRuntimeClassForConfigMapTest() + dataset := newDatasetForConfigMapTest() + baseClient := fake.NewFakeClientWithScheme(scheme, runtimeObj, runtimeClass, dataset) + engine := &CacheEngine{Client: baseClient, name: "demo", namespace: "default"} + + if err := engine.createRuntimeValueConfigMap(context.Background(), runtimeObj, nil); err != nil { + t.Fatalf("expected no error, got %v", err) + } + + created := &corev1.ConfigMap{} + if err := baseClient.Get(context.Background(), types.NamespacedName{Name: engine.getRuntimeConfigConfigMapName(), Namespace: "default"}, created); err != nil { + t.Fatalf("expected runtime value configmap to be created, got %v", err) + } + if _, ok := created.Data[engine.getRuntimeConfigFileName()]; !ok { + t.Fatalf("expected runtime value config data key %q", engine.getRuntimeConfigFileName()) + } +} + +func TestSyncRuntimeValueConfigMapWithCanceledContext(t *testing.T) { + scheme := newCacheEngineTestScheme(t) + runtimeObj := newCacheRuntimeForConfigMapTest() + runtimeClass := newCacheRuntimeClassForConfigMapTest() + dataset := newDatasetForConfigMapTest() + testClient := &contextAwareCacheClient{Client: fake.NewFakeClientWithScheme(scheme, runtimeObj, runtimeClass, dataset)} + engine := &CacheEngine{Client: testClient, name: "demo", namespace: "default"} + canceledCtx, cancel := context.WithCancel(context.Background()) + cancel() + ctx := cruntime.ReconcileRequestContext{Context: canceledCtx} + + if err := engine.syncRuntimeValueConfigMap(ctx, runtimeObj); err != context.Canceled { + t.Fatalf("expected context.Canceled, got %v", err) + } +} + +func TestSyncRuntimeValueConfigMapSkipsUnchangedData(t *testing.T) { + scheme := newCacheEngineTestScheme(t) + runtimeObj := newCacheRuntimeForConfigMapTest() + runtimeClass := newCacheRuntimeClassForConfigMapTest() + dataset := newDatasetForConfigMapTest() + + baseEngine := &CacheEngine{Client: fake.NewFakeClientWithScheme(scheme, runtimeObj, runtimeClass, dataset), name: "demo", namespace: "default"} + data, err := baseEngine.generateRuntimeConfigData(runtimeObj) + if err != nil { + t.Fatalf("failed to generate runtime config data: %v", err) + } + configMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: baseEngine.getRuntimeConfigConfigMapName(), Namespace: "default"}, + Data: data, + } + + testClient := &contextAwareCacheClient{Client: fake.NewFakeClientWithScheme(scheme, runtimeObj, runtimeClass, dataset, configMap)} + engine := &CacheEngine{Client: testClient, name: "demo", namespace: "default"} + ctx := cruntime.ReconcileRequestContext{Context: context.Background()} + + if err := engine.syncRuntimeValueConfigMap(ctx, runtimeObj); err != nil { + t.Fatalf("expected no error, got %v", err) + } + if testClient.updateCount != 0 { + t.Fatalf("expected unchanged data to skip update, got %d updates", testClient.updateCount) + } +} + +func newCacheEngineTestScheme(t *testing.T) *runtime.Scheme { + t.Helper() + scheme := runtime.NewScheme() + if err := corev1.AddToScheme(scheme); err != nil { + t.Fatal(err) + } + if err := datav1alpha1.AddToScheme(scheme); err != nil { + t.Fatal(err) + } + return scheme +} + +func newCacheRuntimeForConfigMapTest() *datav1alpha1.CacheRuntime { + return &datav1alpha1.CacheRuntime{ + TypeMeta: metav1.TypeMeta{APIVersion: "data.fluid.io/v1alpha1", Kind: datav1alpha1.CacheRuntimeKind}, + ObjectMeta: metav1.ObjectMeta{ + Name: "demo", + Namespace: "default", + UID: types.UID("demo-uid"), + }, + Spec: datav1alpha1.CacheRuntimeSpec{ + RuntimeClassName: "test-class", + Master: datav1alpha1.CacheRuntimeMasterSpec{RuntimeComponentCommonSpec: datav1alpha1.RuntimeComponentCommonSpec{Disabled: true}}, + Worker: datav1alpha1.CacheRuntimeWorkerSpec{RuntimeComponentCommonSpec: datav1alpha1.RuntimeComponentCommonSpec{Disabled: true}}, + Client: datav1alpha1.CacheRuntimeClientSpec{RuntimeComponentCommonSpec: datav1alpha1.RuntimeComponentCommonSpec{Disabled: true}}, + }, + } +} + +func newCacheRuntimeClassForConfigMapTest() *datav1alpha1.CacheRuntimeClass { + return &datav1alpha1.CacheRuntimeClass{ + ObjectMeta: metav1.ObjectMeta{Name: "test-class"}, + } +} + +func newDatasetForConfigMapTest() *datav1alpha1.Dataset { + return &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}, + Spec: datav1alpha1.DatasetSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadOnlyMany}, + Mounts: []datav1alpha1.Mount{ + { + Name: "hbase", + MountPoint: "local:///data", + Path: "/data", + ReadOnly: true, + Shared: true, + }, + }, + }, + Status: datav1alpha1.DatasetStatus{Runtimes: []datav1alpha1.Runtime{{Name: "demo", Type: common.CacheRuntime}}}, + } +} diff --git a/pkg/ddc/cache/engine/setup.go b/pkg/ddc/cache/engine/setup.go index eb45ee32420..ad810598003 100644 --- a/pkg/ddc/cache/engine/setup.go +++ b/pkg/ddc/cache/engine/setup.go @@ -45,7 +45,7 @@ func (e *CacheEngine) Setup(ctx cruntime.ReconcileRequestContext) (ready bool, e } // create runtime value configmap for runtime mount - err = e.createRuntimeConfigMaps(runtimeClass) + err = e.createRuntimeConfigMaps(ctx, runtimeClass) if err != nil { return false, err } diff --git a/pkg/ddc/cache/engine/sync.go b/pkg/ddc/cache/engine/sync.go index dd2d16ca2e6..787cbf77065 100644 --- a/pkg/ddc/cache/engine/sync.go +++ b/pkg/ddc/cache/engine/sync.go @@ -17,7 +17,6 @@ package engine import ( - "context" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" @@ -34,7 +33,7 @@ func (e *CacheEngine) Sync(ctx cruntime.ReconcileRequestContext) (err error) { return err } - err = e.syncRuntimeValueConfigMap(runtime) + err = e.syncRuntimeValueConfigMap(ctx, runtime) if err != nil { return err } @@ -54,8 +53,8 @@ func (e *CacheEngine) Sync(ctx cruntime.ReconcileRequestContext) (err error) { return nil } -func (e *CacheEngine) syncRuntimeValueConfigMap(runtime *datav1alpha1.CacheRuntime) error { - configMap, err := kubeclient.GetConfigmapByName(e.Client, e.getRuntimeConfigConfigMapName(), e.namespace) +func (e *CacheEngine) syncRuntimeValueConfigMap(ctx cruntime.ReconcileRequestContext, runtime *datav1alpha1.CacheRuntime) error { + configMap, err := kubeclient.GetConfigmapByNameWithContext(ctx, e.Client, e.getRuntimeConfigConfigMapName(), e.namespace) if err != nil { return err } @@ -77,13 +76,13 @@ func (e *CacheEngine) syncRuntimeValueConfigMap(runtime *datav1alpha1.CacheRunti } if configMap == nil { - return kubeclient.CreateConfigMapWithOwner(e.Client, e.getRuntimeConfigConfigMapName(), e.namespace, data, owner) + return kubeclient.CreateConfigMapWithOwnerWithContext(ctx, e.Client, e.getRuntimeConfigConfigMapName(), e.namespace, data, owner) } configMapToUpdate := configMap.DeepCopy() configMapToUpdate.Data = data if !reflect.DeepEqual(configMapToUpdate, configMap) { - err = e.Client.Update(context.TODO(), configMapToUpdate) + err = kubeclient.UpdateConfigMapWithContext(ctx, e.Client, configMapToUpdate) if err != nil { return err } diff --git a/pkg/ddc/thin/referencedataset/cm.go b/pkg/ddc/thin/referencedataset/cm.go index 8f3a03de4f9..027b1dd314f 100644 --- a/pkg/ddc/thin/referencedataset/cm.go +++ b/pkg/ddc/thin/referencedataset/cm.go @@ -32,7 +32,7 @@ import ( "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" ) -func copyFuseDaemonSetForRefDataset(client client.Client, refDataset *datav1alpha1.Dataset, physicalRuntimeInfo base.RuntimeInfoInterface) error { +func copyFuseDaemonSetForRefDataset(ctx context.Context, client client.Client, refDataset *datav1alpha1.Dataset, physicalRuntimeInfo base.RuntimeInfoInterface) error { var fuseName string switch physicalRuntimeInfo.GetRuntimeType() { case common.JindoRuntime: @@ -40,7 +40,7 @@ func copyFuseDaemonSetForRefDataset(client client.Client, refDataset *datav1alph default: fuseName = physicalRuntimeInfo.GetName() + "-fuse" } - ds, err := kubeclient.GetDaemonset(client, fuseName, physicalRuntimeInfo.GetNamespace()) + ds, err := kubeclient.GetDaemonsetWithContext(ctx, client, fuseName, physicalRuntimeInfo.GetNamespace()) if err != nil { return err } @@ -70,7 +70,7 @@ func copyFuseDaemonSetForRefDataset(client client.Client, refDataset *datav1alph } dsToCreate.Spec.Template.Spec.NodeSelector["fluid.io/fuse-balloon"] = "true" - err = client.Create(context.TODO(), dsToCreate) + err = client.Create(ctx, dsToCreate) if utils.IgnoreAlreadyExists(err) != nil { return err } @@ -78,7 +78,7 @@ func copyFuseDaemonSetForRefDataset(client client.Client, refDataset *datav1alph return nil } -func (e *ReferenceDatasetEngine) createConfigMapForRefDataset(client client.Client, refDataset *datav1alpha1.Dataset, physicalRuntimeInfo base.RuntimeInfoInterface) error { +func (e *ReferenceDatasetEngine) createConfigMapForRefDataset(ctx context.Context, client client.Client, refDataset *datav1alpha1.Dataset, physicalRuntimeInfo base.RuntimeInfoInterface) error { physicalRuntimeType := physicalRuntimeInfo.GetRuntimeType() physicalRuntimeName := physicalRuntimeInfo.GetName() physicalRuntimeNamespace := physicalRuntimeInfo.GetNamespace() @@ -104,34 +104,34 @@ func (e *ReferenceDatasetEngine) createConfigMapForRefDataset(client client.Clie // but duplicated name error can occurs if the dst namespace has same named runtime. case common.AlluxioRuntime: configMapName := physicalRuntimeName + "-config" - err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: configMapName, Namespace: physicalRuntimeNamespace}, + err := kubeclient.CopyConfigMapWithContext(ctx, client, types.NamespacedName{Name: configMapName, Namespace: physicalRuntimeNamespace}, types.NamespacedName{Name: configMapName, Namespace: refNameSpace}, ownerReference) if err != nil { return err } case common.JuiceFSRuntime: fuseScriptConfigMapName := physicalRuntimeName + "-fuse-script" - err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: fuseScriptConfigMapName, Namespace: physicalRuntimeNamespace}, + err := kubeclient.CopyConfigMapWithContext(ctx, client, types.NamespacedName{Name: fuseScriptConfigMapName, Namespace: physicalRuntimeNamespace}, types.NamespacedName{Name: fuseScriptConfigMapName, Namespace: refNameSpace}, ownerReference) if err != nil { return err } case common.GooseFSRuntime: configMapName := physicalRuntimeName + "-config" - err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: configMapName, Namespace: physicalRuntimeNamespace}, + err := kubeclient.CopyConfigMapWithContext(ctx, client, types.NamespacedName{Name: configMapName, Namespace: physicalRuntimeNamespace}, types.NamespacedName{Name: configMapName, Namespace: refNameSpace}, ownerReference) if err != nil { return err } case common.JindoRuntime: clientConfigMapName := physicalRuntimeName + "-jindofs-client-config" - err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: clientConfigMapName, Namespace: physicalRuntimeNamespace}, + err := kubeclient.CopyConfigMapWithContext(ctx, client, types.NamespacedName{Name: clientConfigMapName, Namespace: physicalRuntimeNamespace}, types.NamespacedName{Name: clientConfigMapName, Namespace: refNameSpace}, ownerReference) if err != nil { return err } configMapName := physicalRuntimeName + "-jindofs-config" - err = kubeclient.CopyConfigMap(client, types.NamespacedName{Name: configMapName, Namespace: physicalRuntimeNamespace}, + err = kubeclient.CopyConfigMapWithContext(ctx, client, types.NamespacedName{Name: configMapName, Namespace: physicalRuntimeNamespace}, types.NamespacedName{Name: configMapName, Namespace: refNameSpace}, ownerReference) if err != nil { return err diff --git a/pkg/ddc/thin/referencedataset/cm_test.go b/pkg/ddc/thin/referencedataset/cm_test.go index b350d1460a6..a3ca8137b76 100644 --- a/pkg/ddc/thin/referencedataset/cm_test.go +++ b/pkg/ddc/thin/referencedataset/cm_test.go @@ -33,6 +33,24 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +type contextAwareClient struct { + client.Client +} + +func (c contextAwareClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + if err := ctx.Err(); err != nil { + return err + } + return c.Client.Get(ctx, key, obj, opts...) +} + +func (c contextAwareClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + if err := ctx.Err(); err != nil { + return err + } + return c.Client.Create(ctx, obj, opts...) +} + var _ = Describe("ConfigMap Operations", func() { Describe("copyFuseDaemonSetForRefDataset", func() { var ( @@ -89,7 +107,7 @@ var _ = Describe("ConfigMap Operations", func() { runtimeInfo, err := base.BuildRuntimeInfo("alluxio", "source-ns", common.AlluxioRuntime) Expect(err).NotTo(HaveOccurred()) - err = copyFuseDaemonSetForRefDataset(fakeClient, refDataset, runtimeInfo) + err = copyFuseDaemonSetForRefDataset(context.TODO(), fakeClient, refDataset, runtimeInfo) Expect(err).NotTo(HaveOccurred()) var dsList appsv1.DaemonSetList @@ -143,7 +161,7 @@ var _ = Describe("ConfigMap Operations", func() { runtimeInfo, err := base.BuildRuntimeInfo("jindo", "source-ns", common.JindoRuntime) Expect(err).NotTo(HaveOccurred()) - err = copyFuseDaemonSetForRefDataset(fakeClient, refDataset, runtimeInfo) + err = copyFuseDaemonSetForRefDataset(context.TODO(), fakeClient, refDataset, runtimeInfo) Expect(err).NotTo(HaveOccurred()) var dsList appsv1.DaemonSetList @@ -200,7 +218,7 @@ var _ = Describe("ConfigMap Operations", func() { runtimeInfo, err := base.BuildRuntimeInfo("alluxio", "source-ns", common.AlluxioRuntime) Expect(err).NotTo(HaveOccurred()) - err = copyFuseDaemonSetForRefDataset(fakeClient, refDataset, runtimeInfo) + err = copyFuseDaemonSetForRefDataset(context.TODO(), fakeClient, refDataset, runtimeInfo) Expect(err).NotTo(HaveOccurred()) }) }) @@ -220,10 +238,27 @@ var _ = Describe("ConfigMap Operations", func() { runtimeInfo, err := base.BuildRuntimeInfo("alluxio", "source-ns", common.AlluxioRuntime) Expect(err).NotTo(HaveOccurred()) - err = copyFuseDaemonSetForRefDataset(fakeClient, refDataset, runtimeInfo) + err = copyFuseDaemonSetForRefDataset(context.TODO(), fakeClient, refDataset, runtimeInfo) Expect(err).To(HaveOccurred()) }) }) + + Context("when caller context is canceled", func() { + It("should return the context error", func() { + fakeClient = contextAwareClient{Client: fake.NewFakeClientWithScheme(testScheme, testObjs...)} + + refDataset := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{Name: "ref-dataset", Namespace: "ref-ns", UID: types.UID("test-uid")}, + } + runtimeInfo, err := base.BuildRuntimeInfo("alluxio", "source-ns", common.AlluxioRuntime) + Expect(err).NotTo(HaveOccurred()) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err = copyFuseDaemonSetForRefDataset(ctx, fakeClient, refDataset, runtimeInfo) + Expect(err).To(MatchError(context.Canceled)) + }) + }) }) Describe("createConfigMapForRefDataset", func() { @@ -278,7 +313,7 @@ var _ = Describe("ConfigMap Operations", func() { runtimeInfo, err := base.BuildRuntimeInfo("alluxio", "source-ns", common.AlluxioRuntime) Expect(err).NotTo(HaveOccurred()) - err = engine.createConfigMapForRefDataset(fakeClient, refDataset, runtimeInfo) + err = engine.createConfigMapForRefDataset(context.TODO(), fakeClient, refDataset, runtimeInfo) Expect(err).NotTo(HaveOccurred()) var cmList corev1.ConfigMapList @@ -289,6 +324,76 @@ var _ = Describe("ConfigMap Operations", func() { }) }) + Context("when caller context is canceled", func() { + It("should return the context error", func() { + configMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "alluxio-config", Namespace: "source-ns"}, + } + refDataset := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{Name: "ref-dataset", Namespace: "ref-ns", UID: types.UID("test-uid")}, + } + + fakeClient = contextAwareClient{Client: fake.NewFakeClientWithScheme(testScheme, configMap)} + engine.Client = fakeClient + + runtimeInfo, err := base.BuildRuntimeInfo("alluxio", "source-ns", common.AlluxioRuntime) + Expect(err).NotTo(HaveOccurred()) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err = engine.createConfigMapForRefDataset(ctx, fakeClient, refDataset, runtimeInfo) + Expect(err).To(MatchError(context.Canceled)) + }) + }) + + Context("when destination configmap already exists", func() { + It("should skip without overwriting existing data", func() { + sourceConfigMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "alluxio-config", Namespace: "source-ns"}, + Data: map[string]string{"key": "source"}, + } + existingConfigMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "alluxio-config", Namespace: "ref-ns"}, + Data: map[string]string{"key": "existing"}, + } + refDataset := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{Name: "ref-dataset", Namespace: "ref-ns", UID: types.UID("test-uid")}, + } + + fakeClient = fake.NewFakeClientWithScheme(testScheme, sourceConfigMap, existingConfigMap) + engine.Client = fakeClient + + runtimeInfo, err := base.BuildRuntimeInfo("alluxio", "source-ns", common.AlluxioRuntime) + Expect(err).NotTo(HaveOccurred()) + + err = engine.createConfigMapForRefDataset(context.TODO(), fakeClient, refDataset, runtimeInfo) + Expect(err).NotTo(HaveOccurred()) + + cm := &corev1.ConfigMap{} + err = fakeClient.Get(context.TODO(), types.NamespacedName{Name: "alluxio-config", Namespace: "ref-ns"}, cm) + Expect(err).NotTo(HaveOccurred()) + Expect(cm.Data).To(HaveKeyWithValue("key", "existing")) + }) + }) + + Context("when source configmap is missing", func() { + It("should return the existing missing source error", func() { + refDataset := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{Name: "ref-dataset", Namespace: "ref-ns", UID: types.UID("test-uid")}, + } + + fakeClient = fake.NewFakeClientWithScheme(testScheme, testObjs...) + engine.Client = fakeClient + + runtimeInfo, err := base.BuildRuntimeInfo("alluxio", "source-ns", common.AlluxioRuntime) + Expect(err).NotTo(HaveOccurred()) + + err = engine.createConfigMapForRefDataset(context.TODO(), fakeClient, refDataset, runtimeInfo) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("runtime configmap source-ns/alluxio-config do not exist")) + }) + }) + Context("when physical runtime is JuiceFSRuntime", func() { It("should copy fuse-script configmap successfully", func() { configMap := &corev1.ConfigMap{ @@ -320,7 +425,7 @@ var _ = Describe("ConfigMap Operations", func() { runtimeInfo, err := base.BuildRuntimeInfo("juicefs", "source-ns", common.JuiceFSRuntime) Expect(err).NotTo(HaveOccurred()) - err = engine.createConfigMapForRefDataset(fakeClient, refDataset, runtimeInfo) + err = engine.createConfigMapForRefDataset(context.TODO(), fakeClient, refDataset, runtimeInfo) Expect(err).NotTo(HaveOccurred()) var cmList corev1.ConfigMapList @@ -358,7 +463,7 @@ var _ = Describe("ConfigMap Operations", func() { runtimeInfo, err := base.BuildRuntimeInfo("goosefs", "source-ns", common.GooseFSRuntime) Expect(err).NotTo(HaveOccurred()) - err = engine.createConfigMapForRefDataset(fakeClient, refDataset, runtimeInfo) + err = engine.createConfigMapForRefDataset(context.TODO(), fakeClient, refDataset, runtimeInfo) Expect(err).NotTo(HaveOccurred()) var cmList corev1.ConfigMapList @@ -403,7 +508,7 @@ var _ = Describe("ConfigMap Operations", func() { runtimeInfo, err := base.BuildRuntimeInfo("jindo", "source-ns", common.JindoRuntime) Expect(err).NotTo(HaveOccurred()) - err = engine.createConfigMapForRefDataset(fakeClient, refDataset, runtimeInfo) + err = engine.createConfigMapForRefDataset(context.TODO(), fakeClient, refDataset, runtimeInfo) Expect(err).NotTo(HaveOccurred()) var cmList corev1.ConfigMapList @@ -428,7 +533,7 @@ var _ = Describe("ConfigMap Operations", func() { runtimeInfo, err := base.BuildRuntimeInfo("efc", "source-ns", common.EFCRuntime) Expect(err).NotTo(HaveOccurred()) - err = engine.createConfigMapForRefDataset(fakeClient, refDataset, runtimeInfo) + err = engine.createConfigMapForRefDataset(context.TODO(), fakeClient, refDataset, runtimeInfo) Expect(err).NotTo(HaveOccurred()) }) }) @@ -448,7 +553,7 @@ var _ = Describe("ConfigMap Operations", func() { runtimeInfo, err := base.BuildRuntimeInfo("thin", "source-ns", common.ThinRuntime) Expect(err).NotTo(HaveOccurred()) - err = engine.createConfigMapForRefDataset(fakeClient, refDataset, runtimeInfo) + err = engine.createConfigMapForRefDataset(context.TODO(), fakeClient, refDataset, runtimeInfo) Expect(err).NotTo(HaveOccurred()) }) }) @@ -468,7 +573,7 @@ var _ = Describe("ConfigMap Operations", func() { runtimeInfo, err := base.BuildRuntimeInfo("unknown", "source-ns", "UnknownRuntime") Expect(err).NotTo(HaveOccurred()) - err = engine.createConfigMapForRefDataset(fakeClient, refDataset, runtimeInfo) + err = engine.createConfigMapForRefDataset(context.TODO(), fakeClient, refDataset, runtimeInfo) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("fail to get configmap for runtime type")) }) diff --git a/pkg/ddc/thin/referencedataset/engine.go b/pkg/ddc/thin/referencedataset/engine.go index f66ea3d97de..b98d98c6762 100644 --- a/pkg/ddc/thin/referencedataset/engine.go +++ b/pkg/ddc/thin/referencedataset/engine.go @@ -178,12 +178,12 @@ func (e *ReferenceDatasetEngine) Setup(ctx cruntime.ReconcileRequestContext) (re return false, err } - err = copyFuseDaemonSetForRefDataset(e.Client, dataset, runtimeInfo) + err = copyFuseDaemonSetForRefDataset(ctx, e.Client, dataset, runtimeInfo) if err != nil { return false, err } - err = e.createConfigMapForRefDataset(e.Client, dataset, runtimeInfo) + err = e.createConfigMapForRefDataset(ctx, e.Client, dataset, runtimeInfo) if err != nil { return false, err } diff --git a/pkg/utils/kubeclient/configmap.go b/pkg/utils/kubeclient/configmap.go index 992e38d1ae1..871afebd9cf 100644 --- a/pkg/utils/kubeclient/configmap.go +++ b/pkg/utils/kubeclient/configmap.go @@ -195,6 +195,10 @@ func CreateConfigMapWithContext(ctx context.Context, client client.Client, name } func CreateConfigMapWithOwner(client client.Client, name string, namespace string, data map[string]string, ownerReference []metav1.OwnerReference) (err error) { + return CreateConfigMapWithOwnerWithContext(context.TODO(), client, name, namespace, data, ownerReference) +} + +func CreateConfigMapWithOwnerWithContext(ctx context.Context, client client.Client, name, namespace string, data map[string]string, ownerReference []metav1.OwnerReference) (err error) { configMap := &v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -204,5 +208,5 @@ func CreateConfigMapWithOwner(client client.Client, name string, namespace strin Data: data, } - return client.Create(context.TODO(), configMap) + return client.Create(ctx, configMap) } diff --git a/pkg/utils/kubeclient/configmap_test.go b/pkg/utils/kubeclient/configmap_test.go index 8b87bfcf9c7..7f1f3393fa4 100644 --- a/pkg/utils/kubeclient/configmap_test.go +++ b/pkg/utils/kubeclient/configmap_test.go @@ -527,4 +527,17 @@ var _ = Describe("ConfigMap Operations", func() { }) }) }) + + Describe("CreateConfigMapWithOwner", func() { + Context("when caller context is canceled", func() { + It("should return the context error without creating the ConfigMap", func() { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err := CreateConfigMapWithOwnerWithContext(ctx, contextAwareClient{Client: testClient}, "owner-ctx-canceled", namespace, map[string]string{"key": "value"}, nil) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(context.Canceled)) + }) + }) + }) }) From 8a0c454bf767135393ff159723dd7918da1f24be Mon Sep 17 00:00:00 2001 From: CAICAIIs <3360776475@qq.com> Date: Mon, 27 Apr 2026 20:17:49 +0800 Subject: [PATCH 2/2] fix: address copilot configmap review feedback Signed-off-by: CAICAIIs <3360776475@qq.com> --- pkg/ddc/cache/engine/cm.go | 6 +++--- pkg/ddc/cache/engine/cm_test.go | 17 ++++++++++++++++- pkg/ddc/cache/engine/sync.go | 2 +- pkg/utils/kubeclient/configmap_test.go | 2 +- 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/pkg/ddc/cache/engine/cm.go b/pkg/ddc/cache/engine/cm.go index 5253c64f4b6..b46bbc72fc6 100644 --- a/pkg/ddc/cache/engine/cm.go +++ b/pkg/ddc/cache/engine/cm.go @@ -90,7 +90,7 @@ func (e *CacheEngine) createRuntimeValueConfigMap(ctx context.Context, runtime * if configMap != nil { return nil } - data, err := e.generateRuntimeConfigData(runtime) + data, err := e.generateRuntimeConfigData(ctx, runtime) if err != nil { return errors.Wrap(err, "failed to generate runtime config") } @@ -99,8 +99,8 @@ func (e *CacheEngine) createRuntimeValueConfigMap(ctx context.Context, runtime * } // generateRuntimeConfigData generate the data in the config map for runtime config -func (e *CacheEngine) generateRuntimeConfigData(runtime *datav1alpha1.CacheRuntime) (map[string]string, error) { - dataset, err := utils.GetDataset(e.Client, e.name, e.namespace) +func (e *CacheEngine) generateRuntimeConfigData(ctx context.Context, runtime *datav1alpha1.CacheRuntime) (map[string]string, error) { + dataset, err := utils.GetDatasetWithContext(ctx, e.Client, e.name, e.namespace) if err != nil { return nil, err } diff --git a/pkg/ddc/cache/engine/cm_test.go b/pkg/ddc/cache/engine/cm_test.go index 23ea7586be0..e8d7ca57ed3 100644 --- a/pkg/ddc/cache/engine/cm_test.go +++ b/pkg/ddc/cache/engine/cm_test.go @@ -116,6 +116,21 @@ func TestCreateRuntimeValueConfigMapCreatesMissingConfigMap(t *testing.T) { } } +func TestGenerateRuntimeConfigDataWithCanceledContext(t *testing.T) { + scheme := newCacheEngineTestScheme(t) + runtimeObj := newCacheRuntimeForConfigMapTest() + runtimeClass := newCacheRuntimeClassForConfigMapTest() + dataset := newDatasetForConfigMapTest() + testClient := &contextAwareCacheClient{Client: fake.NewFakeClientWithScheme(scheme, runtimeObj, runtimeClass, dataset)} + engine := &CacheEngine{Client: testClient, name: "demo", namespace: "default"} + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + if _, err := engine.generateRuntimeConfigData(ctx, runtimeObj); err != context.Canceled { + t.Fatalf("expected context.Canceled, got %v", err) + } +} + func TestSyncRuntimeValueConfigMapWithCanceledContext(t *testing.T) { scheme := newCacheEngineTestScheme(t) runtimeObj := newCacheRuntimeForConfigMapTest() @@ -139,7 +154,7 @@ func TestSyncRuntimeValueConfigMapSkipsUnchangedData(t *testing.T) { dataset := newDatasetForConfigMapTest() baseEngine := &CacheEngine{Client: fake.NewFakeClientWithScheme(scheme, runtimeObj, runtimeClass, dataset), name: "demo", namespace: "default"} - data, err := baseEngine.generateRuntimeConfigData(runtimeObj) + data, err := baseEngine.generateRuntimeConfigData(context.Background(), runtimeObj) if err != nil { t.Fatalf("failed to generate runtime config data: %v", err) } diff --git a/pkg/ddc/cache/engine/sync.go b/pkg/ddc/cache/engine/sync.go index 787cbf77065..298d246e846 100644 --- a/pkg/ddc/cache/engine/sync.go +++ b/pkg/ddc/cache/engine/sync.go @@ -58,7 +58,7 @@ func (e *CacheEngine) syncRuntimeValueConfigMap(ctx cruntime.ReconcileRequestCon if err != nil { return err } - data, err := e.generateRuntimeConfigData(runtime) + data, err := e.generateRuntimeConfigData(ctx, runtime) if err != nil { return err } diff --git a/pkg/utils/kubeclient/configmap_test.go b/pkg/utils/kubeclient/configmap_test.go index 7f1f3393fa4..9c3314fedfb 100644 --- a/pkg/utils/kubeclient/configmap_test.go +++ b/pkg/utils/kubeclient/configmap_test.go @@ -528,7 +528,7 @@ var _ = Describe("ConfigMap Operations", func() { }) }) - Describe("CreateConfigMapWithOwner", func() { + Describe("CreateConfigMapWithOwnerWithContext", func() { Context("when caller context is canceled", func() { It("should return the context error without creating the ConfigMap", func() { ctx, cancel := context.WithCancel(context.Background())