From 53c6ab9d969e1041250bcce92ad3b7008e755719 Mon Sep 17 00:00:00 2001 From: "Benjamin A. Petersen" Date: Tue, 1 Oct 2019 22:54:28 -0400 Subject: [PATCH] Split SyncService() func into separate NewServiceSyncController() from main operator control loop --- pkg/console/controllers/service/controller.go | 162 ++++++++++++++++++ pkg/console/operator/sync_v400.go | 22 +-- pkg/console/starter/starter.go | 16 +- 3 files changed, 178 insertions(+), 22 deletions(-) create mode 100644 pkg/console/controllers/service/controller.go diff --git a/pkg/console/controllers/service/controller.go b/pkg/console/controllers/service/controller.go new file mode 100644 index 0000000000..f95664847e --- /dev/null +++ b/pkg/console/controllers/service/controller.go @@ -0,0 +1,162 @@ +package service + +import ( + "fmt" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + coreinformersv1 "k8s.io/client-go/informers/core/v1" + coreclientv1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog" + + operatorsv1 "github.com/openshift/api/operator/v1" + operatorclientv1 "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1" + "github.com/openshift/console-operator/pkg/api" + "github.com/openshift/console-operator/pkg/console/status" + "github.com/openshift/console-operator/pkg/console/subresource/service" + "github.com/openshift/library-go/pkg/operator/events" + "github.com/openshift/library-go/pkg/operator/resource/resourceapply" +) + +const ( + // key is basically irrelevant + controllerWorkQueueKey = "service-sync-work-queue-key" + controllerName = "ConsoleServiceSyncController" +) + +// ctrl just needs the clients so it can make requests +// the informers will automatically notify it of changes +// and kick the sync loop +type ServiceSyncController struct { + operatorConfigClient operatorclientv1.ConsoleInterface + // live clients, we dont need listers w/caches + serviceClient coreclientv1.ServicesGetter + // names + targetNamespace string + serviceName string + // events + cachesToSync []cache.InformerSynced + queue workqueue.RateLimitingInterface + recorder events.Recorder +} + +// factory func needs clients and informers +// informers to start them up, clients to pass +func NewServiceSyncController( + operatorConfigClient operatorclientv1.ConsoleInterface, + corev1Client coreclientv1.CoreV1Interface, + serviceInformer coreinformersv1.ServiceInformer, + // names + targetNamespace string, + serviceName string, + // events + recorder events.Recorder, +) *ServiceSyncController { + + corev1Client.Services(targetNamespace) + + ctrl := &ServiceSyncController{ + operatorConfigClient: operatorConfigClient, + serviceClient: corev1Client, + // names + targetNamespace: targetNamespace, + serviceName: serviceName, + // events + recorder: recorder, + cachesToSync: nil, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName), + } + + serviceInformer.Informer().AddEventHandler(ctrl.newEventHandler()) + ctrl.cachesToSync = append(ctrl.cachesToSync, + serviceInformer.Informer().HasSynced, + ) + + return ctrl +} + +func (c *ServiceSyncController) sync() error { + startTime := time.Now() + klog.V(4).Infof("started syncing service %q (%v)", c.serviceName, startTime) + defer klog.V(4).Infof("finished syncing service %q (%v)", c.serviceName, time.Since(startTime)) + + operatorConfig, err := c.operatorConfigClient.Get(api.ConfigResourceName, metav1.GetOptions{}) + if err != nil { + return err + } + + switch operatorConfig.Spec.ManagementState { + case operatorsv1.Managed: + klog.V(4).Infoln("console is in a managed state: syncing service") + case operatorsv1.Unmanaged: + klog.V(4).Infoln("console is in an unmanaged state: skipping service sync") + return nil + case operatorsv1.Removed: + klog.V(4).Infoln("console is in a removed state: deleting service") + return c.removeService() + default: + return fmt.Errorf("unknown state: %v", operatorConfig.Spec.ManagementState) + } + + updatedOperatorConfig := operatorConfig.DeepCopy() + _, _, svcErr := resourceapply.ApplyService(c.serviceClient, c.recorder, service.DefaultService(nil)) + status.HandleProgressingOrDegraded(updatedOperatorConfig, "ServiceSync", "FailedApply", svcErr) + status.SyncStatus(c.operatorConfigClient, updatedOperatorConfig) + + return svcErr +} + +func (c *ServiceSyncController) removeService() error { + klog.V(2).Info("deleting console service") + defer klog.V(2).Info("finished deleting console service") + return c.serviceClient.Services(c.targetNamespace).Delete(service.Stub().Name, &metav1.DeleteOptions{}) +} + +// boilerplate, since this controller is not making use of monis.app/go boilerplate +func (c *ServiceSyncController) Run(workers int, stopCh <-chan struct{}) { + defer runtime.HandleCrash() + defer c.queue.ShutDown() + klog.Infof("starting %v", controllerName) + defer klog.Infof("shutting down %v", controllerName) + if !cache.WaitForCacheSync(stopCh, c.cachesToSync...) { + klog.Infoln("caches did not sync") + runtime.HandleError(fmt.Errorf("caches did not sync")) + return + } + // only start one worker + go wait.Until(c.runWorker, time.Second, stopCh) + <-stopCh +} + +func (c *ServiceSyncController) runWorker() { + for c.processNextWorkItem() { + } +} + +func (c *ServiceSyncController) processNextWorkItem() bool { + processKey, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(processKey) + err := c.sync() + if err == nil { + c.queue.Forget(processKey) + return true + } + runtime.HandleError(fmt.Errorf("%v failed with : %v", processKey, err)) + c.queue.AddRateLimited(processKey) + return true +} + +func (c *ServiceSyncController) newEventHandler() cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { c.queue.Add(controllerWorkQueueKey) }, + UpdateFunc: func(old, new interface{}) { c.queue.Add(controllerWorkQueueKey) }, + DeleteFunc: func(obj interface{}) { c.queue.Add(controllerWorkQueueKey) }, + } +} diff --git a/pkg/console/operator/sync_v400.go b/pkg/console/operator/sync_v400.go index 862da7d055..47355e6126 100644 --- a/pkg/console/operator/sync_v400.go +++ b/pkg/console/operator/sync_v400.go @@ -35,7 +35,6 @@ import ( oauthsub "github.com/openshift/console-operator/pkg/console/subresource/oauthclient" routesub "github.com/openshift/console-operator/pkg/console/subresource/route" secretsub "github.com/openshift/console-operator/pkg/console/subresource/secret" - servicesub "github.com/openshift/console-operator/pkg/console/subresource/service" ) var ( @@ -69,13 +68,6 @@ func (co *consoleOperator) sync_v400(updatedOperatorConfig *operatorv1.Console, return rtErr } - svc, svcChanged, svcErrReason, svcErr := co.SyncService(set.Operator) - toUpdate = toUpdate || svcChanged - status.HandleProgressingOrDegraded(updatedOperatorConfig, "ServiceSync", svcErrReason, svcErr) - if svcErr != nil { - return svcErr - } - cm, cmChanged, cmErrReason, cmErr := co.SyncConfigMap(set.Operator, set.Console, set.Infrastructure, rt) toUpdate = toUpdate || cmChanged status.HandleProgressingOrDegraded(updatedOperatorConfig, "ConfigMapSync", cmErrReason, cmErr) @@ -186,9 +178,7 @@ func (co *consoleOperator) sync_v400(updatedOperatorConfig *operatorv1.Console, defer func() { klog.V(4).Infof("sync loop 4.0.0 complete") - if svcChanged { - klog.V(4).Infof("\t service changed: %v", svc.GetResourceVersion()) - } + if rtChanged { klog.V(4).Infof("\t route changed: %v", rt.GetResourceVersion()) } @@ -396,16 +386,6 @@ func (co *consoleOperator) SyncTrustedCAConfigMap(operatorConfig *operatorv1.Con return actual, true, "", err } -// apply service -// there is nothing special about our service, so no additional error handling is needed here. -func (co *consoleOperator) SyncService(operatorConfig *operatorv1.Console) (consoleService *corev1.Service, changed bool, reason string, err error) { - svc, svcChanged, svcErr := resourceapply.ApplyService(co.serviceClient, co.recorder, servicesub.DefaultService(operatorConfig)) - if svcErr != nil { - return nil, false, "FailedApply", svcErr - } - return svc, svcChanged, "", svcErr -} - // apply route // - be sure to test that we don't trigger an infinite loop by stomping on the // default host name set by the server, or any other values. The ApplyRoute() diff --git a/pkg/console/starter/starter.go b/pkg/console/starter/starter.go index 075ead090a..84b48fef4c 100644 --- a/pkg/console/starter/starter.go +++ b/pkg/console/starter/starter.go @@ -45,6 +45,7 @@ import ( consoleinformers "github.com/openshift/client-go/console/informers/externalversions" "github.com/openshift/console-operator/pkg/console/clientwrapper" + "github.com/openshift/console-operator/pkg/console/controllers/service" "github.com/openshift/console-operator/pkg/console/operator" "github.com/openshift/library-go/pkg/operator/loglevel" ) @@ -172,7 +173,6 @@ func RunOperator(ctx *controllercmd.ControllerContext) error { recorder, resourceSyncer, ) - cliDownloadsController := clidownloads.NewCLIDownloadsSyncController( // clients operatorClient, @@ -188,6 +188,19 @@ func RunOperator(ctx *controllercmd.ControllerContext) error { recorder, ) + consoleServiceController := service.NewServiceSyncController( + // operator config so we can update status + operatorConfigClient.OperatorV1().Consoles(), + // only needs to interact with the service resource + kubeClient.CoreV1(), + kubeInformersNamespaced.Core().V1().Services(), + // names + api.OpenShiftConsoleNamespace, + api.OpenShiftConsoleName, + // events + recorder, + ) + versionRecorder := status.NewVersionGetter() versionRecorder.SetVersion("operator", os.Getenv("RELEASE_VERSION")) @@ -246,6 +259,7 @@ func RunOperator(ctx *controllercmd.ControllerContext) error { informer.Start(ctx.Done()) } + go consoleServiceController.Run(1, ctx.Done()) go consoleOperator.Run(ctx.Done()) go resourceSyncer.Run(1, ctx.Done()) go clusterOperatorStatus.Run(1, ctx.Done())