Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 45 additions & 16 deletions cmd/provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,29 @@ package main

import (
"context"
"errors"
"flag"
"net/http"
"os"
"path/filepath"
"time"

"github.com/alecthomas/kingpin/v2"
"github.com/go-logr/logr"
"github.com/linode/provider-ceph/internal/otel/traces"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.uber.org/zap/zapcore"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
kcache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics"
Expand Down Expand Up @@ -74,6 +77,10 @@ var defaultZapConfig = map[string]string{
"zap-time-encoding": "rfc3339nano",
}

var (
errFailedToSyncCache = errors.New("failed to sync kube client cache reader")
)

//nolint:maintidx // Function requires a lot of setup operations.
func main() {
var (
Expand All @@ -82,7 +89,6 @@ func main() {
leaderRenew = app.Flag("leader-renew", "Set leader election renewal.").Short('r').Default("10s").OverrideDefaultFromEnvar("LEADER_ELECTION_RENEW").Duration()

syncInterval = app.Flag("sync", "How often all resources will be double-checked for drift from the desired state.").Short('s').Default("1h").Duration()
syncTimeout = app.Flag("sync-timeout", "Cache sync timeout.").Default("10s").Duration()
backendMonitorInterval = app.Flag("backend-monitor-interval", "Interval between backend monitor controller reconciliations.").Default("60s").Duration()
pollInterval = app.Flag("poll", "How often individual resources will be checked for drift from the desired state").Short('p').Default("30m").Duration()
pollStateMetricInterval = app.Flag("poll-state-metric", "State metric recording interval").Default("5s").Duration()
Expand All @@ -96,7 +102,9 @@ func main() {
tracesExportInterval = app.Flag("otel-traces-export-interval", "Interval at which traces are exported").Default("5s").Duration()
tracesExportAddress = app.Flag("otel-traces-export-address", "Address of otel collector").Default("opentelemetry-collector.opentelemetry:4317").String()

kubeClientRate = app.Flag("kube-client-rate", "The global maximum rate per second at how many requests the client can do.").Default("1000").Int()
kubeClientRate = app.Flag("kube-client-rate", "The global maximum rate per second at how many requests the client can do.").Default("1000").Int()
kubeClientTimeout = app.Flag("kube-client-timeout", "Kube client request timeout.").Default("10s").Duration()
kubeCacheSyncTimeout = app.Flag("cache-sync-timeout", "Kube client cache sync timeout.").Default("60s").Duration()

namespace = app.Flag("namespace", "Namespace used to set as default scope in default secret store config.").Default("crossplane-system").Envar("POD_NAMESPACE").String()
enableExternalSecretStores = app.Flag("enable-external-secret-stores", "Enable support for ExternalSecretStores.").Default("false").Envar("ENABLE_EXTERNAL_SECRET_STORES").Bool()
Expand Down Expand Up @@ -227,7 +235,7 @@ func main() {
kingpin.FatalIfError(err, "Cannot create HTTP client")

httpClient.Transport = otelhttp.NewTransport(httpClient.Transport)
httpClient.Timeout = *syncTimeout
httpClient.Timeout = *kubeClientTimeout

mm := managed.NewMRMetricRecorder()
sm := statemetrics.NewMRStateMetrics()
Expand All @@ -254,18 +262,18 @@ func main() {
CertDir: *webhookTLSCertDir,
}),
Scheme: providerScheme,
Cache: kcache.Options{
Cache: cache.Options{
HTTPClient: httpClient,
SyncPeriod: syncInterval,
Scheme: providerScheme,
ByObject: map[client.Object]kcache.ByObject{
ByObject: map[client.Object]cache.ByObject{
&providercephv1alpha1.Bucket{}: {
Label: labels.NewSelector().Add(*pausedSelector),
},
&v1alpha1.ProviderConfig{}: {},
},
},
NewCache: kcache.New,
NewCache: cache.New,
})
kingpin.FatalIfError(err, "Cannot create controller manager")

Expand Down Expand Up @@ -304,13 +312,9 @@ func main() {

backendStore := backendstore.NewBackendStore()

kubeClientUncached, err := client.New(cfg, client.Options{
Scheme: providerScheme,
HTTPClient: &http.Client{
Transport: httpClient.Transport,
},
})
kingpin.FatalIfError(err, "Cannot create Kube client")
// Create a cached reader for use in the health check controller when performing List Buckets.
cachedReader, err := newCachedReader(cfg, providerScheme, *kubeCacheSyncTimeout, log)
kingpin.FatalIfError(err, "Cannot setup cached reader")

kingpin.FatalIfError(ctrl.NewWebhookManagedBy(mgr).
For(&providercephv1alpha1.Bucket{}).
Expand All @@ -327,8 +331,8 @@ func main() {
healthcheck.NewController(
healthcheck.WithAutoPause(autoPauseBucket),
healthcheck.WithBackendStore(backendStore),
healthcheck.WithKubeClientUncached(kubeClientUncached),
healthcheck.WithKubeClientCached(mgr.GetClient()),
healthcheck.WithCachedReader(cachedReader),
healthcheck.WithKubeClient(mgr.GetClient()),
healthcheck.WithHttpClient(&http.Client{Timeout: *s3Timeout}),
healthcheck.WithLogger(log))),
"Cannot setup ProviderConfig controllers")
Expand Down Expand Up @@ -367,3 +371,28 @@ func main() {

kingpin.FatalIfError(mgr.Start(ctrl.SetupSignalHandler()), "Cannot start controller manager")
}

func newCachedReader(cfg *rest.Config, s *runtime.Scheme, syncTimeout time.Duration, l logr.Logger) (client.Reader, error) {
informerCache, err := cache.New(cfg, cache.Options{
Scheme: s,
})
if err != nil {
return nil, err
}

ctx, cancel := context.WithTimeout(context.Background(), syncTimeout)
defer cancel()

go func() {
if err := informerCache.Start(ctx); err != nil {
l.Error(err, "failed to start informer for cached reader")
}
}()

if !informerCache.WaitForCacheSync(ctx) {
return nil, errFailedToSyncCache
}
var cachedReader client.Reader = informerCache

return cachedReader, nil
}
44 changes: 22 additions & 22 deletions internal/controller/providerconfig/healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (
const controllerName = "health-check-controller"

type Controller struct {
kubeClientUncached client.Client
kubeClientCached client.Client
backendStore *backendstore.BackendStore
httpClient *http.Client
log logr.Logger
autoPauseBucket bool
kubeClient client.Client
cachedReader client.Reader
backendStore *backendstore.BackendStore
httpClient *http.Client
log logr.Logger
autoPauseBucket bool
}

func NewController(options ...func(*Controller)) *Controller {
Expand All @@ -32,43 +32,43 @@ func NewController(options ...func(*Controller)) *Controller {
return r
}

func WithKubeClientUncached(k client.Client) func(*Controller) {
return func(r *Controller) {
r.kubeClientUncached = k
func WithCachedReader(r client.Reader) func(*Controller) {
return func(c *Controller) {
c.cachedReader = r
}
}

func WithKubeClientCached(k client.Client) func(*Controller) {
return func(r *Controller) {
r.kubeClientCached = k
func WithKubeClient(k client.Client) func(*Controller) {
return func(c *Controller) {
c.kubeClient = k
}
}

func WithLogger(l logr.Logger) func(*Controller) {
return func(r *Controller) {
r.log = l.WithValues(apisv1alpha1.ProviderConfigGroupKind, providerconfig.ControllerName(controllerName))
return func(c *Controller) {
c.log = l.WithValues(apisv1alpha1.ProviderConfigGroupKind, providerconfig.ControllerName(controllerName))
}
}

func WithBackendStore(b *backendstore.BackendStore) func(*Controller) {
return func(r *Controller) {
r.backendStore = b
return func(c *Controller) {
c.backendStore = b
}
}

func WithAutoPause(autoPause *bool) func(*Controller) {
return func(r *Controller) {
r.autoPauseBucket = *autoPause
return func(c *Controller) {
c.autoPauseBucket = *autoPause
}
}

func WithHttpClient(httpClient *http.Client) func(*Controller) {
return func(r *Controller) {
r.httpClient = httpClient
return func(c *Controller) {
c.httpClient = httpClient
}
}

func (r *Controller) SetupWithManager(mgr ctrl.Manager) error {
func (c *Controller) SetupWithManager(mgr ctrl.Manager) error {
const maxReconciles = 5

return ctrl.NewControllerManagedBy(mgr).
Expand All @@ -77,5 +77,5 @@ func (r *Controller) SetupWithManager(mgr ctrl.Manager) error {
WithOptions(controller.Options{
MaxConcurrentReconciles: maxReconciles,
}.ForControllerRuntime()).
Complete(r)
Complete(c)
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
bucketName := req.Name + healthCheckSuffix

providerConfig := &apisv1alpha1.ProviderConfig{}
if err := c.kubeClientCached.Get(ctx, req.NamespacedName, providerConfig); err != nil {
if err := c.kubeClient.Get(ctx, req.NamespacedName, providerConfig); err != nil {
if kerrors.IsNotFound(err) {
// ProviderConfig has been deleted so there is nothing to do and no need to requeue.
// The backend monitor controller will remove the backend from the backend store.
Expand All @@ -79,7 +79,7 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return ctrl.Result{}, nil
}

if err := UpdateProviderConfigStatus(ctx, c.kubeClientCached, providerConfig, func(_, pcLatest *apisv1alpha1.ProviderConfig) {
if err := UpdateProviderConfigStatus(ctx, c.kubeClient, providerConfig, func(_, pcLatest *apisv1alpha1.ProviderConfig) {
pcLatest.Status.SetConditions(v1alpha1.HealthCheckDisabled())
}); err != nil {
err = errors.Wrap(err, errUpdateHealthStatus)
Expand All @@ -106,7 +106,7 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return
}

if err := UpdateProviderConfigStatus(ctx, c.kubeClientCached, providerConfig, func(pcDeepCopy, pcLatest *apisv1alpha1.ProviderConfig) {
if err := UpdateProviderConfigStatus(ctx, c.kubeClient, providerConfig, func(pcDeepCopy, pcLatest *apisv1alpha1.ProviderConfig) {
pcLatest.Status.SetConditions(pcDeepCopy.Status.Conditions...)
}); err != nil {
err = errors.Wrap(err, errUpdateHealthStatus)
Expand Down Expand Up @@ -182,11 +182,10 @@ func (c *Controller) doHealthCheck(ctx context.Context, providerConfig *apisv1al
// by unsetting the Pause label.
func (c *Controller) unpauseBuckets(ctx context.Context, s3BackendName string) {
const (
steps = 4
duration = time.Second
factor = 5
jitter = 0.1
bucketsPerRequest = 50
steps = 4
duration = time.Second
factor = 5
jitter = 0.1
)
ctx, log := traces.InjectTraceAndLogger(ctx, c.log)

Expand All @@ -204,28 +203,9 @@ func (c *Controller) unpauseBuckets(ctx context.Context, s3BackendName string) {
Factor: factor,
Jitter: jitter,
}, resource.IsAPIError, func() error {
listOptions := &client.ListOptions{
return c.cachedReader.List(ctx, buckets, &client.ListOptions{
LabelSelector: listLabels,
Limit: bucketsPerRequest,
}
for {
pageBuckets := &v1alpha1.BucketList{}
if err := c.kubeClientUncached.List(ctx, pageBuckets, listOptions); err != nil {
return err
}

buckets.Items = append(buckets.Items, pageBuckets.Items...)

if pageBuckets.Continue == "" {
break
}

listOptions.Continue = pageBuckets.Continue

continue
}

return nil
})
})
if err != nil {
log.Info("Error attempting to list Buckets on backend", "error", err.Error(), consts.KeyBackendName, s3BackendName)
Expand All @@ -245,7 +225,7 @@ func (c *Controller) unpauseBuckets(ctx context.Context, s3BackendName string) {
buckets.Items[i].Labels[meta.AnnotationKeyReconciliationPaused] == True {
buckets.Items[i].Labels[meta.AnnotationKeyReconciliationPaused] = ""

return c.kubeClientCached.Update(ctx, &buckets.Items[i])
return c.kubeClient.Update(ctx, &buckets.Items[i])
}

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,8 @@ func TestReconcile(t *testing.T) {
r := NewController(
WithAutoPause(&tc.fields.autopause),
WithBackendStore(bs),
WithKubeClientUncached(c),
WithKubeClientCached(c),
WithKubeClient(c),
WithCachedReader(c),
WithHttpClient(tc.fields.testHttpClient),
WithLogger(logr.Discard()))

Expand Down
Loading