Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions pkg/console/controllers/service/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package service

import (
"fmt"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformersv1 "k8s.io/client-go/informers/core/v1"
coreclientv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"

operatorsv1 "github.com/openshift/api/operator/v1"
operatorclientv1 "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1"
"github.com/openshift/console-operator/pkg/api"
"github.com/openshift/console-operator/pkg/console/status"
"github.com/openshift/console-operator/pkg/console/subresource/service"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
)

const (
// key is basically irrelevant
controllerWorkQueueKey = "service-sync-work-queue-key"
controllerName = "ConsoleServiceSyncController"
)

// ctrl just needs the clients so it can make requests
// the informers will automatically notify it of changes
// and kick the sync loop
type ServiceSyncController struct {
operatorConfigClient operatorclientv1.ConsoleInterface
// live clients, we dont need listers w/caches
serviceClient coreclientv1.ServicesGetter
// names
targetNamespace string
serviceName string
// events
cachesToSync []cache.InformerSynced
queue workqueue.RateLimitingInterface
recorder events.Recorder
}

// factory func needs clients and informers
// informers to start them up, clients to pass
func NewServiceSyncController(
operatorConfigClient operatorclientv1.ConsoleInterface,
corev1Client coreclientv1.CoreV1Interface,
serviceInformer coreinformersv1.ServiceInformer,
// names
targetNamespace string,
serviceName string,
// events
recorder events.Recorder,
) *ServiceSyncController {

corev1Client.Services(targetNamespace)

ctrl := &ServiceSyncController{
operatorConfigClient: operatorConfigClient,
serviceClient: corev1Client,
// names
targetNamespace: targetNamespace,
serviceName: serviceName,
// events
recorder: recorder,
cachesToSync: nil,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName),
}

serviceInformer.Informer().AddEventHandler(ctrl.newEventHandler())
ctrl.cachesToSync = append(ctrl.cachesToSync,
serviceInformer.Informer().HasSynced,
)

return ctrl
}

func (c *ServiceSyncController) sync() error {
startTime := time.Now()
klog.V(4).Infof("started syncing service %q (%v)", c.serviceName, startTime)
defer klog.V(4).Infof("finished syncing service %q (%v)", c.serviceName, time.Since(startTime))

operatorConfig, err := c.operatorConfigClient.Get(api.ConfigResourceName, metav1.GetOptions{})
if err != nil {
return err
}

switch operatorConfig.Spec.ManagementState {
case operatorsv1.Managed:
klog.V(4).Infoln("console is in a managed state: syncing service")
case operatorsv1.Unmanaged:
klog.V(4).Infoln("console is in an unmanaged state: skipping service sync")
return nil
case operatorsv1.Removed:
klog.V(4).Infoln("console is in a removed state: deleting service")
return c.removeService()
default:
return fmt.Errorf("unknown state: %v", operatorConfig.Spec.ManagementState)
}

updatedOperatorConfig := operatorConfig.DeepCopy()
_, _, svcErr := resourceapply.ApplyService(c.serviceClient, c.recorder, service.DefaultService(nil))
status.HandleProgressingOrDegraded(updatedOperatorConfig, "ServiceSync", "FailedApply", svcErr)
status.SyncStatus(c.operatorConfigClient, updatedOperatorConfig)

return svcErr
}

func (c *ServiceSyncController) removeService() error {
klog.V(2).Info("deleting console service")
defer klog.V(2).Info("finished deleting console service")
return c.serviceClient.Services(c.targetNamespace).Delete(service.Stub().Name, &metav1.DeleteOptions{})
}

// boilerplate, since this controller is not making use of monis.app/go boilerplate
func (c *ServiceSyncController) Run(workers int, stopCh <-chan struct{}) {
defer runtime.HandleCrash()
defer c.queue.ShutDown()
klog.Infof("starting %v", controllerName)
defer klog.Infof("shutting down %v", controllerName)
if !cache.WaitForCacheSync(stopCh, c.cachesToSync...) {
klog.Infoln("caches did not sync")
runtime.HandleError(fmt.Errorf("caches did not sync"))
return
}
// only start one worker
go wait.Until(c.runWorker, time.Second, stopCh)
<-stopCh
}

func (c *ServiceSyncController) runWorker() {
for c.processNextWorkItem() {
}
}

func (c *ServiceSyncController) processNextWorkItem() bool {
processKey, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(processKey)
err := c.sync()
if err == nil {
c.queue.Forget(processKey)
return true
}
runtime.HandleError(fmt.Errorf("%v failed with : %v", processKey, err))
c.queue.AddRateLimited(processKey)
return true
}

