From c2689414e5df3bd0d737f4c66d7cf0b410446018 Mon Sep 17 00:00:00 2001 From: Erik Godding Boye Date: Thu, 2 Jan 2025 21:49:30 +0100 Subject: [PATCH] refactor: migrate calls to deprecated Poll* functions --- core/clustersmngr/factory.go | 6 ++--- core/fluxsync/fluxsync.go | 14 +++++------ core/server/crd.go | 2 +- core/server/crd_test.go | 7 +++--- core/server/events_test.go | 2 +- core/server/featureflags_test.go | 4 ++- core/server/fluxruntime_test.go | 14 +++++------ core/server/inventory_test.go | 10 ++++---- core/server/objects_test.go | 36 +++++++++++++-------------- core/server/policies_test.go | 4 +-- core/server/policy_violations_test.go | 6 ++--- core/server/server.go | 6 ++--- core/server/suite_test.go | 8 +++--- core/server/suspend_test.go | 2 +- core/server/sync_test.go | 2 +- core/server/version_test.go | 7 +++--- pkg/services/crd/fetcher.go | 25 ++++++++----------- pkg/services/crd/fetcher_test.go | 20 +++++++-------- pkg/services/crd/nocachefetcher.go | 12 ++++----- pkg/services/crd/suite_test.go | 4 +-- 20 files changed, 94 insertions(+), 97 deletions(-) diff --git a/core/clustersmngr/factory.go b/core/clustersmngr/factory.go index 5943649889..6e6c6c8f82 100644 --- a/core/clustersmngr/factory.go +++ b/core/clustersmngr/factory.go @@ -273,8 +273,7 @@ func (cf *clustersManager) watchClusters(ctx context.Context) { cf.initialClustersLoad <- true - //nolint:staticcheck // deprecated, tracking issue: https://github.com/weaveworks/weave-gitops/issues/3812 - if err := wait.PollImmediateInfinite(watchClustersFrequency, func() (bool, error) { + if err := wait.PollUntilContextCancel(ctx, watchClustersFrequency, true, func(ctx context.Context) (bool, error) { if err := cf.UpdateClusters(ctx); err != nil { cf.log.Error(err, "Failed to update clusters") } @@ -311,8 +310,7 @@ func (cf *clustersManager) watchNamespaces(ctx context.Context) { // waits the first load of cluster to start watching namespaces <-cf.initialClustersLoad - //nolint:staticcheck // deprecated, tracking issue: https://github.com/weaveworks/weave-gitops/issues/3812 - if err := wait.PollImmediateInfinite(watchNamespaceFrequency, func() (bool, error) { + if err := wait.PollUntilContextCancel(ctx, watchNamespaceFrequency, true, func(ctx context.Context) (bool, error) { if err := cf.UpdateNamespaces(ctx); err != nil { if merr, ok := err.(*multierror.Error); ok { for _, cerr := range merr.Errors { diff --git a/core/fluxsync/fluxsync.go b/core/fluxsync/fluxsync.go index 3195982743..57c18688b1 100644 --- a/core/fluxsync/fluxsync.go +++ b/core/fluxsync/fluxsync.go @@ -46,14 +46,14 @@ func RequestReconciliation(ctx context.Context, k client.Client, name client.Obj // WaitForSync polls the k8s API until the resources is sync'd, and times out eventually. func WaitForSync(ctx context.Context, c client.Client, key client.ObjectKey, obj Reconcilable) error { - //nolint:staticcheck // deprecated, tracking issue: https://github.com/weaveworks/weave-gitops/issues/3812 - if err := wait.PollImmediate( + if err := wait.PollUntilContextTimeout( + ctx, k8sPollInterval, k8sTimeout, - checkResourceSync(ctx, c, key, obj, obj.GetLastHandledReconcileRequest()), + true, + checkResourceSync(c, key, obj, obj.GetLastHandledReconcileRequest()), ); err != nil { - //nolint:staticcheck // deprecated, tracking issue: https://github.com/weaveworks/weave-gitops/issues/3812 - if errors.Is(err, wait.ErrWaitTimeout) { + if wait.Interrupted(err) { return errors.New("sync request timed out. The sync operation may still be in progress") } @@ -63,8 +63,8 @@ func WaitForSync(ctx context.Context, c client.Client, key client.ObjectKey, obj return nil } -func checkResourceSync(ctx context.Context, c client.Client, name client.ObjectKey, obj Reconcilable, lastReconcile string) func() (bool, error) { - return func() (bool, error) { +func checkResourceSync(c client.Client, name client.ObjectKey, obj Reconcilable, lastReconcile string) func(context.Context) (bool, error) { + return func(ctx context.Context) (bool, error) { err := c.Get(ctx, name, obj.AsClientObject()) if err != nil { return false, err diff --git a/core/server/crd.go b/core/server/crd.go index 371f5cd87b..a29d6dfe49 100644 --- a/core/server/crd.go +++ b/core/server/crd.go @@ -11,6 +11,6 @@ import ( // installed or not on that cluster. func (cs *coreServer) IsCRDAvailable(ctx context.Context, msg *pb.IsCRDAvailableRequest) (*pb.IsCRDAvailableResponse, error) { return &pb.IsCRDAvailableResponse{ - Clusters: cs.crd.IsAvailableOnClusters(msg.Name), + Clusters: cs.crd.IsAvailableOnClusters(ctx, msg.Name), }, nil } diff --git a/core/server/crd_test.go b/core/server/crd_test.go index 377a2ff932..47cdc37bc4 100644 --- a/core/server/crd_test.go +++ b/core/server/crd_test.go @@ -16,13 +16,14 @@ import ( func TestIsAvailable(t *testing.T) { g := NewGomegaWithT(t) - c := makeGRPCServer(k8sEnv.Rest, t) + + ctx := context.Background() + + c := makeGRPCServer(ctx, k8sEnv.Rest, t) scheme, err := kube.CreateScheme() g.Expect(err).NotTo(HaveOccurred()) - ctx := context.Background() - _, err = client.New(k8sEnv.Rest, client.Options{ Scheme: scheme, }) diff --git a/core/server/events_test.go b/core/server/events_test.go index 7e6916b2e6..87b52ed50e 100644 --- a/core/server/events_test.go +++ b/core/server/events_test.go @@ -23,7 +23,7 @@ func TestListEvents(t *testing.T) { ctx := context.Background() - c := makeGRPCServer(k8sEnv.Rest, t) + c := makeGRPCServer(ctx, k8sEnv.Rest, t) scheme, err := kube.CreateScheme() g.Expect(err).To(BeNil()) diff --git a/core/server/featureflags_test.go b/core/server/featureflags_test.go index e76a345eb9..25302a9887 100644 --- a/core/server/featureflags_test.go +++ b/core/server/featureflags_test.go @@ -22,6 +22,8 @@ import ( func TestGetFeatureFlags(t *testing.T) { RegisterFailHandler(Fail) + ctx := context.Background() + featureflags.Set("this is a flag", "you won't find it anywhere else") scheme, err := kube.CreateScheme() @@ -42,7 +44,7 @@ func TestGetFeatureFlags(t *testing.T) { cfg, err := server.NewCoreConfig(logr.Discard(), &rest.Config{}, "test", clustersManager, hc) Expect(err).NotTo(HaveOccurred()) - coreSrv, err := server.NewCoreServer(cfg) + coreSrv, err := server.NewCoreServer(ctx, cfg) Expect(err).NotTo(HaveOccurred()) resp, err := coreSrv.GetFeatureFlags(context.Background(), &pb.GetFeatureFlagsRequest{}) diff --git a/core/server/fluxruntime_test.go b/core/server/fluxruntime_test.go index 0783512272..890143f289 100644 --- a/core/server/fluxruntime_test.go +++ b/core/server/fluxruntime_test.go @@ -32,7 +32,7 @@ func TestGetReconciledObjects(t *testing.T) { ctx := context.Background() - c := makeGRPCServer(k8sEnv.Rest, t) + c := makeGRPCServer(ctx, k8sEnv.Rest, t) scheme, err := kube.CreateScheme() g.Expect(err).To(BeNil()) @@ -198,7 +198,7 @@ func TestGetReconciledObjectsWithSecret(t *testing.T) { ctx := context.Background() - c := makeGRPCServer(k8sEnv.Rest, t) + c := makeGRPCServer(ctx, k8sEnv.Rest, t) scheme, err := kube.CreateScheme() g.Expect(err).To(BeNil()) @@ -309,7 +309,7 @@ func TestGetChildObjects(t *testing.T) { client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(&ns, deployment, rs).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.GetChildObjects(ctx, &pb.GetChildObjectsRequest{ ParentUid: string(deployment.UID), @@ -385,7 +385,7 @@ func TestListFluxRuntimeObjects(t *testing.T) { g.Expect(err).To(BeNil()) client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(tt.objects...).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.ListFluxRuntimeObjects(ctx, &pb.ListFluxRuntimeObjectsRequest{}) g.Expect(err).NotTo(HaveOccurred()) @@ -454,7 +454,7 @@ func TestListRuntimeObjects(t *testing.T) { g.Expect(err).To(BeNil()) client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(tt.objects...).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.ListRuntimeObjects(ctx, &pb.ListRuntimeObjectsRequest{}) @@ -521,7 +521,7 @@ func TestListFluxCrds(t *testing.T) { client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(crd1, crd2).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.ListFluxCrds(ctx, &pb.ListFluxCrdsRequest{}) @@ -579,7 +579,7 @@ func TestListRuntimeCrds(t *testing.T) { g.Expect(err).To(BeNil()) client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(tt.objects...).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.ListRuntimeCrds(ctx, &pb.ListRuntimeCrdsRequest{}) g.Expect(err).NotTo(HaveOccurred()) diff --git a/core/server/inventory_test.go b/core/server/inventory_test.go index 068c5c774c..2c94a105df 100644 --- a/core/server/inventory_test.go +++ b/core/server/inventory_test.go @@ -125,7 +125,7 @@ func TestGetInventoryKustomization(t *testing.T) { client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(&ns, &anotherNs, kust, deployment, rs).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.GetInventory(ctx, &pb.GetInventoryRequest{ Namespace: ns.Name, @@ -196,7 +196,7 @@ func TestGetBlankInventoryKustomization(t *testing.T) { client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(kust, deployment).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.GetInventory(ctx, &pb.GetInventoryRequest{ Namespace: ns, @@ -271,7 +271,7 @@ func TestGetInventoryHelmRelease(t *testing.T) { client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(ns, helm1, secret, cm).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.GetInventory(ctx, &pb.GetInventoryRequest{ Namespace: ns.Name, @@ -361,7 +361,7 @@ func TestGetInventoryHelmReleaseNoNSResources(t *testing.T) { WithRESTMapper(testrestmapper.TestOnlyStaticRESTMapper(scheme)). WithRuntimeObjects(ns, helm1, secret, cm).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.GetInventory(ctx, &pb.GetInventoryRequest{ Namespace: ns.Name, @@ -407,7 +407,7 @@ func TestGetInventoryHelmReleaseWithKubeconfig(t *testing.T) { client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(ns, helm1).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.GetInventory(ctx, &pb.GetInventoryRequest{ Namespace: ns.Name, diff --git a/core/server/objects_test.go b/core/server/objects_test.go index 9405df2d2d..d3f370d783 100644 --- a/core/server/objects_test.go +++ b/core/server/objects_test.go @@ -54,7 +54,7 @@ func TestGetObject(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(ns, kust).Build() cfg := makeServerConfig(fakeClient, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.GetObject(ctx, &pb.GetObjectRequest{ Name: appName, @@ -119,7 +119,7 @@ func TestGetObjectOtherKinds(t *testing.T) { client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(&ns, dep).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) _, err = c.GetObject(ctx, &pb.GetObjectRequest{ Name: appName, @@ -136,7 +136,7 @@ func TestGetObjectOtherKinds(t *testing.T) { }) g.Expect(err).NotTo(HaveOccurred()) - c = makeServer(cfg, t) + c = makeServer(ctx, cfg, t) res, err := c.GetObject(ctx, &pb.GetObjectRequest{ Name: appName, @@ -192,7 +192,7 @@ func TestGetObject_HelmReleaseWithInventory(t *testing.T) { client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(ns, helm1, secret).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.GetObject(ctx, &pb.GetObjectRequest{ Name: helm1.Name, @@ -277,7 +277,7 @@ func TestGetObject_HelmReleaseWithCompressedInventory(t *testing.T) { client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(ns, helm1, secret).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.GetObject(ctx, &pb.GetObjectRequest{ Name: helm1.Name, @@ -324,7 +324,7 @@ func TestGetObject_HelmReleaseCantGetSecret(t *testing.T) { client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(ns, helm1, secret).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.GetObject(ctx, &pb.GetObjectRequest{ Name: helm1.Name, @@ -361,7 +361,7 @@ func TestGetObjectSecret(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(ns, secret).Build() cfg := makeServerConfig(fakeClient, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.GetObject(ctx, &pb.GetObjectRequest{ Name: secret.Name, @@ -407,7 +407,7 @@ func TestListObjectSingle(t *testing.T) { client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(ns, kust).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.ListObjects(ctx, &pb.ListObjectsRequest{ Namespace: ns.Name, @@ -459,7 +459,7 @@ func TestListObjectMultiple(t *testing.T) { client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(ns, kust, helm1, helm2).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.ListObjects(ctx, &pb.ListObjectsRequest{ Namespace: ns.Name, @@ -500,7 +500,7 @@ func TestListObjectSingleWithClusterName(t *testing.T) { client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(ns, kust).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.ListObjects(ctx, &pb.ListObjectsRequest{ Namespace: ns.Name, @@ -553,7 +553,7 @@ func TestListObjectMultipleWithClusterName(t *testing.T) { client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(ns, kust, helm1, helm2).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.ListObjects(ctx, &pb.ListObjectsRequest{ Namespace: ns.Name, @@ -611,7 +611,7 @@ func TestListObject_HelmReleaseWithInventory(t *testing.T) { client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(ns, helm1, secret).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.ListObjects(ctx, &pb.ListObjectsRequest{ Namespace: ns.Name, @@ -673,7 +673,7 @@ func TestListObject_HelmReleaseWithInventoryHistory(t *testing.T) { client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(ns, helm1, secret).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.ListObjects(ctx, &pb.ListObjectsRequest{ Namespace: ns.Name, @@ -719,7 +719,7 @@ func TestListObject_HelmReleaseCantGetSecret(t *testing.T) { client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(ns, helm1, secret).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.ListObjects(ctx, &pb.ListObjectsRequest{ Namespace: ns.Name, @@ -755,7 +755,7 @@ func TestListObjectsSecret(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(ns, secret).Build() cfg := makeServerConfig(fakeClient, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.ListObjects(ctx, &pb.ListObjectsRequest{ Kind: "Secret", @@ -808,7 +808,7 @@ func TestListObjectsLabels(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(ns, deployment1, deployment2).Build() cfg := makeServerConfig(fakeClient, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.ListObjects(ctx, &pb.ListObjectsRequest{ Kind: "Deployment", @@ -877,7 +877,7 @@ func TestListObjectsGitOpsRunSessions(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(ns, session1, session2).Build() cfg := makeServerConfig(fakeClient, t, testCluster) - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.ListObjects(ctx, &pb.ListObjectsRequest{ Namespace: testNS, @@ -938,7 +938,7 @@ func TestGetObjectSessionObjects(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(ns, kust, helm, bucket).Build() cfg := makeServerConfig(fakeClient, t, testCluster) - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.GetObject(ctx, &pb.GetObjectRequest{ Name: constants.RunDevKsName, diff --git a/core/server/policies_test.go b/core/server/policies_test.go index 48bf6286c6..b90e0f2a57 100644 --- a/core/server/policies_test.go +++ b/core/server/policies_test.go @@ -56,7 +56,7 @@ func TestListPolicies(t *testing.T) { client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(policy1, policy2).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.ListPolicies(ctx, &pb.ListPoliciesRequest{}) @@ -95,7 +95,7 @@ func TestGetPolicy(t *testing.T) { client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(policy).Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.GetPolicy(ctx, &pb.GetPolicyRequest{ PolicyName: policy.Spec.ID, diff --git a/core/server/policy_violations_test.go b/core/server/policy_violations_test.go index f0ada9b4f1..5cf1c7ce5f 100644 --- a/core/server/policy_violations_test.go +++ b/core/server/policy_violations_test.go @@ -51,7 +51,7 @@ func TestGetViolation(t *testing.T) { Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) // existing validation res, err := c.GetPolicyValidation(ctx, &pb.GetPolicyValidationRequest{ @@ -121,7 +121,7 @@ func TestListApplicationValidations(t *testing.T) { Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.ListPolicyValidations(ctx, &pb.ListPolicyValidationsRequest{ Application: "app1", Kind: "HelmRelease", @@ -181,7 +181,7 @@ func TestListPolicyValidations(t *testing.T) { Build() cfg := makeServerConfig(client, t, "") - c := makeServer(cfg, t) + c := makeServer(ctx, cfg, t) res, err := c.ListPolicyValidations(ctx, &pb.ListPolicyValidationsRequest{}) g.Expect(err).NotTo(HaveOccurred()) g.Expect(err).To(BeNil()) diff --git a/core/server/server.go b/core/server/server.go index 27e76a91f4..5da1ad70e1 100644 --- a/core/server/server.go +++ b/core/server/server.go @@ -16,7 +16,7 @@ import ( ) func Hydrate(ctx context.Context, mux *runtime.ServeMux, cfg CoreServerConfig) error { - appsServer, err := NewCoreServer(cfg) + appsServer, err := NewCoreServer(ctx, cfg) if err != nil { return fmt.Errorf("unable to create new kube client: %w", err) } @@ -69,9 +69,9 @@ func NewCoreConfig(log logr.Logger, cfg *rest.Config, clusterName string, cluste }, nil } -func NewCoreServer(cfg CoreServerConfig) (pb.CoreServer, error) { +func NewCoreServer(ctx context.Context, cfg CoreServerConfig) (pb.CoreServer, error) { if cfg.CRDService == nil { - cfg.CRDService = crd.NewFetcher(cfg.log, cfg.ClustersManager) + cfg.CRDService = crd.NewFetcher(ctx, cfg.log, cfg.ClustersManager) } return &coreServer{ diff --git a/core/server/suite_test.go b/core/server/suite_test.go index efb67da9e4..50f68abf7d 100644 --- a/core/server/suite_test.go +++ b/core/server/suite_test.go @@ -55,7 +55,7 @@ func TestMain(m *testing.M) { os.Exit(code) } -func makeGRPCServer(cfg *rest.Config, t *testing.T) pb.CoreClient { +func makeGRPCServer(ctx context.Context, cfg *rest.Config, t *testing.T) pb.CoreClient { log := logr.Discard() nsChecker = nsaccessfakes.FakeChecker{} nsChecker.FilterAccessibleNamespacesStub = func(ctx context.Context, t typedauth.AuthorizationV1Interface, n []v1.Namespace) ([]v1.Namespace, error) { @@ -86,7 +86,7 @@ func makeGRPCServer(cfg *rest.Config, t *testing.T) pb.CoreClient { coreCfg.NSAccess = &nsChecker coreCfg.CRDService = crd.NewNoCacheFetcher(clustersManager) - core, err := server.NewCoreServer(coreCfg) + core, err := server.NewCoreServer(ctx, coreCfg) if err != nil { t.Fatalf("Failed to create CoreServer: %v", err) } @@ -202,8 +202,8 @@ func makeServerConfig(fakeClient client.Client, t *testing.T, clusterName string return coreCfg } -func makeServer(cfg server.CoreServerConfig, t *testing.T) pb.CoreClient { - core, err := server.NewCoreServer(cfg) +func makeServer(ctx context.Context, cfg server.CoreServerConfig, t *testing.T) pb.CoreClient { + core, err := server.NewCoreServer(ctx, cfg) if err != nil { t.Fatalf("Failed to create CoreServer: %v", err) } diff --git a/core/server/suspend_test.go b/core/server/suspend_test.go index 5b44c66ed1..5f64d26a4c 100644 --- a/core/server/suspend_test.go +++ b/core/server/suspend_test.go @@ -32,7 +32,7 @@ func TestSuspend_Suspend(t *testing.T) { }) g.Expect(err).NotTo(HaveOccurred()) - c := makeGRPCServer(k8sEnv.Rest, t) + c := makeGRPCServer(ctx, k8sEnv.Rest, t) ns := newNamespace(ctx, k, g) diff --git a/core/server/sync_test.go b/core/server/sync_test.go index 1e6e22b7fd..6dc9ab0ae9 100644 --- a/core/server/sync_test.go +++ b/core/server/sync_test.go @@ -30,7 +30,7 @@ func TestSync(t *testing.T) { ctx := context.Background() - c := makeGRPCServer(k8sEnv.Rest, t) + c := makeGRPCServer(ctx, k8sEnv.Rest, t) scheme, err := kube.CreateScheme() g.Expect(err).To(BeNil()) diff --git a/core/server/version_test.go b/core/server/version_test.go index 6c1719f318..1861eadac9 100644 --- a/core/server/version_test.go +++ b/core/server/version_test.go @@ -16,14 +16,15 @@ import ( func TestGetVersion(t *testing.T) { g := NewGomegaWithT(t) - c := makeGRPCServer(k8sEnv.Rest, t) + + ctx := context.Background() + + c := makeGRPCServer(ctx, k8sEnv.Rest, t) logf.SetLogger(logr.Discard()) scheme, err := kube.CreateScheme() g.Expect(err).To(BeNil()) - ctx := context.Background() - _, err = client.New(k8sEnv.Rest, client.Options{ Scheme: scheme, }) diff --git a/pkg/services/crd/fetcher.go b/pkg/services/crd/fetcher.go index 88eff2bb83..90010809ba 100644 --- a/pkg/services/crd/fetcher.go +++ b/pkg/services/crd/fetcher.go @@ -15,23 +15,23 @@ import ( const watchCRDsFrequency = 30 * time.Second type Fetcher interface { - IsAvailable(clusterName, name string) bool - IsAvailableOnClusters(name string) map[string]bool - UpdateCRDList() + IsAvailable(ctx context.Context, clusterName, name string) bool + IsAvailableOnClusters(ctx context.Context, name string) map[string]bool + UpdateCRDList(context.Context) } // NewFetcher creates a new default fetcher with cache. // // With NewFetcher, it will automatically start a background go routine to watch // CRDs. -func NewFetcher(logger logr.Logger, clustersManager clustersmngr.ClustersManager) Fetcher { +func NewFetcher(ctx context.Context, logger logr.Logger, clustersManager clustersmngr.ClustersManager) Fetcher { fetcher := &defaultFetcher{ logger: logger, clustersManager: clustersManager, crds: map[string][]v1.CustomResourceDefinition{}, } - go fetcher.watchCRDs() + go fetcher.watchCRDs(ctx) return fetcher } @@ -43,22 +43,19 @@ type defaultFetcher struct { crds map[string][]v1.CustomResourceDefinition } -func (s *defaultFetcher) watchCRDs() { - //nolint:staticcheck // deprecated, tracking issue: https://github.com/weaveworks/weave-gitops/issues/3812 - _ = wait.PollImmediateInfinite(watchCRDsFrequency, func() (bool, error) { - s.UpdateCRDList() +func (s *defaultFetcher) watchCRDs(ctx context.Context) { + _ = wait.PollUntilContextCancel(ctx, watchCRDsFrequency, true, func(ctx context.Context) (bool, error) { + s.UpdateCRDList(ctx) return false, nil }) } // UpdateCRDList updates the cached CRD list. -func (s *defaultFetcher) UpdateCRDList() { +func (s *defaultFetcher) UpdateCRDList(ctx context.Context) { s.Lock() defer s.Unlock() - ctx := context.Background() - client, err := s.clustersManager.GetServerClient(ctx) if err != nil { s.logger.Error(err, "unable to get client pool") @@ -83,7 +80,7 @@ func (s *defaultFetcher) UpdateCRDList() { } // IsAvailable tells if a given CRD is available on the specified cluster. -func (s *defaultFetcher) IsAvailable(clusterName, name string) bool { +func (s *defaultFetcher) IsAvailable(_ context.Context, clusterName, name string) bool { s.Lock() defer s.Unlock() @@ -97,7 +94,7 @@ func (s *defaultFetcher) IsAvailable(clusterName, name string) bool { } // IsAvailableOnClusters tells the availability of a given CRD on all clusters. -func (s *defaultFetcher) IsAvailableOnClusters(name string) map[string]bool { +func (s *defaultFetcher) IsAvailableOnClusters(_ context.Context, name string) map[string]bool { result := map[string]bool{} s.Lock() diff --git a/pkg/services/crd/fetcher_test.go b/pkg/services/crd/fetcher_test.go index 2ef6776e70..de45580775 100644 --- a/pkg/services/crd/fetcher_test.go +++ b/pkg/services/crd/fetcher_test.go @@ -18,7 +18,7 @@ func TestFetcher_IsAvailable(t *testing.T) { defer cancelFn() - service, err := newService(k8sEnv) + service, err := newService(ctx, k8sEnv) g.Expect(err).NotTo(gomega.HaveOccurred()) k, err := client.New(k8sEnv.Rest, client.Options{ @@ -31,7 +31,7 @@ func TestFetcher_IsAvailable(t *testing.T) { var found bool - found = service.IsAvailable(defaultClusterName, "customobjects.example.com") + found = service.IsAvailable(ctx, defaultClusterName, "customobjects.example.com") g.Expect(found).To(gomega.BeFalse(), "customobjects crd should not be defined in %s cluster", defaultClusterName) newCRD(ctx, g, k, @@ -42,15 +42,15 @@ func TestFetcher_IsAvailable(t *testing.T) { Kind: "CustomObject", }) - service.UpdateCRDList() + service.UpdateCRDList(ctx) - found = service.IsAvailable(defaultClusterName, "customobjects.example.com") + found = service.IsAvailable(ctx, defaultClusterName, "customobjects.example.com") g.Expect(found).To(gomega.BeTrue(), "customobjects crd should be defined in %s cluster", defaultClusterName) - found = service.IsAvailable(defaultClusterName, "somethingelse.example.com") + found = service.IsAvailable(ctx, defaultClusterName, "somethingelse.example.com") g.Expect(found).To(gomega.BeFalse(), "somethingelse crd should not be defined in %s Cluster", defaultClusterName) - found = service.IsAvailable("Other", "customobjects.example.com") + found = service.IsAvailable(ctx, "Other", "customobjects.example.com") g.Expect(found).To(gomega.BeFalse(), "customobjects crd should not be defined in Other cluster") } @@ -61,7 +61,7 @@ func TestFetcher_IsAvailableOnClusters(t *testing.T) { defer cancelFn() - service, err := newService(k8sEnv) + service, err := newService(ctx, k8sEnv) g.Expect(err).NotTo(gomega.HaveOccurred()) k, err := client.New(k8sEnv.Rest, client.Options{ @@ -83,9 +83,9 @@ func TestFetcher_IsAvailableOnClusters(t *testing.T) { crdName := "xclustercustomons.example.com" - service.UpdateCRDList() + service.UpdateCRDList(ctx) - response := service.IsAvailableOnClusters(crdName) + response := service.IsAvailableOnClusters(ctx, crdName) g.Expect(response).To(gomega.HaveLen(1), "cluster list should contain one entry") g.Expect(response).To(gomega.HaveKey(defaultClusterName), "cluster list should contain info about %s cluster", defaultClusterName) @@ -93,7 +93,7 @@ func TestFetcher_IsAvailableOnClusters(t *testing.T) { crdName = "xclusterothercustomons.example.com" - response = service.IsAvailableOnClusters(crdName) + response = service.IsAvailableOnClusters(ctx, crdName) g.Expect(response).To(gomega.HaveLen(1), "cluster list should contain one entry") g.Expect(response).To(gomega.HaveKey(defaultClusterName), "cluster list should contain info about %s cluster", defaultClusterName) diff --git a/pkg/services/crd/nocachefetcher.go b/pkg/services/crd/nocachefetcher.go index 973a672dd0..a4e50fa84c 100644 --- a/pkg/services/crd/nocachefetcher.go +++ b/pkg/services/crd/nocachefetcher.go @@ -30,12 +30,10 @@ type noCacheFetcher struct { } // UpdateCRDList updates the CRD list. -func (s *noCacheFetcher) UpdateCRDList() { +func (s *noCacheFetcher) UpdateCRDList(ctx context.Context) { s.Lock() defer s.Unlock() - ctx := context.Background() - client, err := s.clustersManager.GetServerClient(ctx) if err != nil { s.logger.Error(err, "unable to get client pool") @@ -61,8 +59,8 @@ func (s *noCacheFetcher) UpdateCRDList() { // IsAvailable tells if a given CRD is available on the specified cluster. // // It calls UpdateCRDList always. -func (s *noCacheFetcher) IsAvailable(clusterName, name string) bool { - s.UpdateCRDList() +func (s *noCacheFetcher) IsAvailable(ctx context.Context, clusterName, name string) bool { + s.UpdateCRDList(ctx) s.Lock() defer s.Unlock() @@ -79,8 +77,8 @@ func (s *noCacheFetcher) IsAvailable(clusterName, name string) bool { // IsAvailableOnClusters tells the availability of a given CRD on all clusters. // // It calls UpdateCRDList always. -func (s *noCacheFetcher) IsAvailableOnClusters(name string) map[string]bool { - s.UpdateCRDList() +func (s *noCacheFetcher) IsAvailableOnClusters(ctx context.Context, name string) map[string]bool { + s.UpdateCRDList(ctx) s.Lock() defer s.Unlock() diff --git a/pkg/services/crd/suite_test.go b/pkg/services/crd/suite_test.go index cdceff22bb..0513ef2d74 100644 --- a/pkg/services/crd/suite_test.go +++ b/pkg/services/crd/suite_test.go @@ -43,7 +43,7 @@ func TestMain(m *testing.M) { os.Exit(code) } -func newService(k8sEnv *testutils.K8sTestEnv) (crd.Fetcher, error) { +func newService(ctx context.Context, k8sEnv *testutils.K8sTestEnv) (crd.Fetcher, error) { _, clustersManager, err := createClient(k8sEnv) if err != nil { return nil, err @@ -51,7 +51,7 @@ func newService(k8sEnv *testutils.K8sTestEnv) (crd.Fetcher, error) { log := logr.Discard() - return crd.NewFetcher(log, clustersManager), nil + return crd.NewFetcher(ctx, log, clustersManager), nil } func createClient(k8sEnv *testutils.K8sTestEnv) (clustersmngr.Client, clustersmngr.ClustersManager, error) {