From 1d4c8c6cc5ecc639075a9637e55cb6d242983500 Mon Sep 17 00:00:00 2001 From: nolancon Date: Wed, 21 May 2025 10:25:18 +0000 Subject: [PATCH 1/2] Revert "Introduce pagination in the List Buckets request done via the uncached k8s api client (#320)" This reverts commit 93b272d05135f769c88d0a9aae99f908455f8e08. --- .../healthcheck/healthcheck_controller.go | 32 ++++--------------- 1 file changed, 6 insertions(+), 26 deletions(-) diff --git a/internal/controller/providerconfig/healthcheck/healthcheck_controller.go b/internal/controller/providerconfig/healthcheck/healthcheck_controller.go index b1f0e70d..353703d2 100644 --- a/internal/controller/providerconfig/healthcheck/healthcheck_controller.go +++ b/internal/controller/providerconfig/healthcheck/healthcheck_controller.go @@ -182,11 +182,10 @@ func (c *Controller) doHealthCheck(ctx context.Context, providerConfig *apisv1al // by unsetting the Pause label. func (c *Controller) unpauseBuckets(ctx context.Context, s3BackendName string) { const ( - steps = 4 - duration = time.Second - factor = 5 - jitter = 0.1 - bucketsPerRequest = 50 + steps = 4 + duration = time.Second + factor = 5 + jitter = 0.1 ) ctx, log := traces.InjectTraceAndLogger(ctx, c.log) @@ -204,28 +203,9 @@ func (c *Controller) unpauseBuckets(ctx context.Context, s3BackendName string) { Factor: factor, Jitter: jitter, }, resource.IsAPIError, func() error { - listOptions := &client.ListOptions{ + return c.kubeClientUncached.List(ctx, buckets, &client.ListOptions{ LabelSelector: listLabels, - Limit: bucketsPerRequest, - } - for { - pageBuckets := &v1alpha1.BucketList{} - if err := c.kubeClientUncached.List(ctx, pageBuckets, listOptions); err != nil { - return err - } - - buckets.Items = append(buckets.Items, pageBuckets.Items...) 
- - if pageBuckets.Continue == "" { - break - } - - listOptions.Continue = pageBuckets.Continue - - continue - } - - return nil + }) }) if err != nil { log.Info("Error attempting to list Buckets on backend", "error", err.Error(), consts.KeyBackendName, s3BackendName) From 48fc08b8017ea70cc1ee47adb4ba72e8d100950b Mon Sep 17 00:00:00 2001 From: nolancon Date: Thu, 22 May 2025 09:59:46 +0000 Subject: [PATCH 2/2] Add cached reader for list buckets in healthcheck ctrlr --- cmd/provider/main.go | 61 ++++++++++++++----- .../providerconfig/healthcheck/healthcheck.go | 44 ++++++------- .../healthcheck/healthcheck_controller.go | 10 +-- .../healthcheck_controller_test.go | 4 +- 4 files changed, 74 insertions(+), 45 deletions(-) diff --git a/cmd/provider/main.go b/cmd/provider/main.go index 57a5d794..810ab54d 100644 --- a/cmd/provider/main.go +++ b/cmd/provider/main.go @@ -20,6 +20,7 @@ package main import ( "context" + "errors" "flag" "net/http" "os" @@ -27,19 +28,21 @@ import ( "time" "github.com/alecthomas/kingpin/v2" + "github.com/go-logr/logr" "github.com/linode/provider-ceph/internal/otel/traces" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.uber.org/zap/zapcore" kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/selection" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" - kcache "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/metrics" @@ -74,6 +77,10 @@ var defaultZapConfig = map[string]string{ "zap-time-encoding": "rfc3339nano", } +var ( + errFailedToSyncCache = errors.New("failed to sync kube client cache reader") +) + 
//nolint:maintidx // Function requires a lot of setup operations. func main() { var ( @@ -82,7 +89,6 @@ func main() { leaderRenew = app.Flag("leader-renew", "Set leader election renewal.").Short('r').Default("10s").OverrideDefaultFromEnvar("LEADER_ELECTION_RENEW").Duration() syncInterval = app.Flag("sync", "How often all resources will be double-checked for drift from the desired state.").Short('s').Default("1h").Duration() - syncTimeout = app.Flag("sync-timeout", "Cache sync timeout.").Default("10s").Duration() backendMonitorInterval = app.Flag("backend-monitor-interval", "Interval between backend monitor controller reconciliations.").Default("60s").Duration() pollInterval = app.Flag("poll", "How often individual resources will be checked for drift from the desired state").Short('p').Default("30m").Duration() pollStateMetricInterval = app.Flag("poll-state-metric", "State metric recording interval").Default("5s").Duration() @@ -96,7 +102,9 @@ func main() { tracesExportInterval = app.Flag("otel-traces-export-interval", "Interval at which traces are exported").Default("5s").Duration() tracesExportAddress = app.Flag("otel-traces-export-address", "Address of otel collector").Default("opentelemetry-collector.opentelemetry:4317").String() - kubeClientRate = app.Flag("kube-client-rate", "The global maximum rate per second at how many requests the client can do.").Default("1000").Int() + kubeClientRate = app.Flag("kube-client-rate", "The global maximum rate per second at how many requests the client can do.").Default("1000").Int() + kubeClientTimeout = app.Flag("kube-client-timeout", "Kube client request timeout.").Default("10s").Duration() + kubeCacheSyncTimeout = app.Flag("cache-sync-timeout", "Kube client cache sync timeout.").Default("60s").Duration() namespace = app.Flag("namespace", "Namespace used to set as default scope in default secret store config.").Default("crossplane-system").Envar("POD_NAMESPACE").String() enableExternalSecretStores = 
app.Flag("enable-external-secret-stores", "Enable support for ExternalSecretStores.").Default("false").Envar("ENABLE_EXTERNAL_SECRET_STORES").Bool() @@ -227,7 +235,7 @@ func main() { kingpin.FatalIfError(err, "Cannot create HTTP client") httpClient.Transport = otelhttp.NewTransport(httpClient.Transport) - httpClient.Timeout = *syncTimeout + httpClient.Timeout = *kubeClientTimeout mm := managed.NewMRMetricRecorder() sm := statemetrics.NewMRStateMetrics() @@ -254,18 +262,18 @@ func main() { CertDir: *webhookTLSCertDir, }), Scheme: providerScheme, - Cache: kcache.Options{ + Cache: cache.Options{ HTTPClient: httpClient, SyncPeriod: syncInterval, Scheme: providerScheme, - ByObject: map[client.Object]kcache.ByObject{ + ByObject: map[client.Object]cache.ByObject{ &providercephv1alpha1.Bucket{}: { Label: labels.NewSelector().Add(*pausedSelector), }, &v1alpha1.ProviderConfig{}: {}, }, }, - NewCache: kcache.New, + NewCache: cache.New, }) kingpin.FatalIfError(err, "Cannot create controller manager") @@ -304,13 +312,9 @@ func main() { backendStore := backendstore.NewBackendStore() - kubeClientUncached, err := client.New(cfg, client.Options{ - Scheme: providerScheme, - HTTPClient: &http.Client{ - Transport: httpClient.Transport, - }, - }) - kingpin.FatalIfError(err, "Cannot create Kube client") + // Create a cached reader for use in the health check controller when performing List Buckets. + cachedReader, err := newCachedReader(cfg, providerScheme, *kubeCacheSyncTimeout, log) + kingpin.FatalIfError(err, "Cannot setup cached reader") kingpin.FatalIfError(ctrl.NewWebhookManagedBy(mgr). For(&providercephv1alpha1.Bucket{}). 
@@ -327,8 +331,8 @@ func main() { healthcheck.NewController( healthcheck.WithAutoPause(autoPauseBucket), healthcheck.WithBackendStore(backendStore), - healthcheck.WithKubeClientUncached(kubeClientUncached), - healthcheck.WithKubeClientCached(mgr.GetClient()), + healthcheck.WithCachedReader(cachedReader), + healthcheck.WithKubeClient(mgr.GetClient()), healthcheck.WithHttpClient(&http.Client{Timeout: *s3Timeout}), healthcheck.WithLogger(log))), "Cannot setup ProviderConfig controllers") @@ -367,3 +371,28 @@ func main() { kingpin.FatalIfError(mgr.Start(ctrl.SetupSignalHandler()), "Cannot start controller manager") } + +func newCachedReader(cfg *rest.Config, s *runtime.Scheme, syncTimeout time.Duration, l logr.Logger) (client.Reader, error) { + informerCache, err := cache.New(cfg, cache.Options{ + Scheme: s, + }) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithTimeout(context.Background(), syncTimeout) + defer cancel() + + go func() { + if err := informerCache.Start(ctx); err != nil { + l.Error(err, "failed to start informer for cached reader") + } + }() + + if !informerCache.WaitForCacheSync(ctx) { + return nil, errFailedToSyncCache + } + var cachedReader client.Reader = informerCache + + return cachedReader, nil +} diff --git a/internal/controller/providerconfig/healthcheck/healthcheck.go b/internal/controller/providerconfig/healthcheck/healthcheck.go index 264ff6a1..d7e0c38d 100644 --- a/internal/controller/providerconfig/healthcheck/healthcheck.go +++ b/internal/controller/providerconfig/healthcheck/healthcheck.go @@ -15,12 +15,12 @@ import ( const controllerName = "health-check-controller" type Controller struct { - kubeClientUncached client.Client - kubeClientCached client.Client - backendStore *backendstore.BackendStore - httpClient *http.Client - log logr.Logger - autoPauseBucket bool + kubeClient client.Client + cachedReader client.Reader + backendStore *backendstore.BackendStore + httpClient *http.Client + log logr.Logger + 
autoPauseBucket bool } func NewController(options ...func(*Controller)) *Controller { @@ -32,43 +32,43 @@ func NewController(options ...func(*Controller)) *Controller { return r } -func WithKubeClientUncached(k client.Client) func(*Controller) { - return func(r *Controller) { - r.kubeClientUncached = k +func WithCachedReader(r client.Reader) func(*Controller) { + return func(c *Controller) { + c.cachedReader = r } } -func WithKubeClientCached(k client.Client) func(*Controller) { - return func(r *Controller) { - r.kubeClientCached = k +func WithKubeClient(k client.Client) func(*Controller) { + return func(c *Controller) { + c.kubeClient = k } } func WithLogger(l logr.Logger) func(*Controller) { - return func(r *Controller) { - r.log = l.WithValues(apisv1alpha1.ProviderConfigGroupKind, providerconfig.ControllerName(controllerName)) + return func(c *Controller) { + c.log = l.WithValues(apisv1alpha1.ProviderConfigGroupKind, providerconfig.ControllerName(controllerName)) } } func WithBackendStore(b *backendstore.BackendStore) func(*Controller) { - return func(r *Controller) { - r.backendStore = b + return func(c *Controller) { + c.backendStore = b } } func WithAutoPause(autoPause *bool) func(*Controller) { - return func(r *Controller) { - r.autoPauseBucket = *autoPause + return func(c *Controller) { + c.autoPauseBucket = *autoPause } } func WithHttpClient(httpClient *http.Client) func(*Controller) { - return func(r *Controller) { - r.httpClient = httpClient + return func(c *Controller) { + c.httpClient = httpClient } } -func (r *Controller) SetupWithManager(mgr ctrl.Manager) error { +func (c *Controller) SetupWithManager(mgr ctrl.Manager) error { const maxReconciles = 5 return ctrl.NewControllerManagedBy(mgr). @@ -77,5 +77,5 @@ func (r *Controller) SetupWithManager(mgr ctrl.Manager) error { WithOptions(controller.Options{ MaxConcurrentReconciles: maxReconciles, }.ForControllerRuntime()). 
- Complete(r) + Complete(c) } diff --git a/internal/controller/providerconfig/healthcheck/healthcheck_controller.go b/internal/controller/providerconfig/healthcheck/healthcheck_controller.go index 353703d2..e8495b01 100644 --- a/internal/controller/providerconfig/healthcheck/healthcheck_controller.go +++ b/internal/controller/providerconfig/healthcheck/healthcheck_controller.go @@ -61,7 +61,7 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu bucketName := req.Name + healthCheckSuffix providerConfig := &apisv1alpha1.ProviderConfig{} - if err := c.kubeClientCached.Get(ctx, req.NamespacedName, providerConfig); err != nil { + if err := c.kubeClient.Get(ctx, req.NamespacedName, providerConfig); err != nil { if kerrors.IsNotFound(err) { // ProviderConfig has been deleted so there is nothing to do and no need to requeue. // The backend monitor controller will remove the backend from the backend store. @@ -79,7 +79,7 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return ctrl.Result{}, nil } - if err := UpdateProviderConfigStatus(ctx, c.kubeClientCached, providerConfig, func(_, pcLatest *apisv1alpha1.ProviderConfig) { + if err := UpdateProviderConfigStatus(ctx, c.kubeClient, providerConfig, func(_, pcLatest *apisv1alpha1.ProviderConfig) { pcLatest.Status.SetConditions(v1alpha1.HealthCheckDisabled()) }); err != nil { err = errors.Wrap(err, errUpdateHealthStatus) @@ -106,7 +106,7 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return } - if err := UpdateProviderConfigStatus(ctx, c.kubeClientCached, providerConfig, func(pcDeepCopy, pcLatest *apisv1alpha1.ProviderConfig) { + if err := UpdateProviderConfigStatus(ctx, c.kubeClient, providerConfig, func(pcDeepCopy, pcLatest *apisv1alpha1.ProviderConfig) { pcLatest.Status.SetConditions(pcDeepCopy.Status.Conditions...) 
}); err != nil { err = errors.Wrap(err, errUpdateHealthStatus) @@ -203,7 +203,7 @@ func (c *Controller) unpauseBuckets(ctx context.Context, s3BackendName string) { Factor: factor, Jitter: jitter, }, resource.IsAPIError, func() error { - return c.kubeClientUncached.List(ctx, buckets, &client.ListOptions{ + return c.cachedReader.List(ctx, buckets, &client.ListOptions{ LabelSelector: listLabels, }) }) @@ -225,7 +225,7 @@ func (c *Controller) unpauseBuckets(ctx context.Context, s3BackendName string) { buckets.Items[i].Labels[meta.AnnotationKeyReconciliationPaused] == True { buckets.Items[i].Labels[meta.AnnotationKeyReconciliationPaused] = "" - return c.kubeClientCached.Update(ctx, &buckets.Items[i]) + return c.kubeClient.Update(ctx, &buckets.Items[i]) } return nil diff --git a/internal/controller/providerconfig/healthcheck/healthcheck_controller_test.go b/internal/controller/providerconfig/healthcheck/healthcheck_controller_test.go index 33c2b6c8..82b5e304 100644 --- a/internal/controller/providerconfig/healthcheck/healthcheck_controller_test.go +++ b/internal/controller/providerconfig/healthcheck/healthcheck_controller_test.go @@ -508,8 +508,8 @@ func TestReconcile(t *testing.T) { r := NewController( WithAutoPause(&tc.fields.autopause), WithBackendStore(bs), - WithKubeClientUncached(c), - WithKubeClientCached(c), + WithKubeClient(c), + WithCachedReader(c), WithHttpClient(tc.fields.testHttpClient), WithLogger(logr.Discard()))