func (c *ServiceSyncController) newEventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.queue.Add(controllerWorkQueueKey) },
UpdateFunc: func(old, new interface{}) { c.queue.Add(controllerWorkQueueKey) },
DeleteFunc: func(obj interface{}) { c.queue.Add(controllerWorkQueueKey) },
}
}
22 changes: 1 addition & 21 deletions pkg/console/operator/sync_v400.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
oauthsub "github.com/openshift/console-operator/pkg/console/subresource/oauthclient"
routesub "github.com/openshift/console-operator/pkg/console/subresource/route"
secretsub "github.com/openshift/console-operator/pkg/console/subresource/secret"
servicesub "github.com/openshift/console-operator/pkg/console/subresource/service"
)

var (
Expand Down Expand Up @@ -69,13 +68,6 @@ func (co *consoleOperator) sync_v400(updatedOperatorConfig *operatorv1.Console,
return rtErr
}

svc, svcChanged, svcErrReason, svcErr := co.SyncService(set.Operator)
toUpdate = toUpdate || svcChanged
status.HandleProgressingOrDegraded(updatedOperatorConfig, "ServiceSync", svcErrReason, svcErr)
if svcErr != nil {
return svcErr
}

cm, cmChanged, cmErrReason, cmErr := co.SyncConfigMap(set.Operator, set.Console, set.Infrastructure, rt)
toUpdate = toUpdate || cmChanged
status.HandleProgressingOrDegraded(updatedOperatorConfig, "ConfigMapSync", cmErrReason, cmErr)
Expand Down Expand Up @@ -186,9 +178,7 @@ func (co *consoleOperator) sync_v400(updatedOperatorConfig *operatorv1.Console,

defer func() {
klog.V(4).Infof("sync loop 4.0.0 complete")
if svcChanged {
klog.V(4).Infof("\t service changed: %v", svc.GetResourceVersion())
}

if rtChanged {
klog.V(4).Infof("\t route changed: %v", rt.GetResourceVersion())
}
Expand Down Expand Up @@ -396,16 +386,6 @@ func (co *consoleOperator) SyncTrustedCAConfigMap(operatorConfig *operatorv1.Con
return actual, true, "", err
}

// apply service
// there is nothing special about our service, so no additional error handling is needed here.
func (co *consoleOperator) SyncService(operatorConfig *operatorv1.Console) (consoleService *corev1.Service, changed bool, reason string, err error) {
svc, svcChanged, svcErr := resourceapply.ApplyService(co.serviceClient, co.recorder, servicesub.DefaultService(operatorConfig))
if svcErr != nil {
return nil, false, "FailedApply", svcErr
}
return svc, svcChanged, "", svcErr
}

// apply route
// - be sure to test that we don't trigger an infinite loop by stomping on the
// default host name set by the server, or any other values. The ApplyRoute()
Expand Down
16 changes: 15 additions & 1 deletion pkg/console/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
consoleinformers "github.com/openshift/client-go/console/informers/externalversions"

"github.com/openshift/console-operator/pkg/console/clientwrapper"
"github.com/openshift/console-operator/pkg/console/controllers/service"
"github.com/openshift/console-operator/pkg/console/operator"
"github.com/openshift/library-go/pkg/operator/loglevel"
)
Expand Down Expand Up @@ -172,7 +173,6 @@ func RunOperator(ctx *controllercmd.ControllerContext) error {
recorder,
resourceSyncer,
)

cliDownloadsController := clidownloads.NewCLIDownloadsSyncController(
// clients
operatorClient,
Expand All @@ -188,6 +188,19 @@ func RunOperator(ctx *controllercmd.ControllerContext) error {
recorder,
)

consoleServiceController := service.NewServiceSyncController(
// operator config so we can update status
operatorConfigClient.OperatorV1().Consoles(),
// only needs to interact with the service resource
kubeClient.CoreV1(),
kubeInformersNamespaced.Core().V1().Services(),
// names
api.OpenShiftConsoleNamespace,
api.OpenShiftConsoleName,
// events
recorder,
)

versionRecorder := status.NewVersionGetter()
versionRecorder.SetVersion("operator", os.Getenv("RELEASE_VERSION"))

Expand Down Expand Up @@ -246,6 +259,7 @@ func RunOperator(ctx *controllercmd.ControllerContext) error {
informer.Start(ctx.Done())
}

go consoleServiceController.Run(1, ctx.Done())
go consoleOperator.Run(ctx.Done())
go resourceSyncer.Run(1, ctx.Done())
go clusterOperatorStatus.Run(1, ctx.Done())
Expand Down