From 97cbb95addc6c5067d88fd7a067cec5cd186f053 Mon Sep 17 00:00:00 2001 From: Sabari Kumar Murugesan Date: Fri, 31 Aug 2018 16:04:51 -0700 Subject: [PATCH 1/7] Refactor bus controller --- pkg/controller/bus/controller.go | 158 ++++++++++++++++--------------- 1 file changed, 83 insertions(+), 75 deletions(-) diff --git a/pkg/controller/bus/controller.go b/pkg/controller/bus/controller.go index 88cae16d3b4..b62d70c037a 100644 --- a/pkg/controller/bus/controller.go +++ b/pkg/controller/bus/controller.go @@ -309,46 +309,20 @@ func (c *Controller) syncHandler(key string) error { return err } - var dispatcherService *corev1.Service - var dispatcherDeployment, provisionerDeployment *appsv1.Deployment - var dispatcherServiceErr, dispatcherDeplErr, provisionerDeplError error - // Sync Service derived from the Bus - dispatcherService, dispatcherServiceErr = c.syncBusDispatcherService(bus) - if dispatcherServiceErr != nil { - _ = c.updateBusStatus(bus, - dispatcherService, dispatcherServiceErr, - dispatcherDeployment, dispatcherDeplErr, - provisionerDeployment, provisionerDeplError) - return dispatcherServiceErr + err = c.syncDispatcherServiceBusStatus(bus) + if err != nil { + return err } // Sync Deployment derived from the Bus - dispatcherDeployment, dispatcherDeplErr = c.syncBusDispatcherDeployment(bus) - if dispatcherDeplErr != nil { - _ = c.updateBusStatus(bus, - dispatcherService, dispatcherServiceErr, - dispatcherDeployment, dispatcherDeplErr, - provisionerDeployment, provisionerDeplError) - return dispatcherDeplErr + err = c.syncDispatcherDeploymentBusStatus(bus) + if err != nil { + return err } // Sync Deployment derived from the Bus - provisionerDeployment, provisionerDeplError = c.syncBusProvisionerDeployment(bus) - if provisionerDeplError != nil { - _ = c.updateBusStatus(bus, - dispatcherService, dispatcherServiceErr, - dispatcherDeployment, dispatcherDeplErr, - provisionerDeployment, provisionerDeplError) - return provisionerDeplError - } - - // Finally, we update the status block of the Bus resource to reflect the - // current state of the world - err = c.updateBusStatus(bus, - dispatcherService, dispatcherServiceErr, - dispatcherDeployment, dispatcherDeplErr, - provisionerDeployment, provisionerDeplError) + err = c.syncProvisionerDeploymentBusStatus(bus) if err != nil { return err } @@ -357,7 +331,79 @@ func (c *Controller) syncHandler(key string) error { return nil } -func (c *Controller) syncBusDispatcherService(bus *channelsv1alpha1.Bus) (*corev1.Service, error) { +func (c *Controller) syncDispatcherServiceBusStatus(bus *channelsv1alpha1.Bus) error { + // NEVER modify objects from the store. It's a read-only, local cache. + // You can use DeepCopy() to make a deep copy of original object and modify this copy + // Or create a copy manually for better performance + busCopy := bus.DeepCopy() + + dispatcherService, err := c.syncDispatcherService(busCopy) + + if err != nil { + busCopy.Status.Service = nil + serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, err.Error()) + util.SetBusCondition(&busCopy.Status, *serviceCondition) + // Ignore error in updating bus status + c.compareAndUpdateBusStatus(bus, busCopy) + return err + } + + busCopy.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} + serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced") + util.SetBusCondition(&busCopy.Status, *serviceCondition) + + return c.compareAndUpdateBusStatus(bus, busCopy) +} + +func (c *Controller) syncDispatcherDeploymentBusStatus(bus *channelsv1alpha1.Bus) error { + // NEVER modify objects from the store. It's a read-only, local cache. + // You can use DeepCopy() to make a deep copy of original object and modify this copy + // Or create a copy manually for better performance + busCopy := bus.DeepCopy() + + _, err := c.syncDispatcherDeployment(busCopy) + + if err != nil { + dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, err.Error()) + util.SetBusCondition(&busCopy.Status, *dispatchCondition) + // Ignore error in updating bus status + c.compareAndUpdateBusStatus(bus, busCopy) + return err + } + + dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") + util.SetBusCondition(&busCopy.Status, *dispatchCondition) + + return c.compareAndUpdateBusStatus(bus, busCopy) +} + +func (c *Controller) syncProvisionerDeploymentBusStatus(bus *channelsv1alpha1.Bus) error { + // NEVER modify objects from the store. It's a read-only, local cache. + // You can use DeepCopy() to make a deep copy of original object and modify this copy + // Or create a copy manually for better performance + busCopy := bus.DeepCopy() + + provisionerDeployment, err := c.syncProvisionerDeployment(busCopy) + + if err != nil { + provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, err.Error()) + util.SetBusCondition(&busCopy.Status, *provisionCondition) + // Ignore error in updating bus status + c.compareAndUpdateBusStatus(bus, busCopy) + return err + } + + if provisionerDeployment != nil { + provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") + util.SetBusCondition(&busCopy.Status, *provisionCondition) + } else { + util.RemoveBusCondition(&busCopy.Status, channelsv1alpha1.BusProvisioning) + } + + return c.compareAndUpdateBusStatus(bus, busCopy) +} + +func (c *Controller) syncDispatcherService(bus *channelsv1alpha1.Bus) (*corev1.Service, error) { // Get the service with the specified service name serviceName := controller.BusDispatcherServiceName(bus.Name, bus.Namespace) service, err := c.servicesLister.Services(system.Namespace).Get(serviceName) @@ -384,7 +430,7 @@ func (c *Controller) syncBusDispatcherService(bus *channelsv1alpha1.Bus) (*corev return service, nil } -func (c *Controller) syncBusDispatcherDeployment(bus *channelsv1alpha1.Bus) (*appsv1.Deployment, error) { +func (c *Controller) syncDispatcherDeployment(bus *channelsv1alpha1.Bus) (*appsv1.Deployment, error) { // Get the deployment with the specified deployment name deploymentName := controller.BusDispatcherDeploymentName(bus.Name, bus.Namespace) deployment, err := c.deploymentsLister.Deployments(system.Namespace).Get(deploymentName) @@ -423,7 +469,7 @@ func (c *Controller) syncBusDispatcherDeployment(bus *channelsv1alpha1.Bus) (*ap return deployment, nil } -func (c *Controller) syncBusProvisionerDeployment(bus *channelsv1alpha1.Bus) (*appsv1.Deployment, error) { +func (c *Controller) syncProvisionerDeployment(bus *channelsv1alpha1.Bus) (*appsv1.Deployment, error) { provisioner := bus.Spec.Provisioner // Get the deployment with the specified deployment name @@ -477,45 +523,7 @@ func (c *Controller) syncBusProvisionerDeployment(bus *channelsv1alpha1.Bus) (*a return deployment, nil } -func (c *Controller) updateBusStatus( - bus *channelsv1alpha1.Bus, - dispatcherService *corev1.Service, dispatcherServiceErr error, - dispatcherDeployment *appsv1.Deployment, dispatcherDeploymentErr error, - provisionerDeployment *appsv1.Deployment, provisionerDeploymentErr error, -) error { - // NEVER modify objects from the store. It's a read-only, local cache. - // You can use DeepCopy() to make a deep copy of original object and modify this copy - // Or create a copy manually for better performance - busCopy := bus.DeepCopy() - - if dispatcherService != nil { - busCopy.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} - serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced") - util.SetBusCondition(&busCopy.Status, *serviceCondition) - } else { - busCopy.Status.Service = nil - serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, dispatcherServiceErr.Error()) - util.SetBusCondition(&busCopy.Status, *serviceCondition) - } - - if dispatcherDeployment != nil { - dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&busCopy.Status, *dispatchCondition) - } else { - dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, dispatcherDeploymentErr.Error()) - util.SetBusCondition(&busCopy.Status, *dispatchCondition) - } - - if provisionerDeployment != nil { - provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&busCopy.Status, *provisionCondition) - } else if provisionerDeploymentErr != nil { - provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, provisionerDeploymentErr.Error()) - util.SetBusCondition(&busCopy.Status, *provisionCondition) - } else { - util.RemoveBusCondition(&busCopy.Status, channelsv1alpha1.BusProvisioning) - } - +func (c *Controller) compareAndUpdateBusStatus(bus *channelsv1alpha1.Bus, busCopy *channelsv1alpha1.Bus) error { util.ConsolidateBusCondition(busCopy) // Only update if status has changed From d187be092879881915cf6eff149eaf60f59b9666 Mon Sep 17 00:00:00 2001 From: Sabari Kumar Murugesan Date: Tue, 4 Sep 2018 12:41:57 -0700 Subject: [PATCH 2/7] Add ClusterBus Status --- pkg/apis/channels/v1alpha1/bus_types.go | 5 + .../channels/v1alpha1/clusterbus_types.go | 9 +- .../v1alpha1/zz_generated.deepcopy.go | 18 +-- pkg/controller/clusterbus/controller.go | 111 ++++++++++++++---- pkg/controller/util/bus_util.go | 12 +- 5 files changed, 108 insertions(+), 47 deletions(-) diff --git a/pkg/apis/channels/v1alpha1/bus_types.go b/pkg/apis/channels/v1alpha1/bus_types.go index 821310f4124..85e80600588 100644 --- a/pkg/apis/channels/v1alpha1/bus_types.go +++ b/pkg/apis/channels/v1alpha1/bus_types.go @@ -142,6 +142,10 @@ func (b *Bus) GetSpec() *BusSpec { return &b.Spec } +func (b *Bus) GetStatus() *BusStatus { + return &b.Status +} + func (b *Bus) GetSpecJSON() ([]byte, error) { return json.Marshal(b.Spec) } @@ -161,6 +165,7 @@ type GenericBus interface { meta_v1.ObjectMetaAccessor BacksChannel(channel *Channel) bool GetSpec() *BusSpec + GetStatus() *BusStatus // Needed for generic webhook support apis.Defaultable diff --git a/pkg/apis/channels/v1alpha1/clusterbus_types.go b/pkg/apis/channels/v1alpha1/clusterbus_types.go index a9a6cbdf9f9..4d891400983 100644 --- a/pkg/apis/channels/v1alpha1/clusterbus_types.go +++ b/pkg/apis/channels/v1alpha1/clusterbus_types.go @@ -50,17 +50,20 @@ var _ webhook.GenericCRD = (*ClusterBus)(nil) type ClusterBusSpec = BusSpec // ClusterBusStatus (computed) for a clusterbus -type ClusterBusStatus struct { -} +type ClusterBusStatus = BusStatus func (b *ClusterBus) BacksChannel(channel *Channel) bool { return len(b.Namespace) == 0 && b.Name == channel.Spec.ClusterBus } -func (b *ClusterBus) GetSpec() *BusSpec { +func (b *ClusterBus) GetSpec() *ClusterBusSpec { return &b.Spec } +func (b *ClusterBus) GetStatus() *ClusterBusStatus { + return &b.Status +} + func (b *ClusterBus) GetSpecJSON() ([]byte, error) { return json.Marshal(b.Spec) } diff --git a/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go index e876226a49a..3f815bbcd0a 100644 --- a/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go @@ -403,7 +403,7 @@ func (in *ClusterBus) DeepCopyInto(out *ClusterBus) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) return } @@ -458,22 +458,6 @@ func (in *ClusterBusList) DeepCopyObject() runtime.Object { return nil } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *ClusterBusStatus) DeepCopyInto(out *ClusterBusStatus) { - *out = *in - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterBusStatus. -func (in *ClusterBusStatus) DeepCopy() *ClusterBusStatus { - if in == nil { - return nil - } - out := new(ClusterBusStatus) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Parameter) DeepCopyInto(out *Parameter) { *out = *in diff --git a/pkg/controller/clusterbus/controller.go b/pkg/controller/clusterbus/controller.go index 8d0fe2cba34..ff696da2de4 100644 --- a/pkg/controller/clusterbus/controller.go +++ b/pkg/controller/clusterbus/controller.go @@ -48,6 +48,7 @@ import ( channelscheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" informers "github.com/knative/eventing/pkg/client/informers/externalversions" listers "github.com/knative/eventing/pkg/client/listers/channels/v1alpha1" + "github.com/knative/eventing/pkg/controller/util" "github.com/knative/eventing/pkg/system" sharedclientset "github.com/knative/pkg/client/clientset/versioned" sharedinformers "github.com/knative/pkg/client/informers/externalversions" @@ -73,6 +74,17 @@ const ( MessageResourceSynced = "ClusterBus synced successfully" ) +const ( + // ServiceSynced is used as part of the condition reason when the bus (k8s) service is successfully created. + ServiceSynced = "ServiceSynced" + // ServiceError is used as part of the condition reason when the bus (k8s) service creation failed. + ServiceError = "ServiceError" + // DeploymentSynced is used as part of the condition reason when a bus deployment is successfully created. + DeploymentSynced = "DeploymentSynced" + // DeploymentError is used as part of the condition reason when a bus deployment creation failed. + DeploymentError = "DeploymentError" +) + // Controller is the controller implementation for ClusterBus resources type Controller struct { // kubeclientset is a standard kubernetes clientset @@ -286,35 +298,100 @@ func (c *Controller) syncHandler(key string) error { } // Sync Service derived from the ClusterBus - dispatcherService, err := c.syncClusterBusDispatcherService(clusterBus) + err = c.syncDispatcherServiceClusterBusStatus(clusterBus) if err != nil { return err } // Sync Deployment derived from the ClusterBus - dispatcherDeployment, err := c.syncClusterBusDispatcherDeployment(clusterBus) + err = c.syncDispatcherDeploymentClusterBusStatus(clusterBus) if err != nil { return err } // Sync Deployment derived from the ClusterBus - provisionerDeployment, err := c.syncClusterBusProvisionerDeployment(clusterBus) + err = c.syncProvisionerDeploymentClusterBusStatus(clusterBus) if err != nil { return err } - // Finally, we update the status block of the ClusterBus resource to reflect the - // current state of the world - err = c.updateClusterBusStatus(clusterBus, dispatcherService, dispatcherDeployment, provisionerDeployment) + c.recorder.Event(clusterBus, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced) + return nil +} + +func (c *Controller) syncDispatcherServiceClusterBusStatus(clusterBus *channelsv1alpha1.ClusterBus) error { + // NEVER modify objects from the store. It's a read-only, local cache. + // You can use DeepCopy() to make a deep copy of original object and modify this copy + // Or create a copy manually for better performance + clusterBusCopy := clusterBus.DeepCopy() + + dispatcherService, err := c.syncDispatcherService(clusterBusCopy) + if err != nil { + clusterBusCopy.Status.Service = nil + serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, err.Error()) + util.SetBusCondition(&clusterBusCopy.Status, *serviceCondition) + // Ignore error in updating bus status + c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) return err } - c.recorder.Event(clusterBus, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced) - return nil + clusterBusCopy.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} + serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced") + util.SetBusCondition(&clusterBusCopy.Status, *serviceCondition) + + return c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) } -func (c *Controller) syncClusterBusDispatcherService(clusterBus *channelsv1alpha1.ClusterBus) (*corev1.Service, error) { +func (c *Controller) syncDispatcherDeploymentClusterBusStatus(clusterBus *channelsv1alpha1.ClusterBus) error { + // NEVER modify objects from the store. It's a read-only, local cache. + // You can use DeepCopy() to make a deep copy of original object and modify this copy + // Or create a copy manually for better performance + clusterBusCopy := clusterBus.DeepCopy() + + _, err := c.syncDispatcherDeployment(clusterBusCopy) + + if err != nil { + dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, err.Error()) + util.SetBusCondition(&clusterBusCopy.Status, *dispatchCondition) + // Ignore error in updating bus status + c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) + return err + } + + dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") + util.SetBusCondition(&clusterBusCopy.Status, *dispatchCondition) + + return c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) +} + +func (c *Controller) syncProvisionerDeploymentClusterBusStatus(clusterBus *channelsv1alpha1.ClusterBus) error { + // NEVER modify objects from the store. It's a read-only, local cache. + // You can use DeepCopy() to make a deep copy of original object and modify this copy + // Or create a copy manually for better performance + clusterBusCopy := clusterBus.DeepCopy() + + provisionerDeployment, err := c.syncProvisionerDeployment(clusterBusCopy) + + if err != nil { + provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, err.Error()) + util.SetBusCondition(&clusterBusCopy.Status, *provisionCondition) + // Ignore error in updating cluster bus status + c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) + return err + } + + if provisionerDeployment != nil { + provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") + util.SetBusCondition(&clusterBusCopy.Status, *provisionCondition) + } else { + util.RemoveBusCondition(&clusterBusCopy.Status, channelsv1alpha1.BusProvisioning) + } + + return c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) +} + +func (c *Controller) syncDispatcherService(clusterBus *channelsv1alpha1.ClusterBus) (*corev1.Service, error) { // Get the service with the specified service name serviceName := controller.ClusterBusDispatcherServiceName(clusterBus.ObjectMeta.Name) service, err := c.servicesLister.Services(system.Namespace).Get(serviceName) @@ -341,7 +418,7 @@ func (c *Controller) syncClusterBusDispatcherService(clusterBus *channelsv1alpha return service, nil } -func (c *Controller) syncClusterBusDispatcherDeployment(clusterBus *channelsv1alpha1.ClusterBus) (*appsv1.Deployment, error) { +func (c *Controller) syncDispatcherDeployment(clusterBus *channelsv1alpha1.ClusterBus) (*appsv1.Deployment, error) { // Get the deployment with the specified deployment name deploymentName := controller.ClusterBusDispatcherDeploymentName(clusterBus.ObjectMeta.Name) deployment, err := c.deploymentsLister.Deployments(system.Namespace).Get(deploymentName) @@ -380,7 +457,7 @@ func (c *Controller) syncClusterBusDispatcherDeployment(clusterBus *channelsv1al return deployment, nil } -func (c *Controller) syncClusterBusProvisionerDeployment(clusterBus *channelsv1alpha1.ClusterBus) (*appsv1.Deployment, error) { +func (c *Controller) syncProvisionerDeployment(clusterBus *channelsv1alpha1.ClusterBus) (*appsv1.Deployment, error) { provisioner := clusterBus.Spec.Provisioner // Get the deployment with the specified deployment name @@ -434,16 +511,8 @@ func (c *Controller) syncClusterBusProvisionerDeployment(clusterBus *channelsv1a return deployment, nil } -func (c *Controller) updateClusterBusStatus( - clusterBus *channelsv1alpha1.ClusterBus, - dispatcherService *corev1.Service, - dispatcherDeployment *appsv1.Deployment, - provisionerDeployment *appsv1.Deployment, -) error { - // NEVER modify objects from the store. It's a read-only, local cache. - // You can use DeepCopy() to make a deep copy of original object and modify this copy - // Or create a copy manually for better performance - clusterBusCopy := clusterBus.DeepCopy() +func (c *Controller) compareAndUpdateBusStatus(clusterBus *channelsv1alpha1.ClusterBus, clusterBusCopy *channelsv1alpha1.ClusterBus) error { + util.ConsolidateBusCondition(clusterBusCopy) // Only update if status has changed if !equality.Semantic.DeepEqual(clusterBus.Status, clusterBusCopy.Status) { // If the CustomResourceSubresources feature gate is not enabled, diff --git a/pkg/controller/util/bus_util.go b/pkg/controller/util/bus_util.go index 39b9272a713..0b6f72b9dbd 100644 --- a/pkg/controller/util/bus_util.go +++ b/pkg/controller/util/bus_util.go @@ -68,11 +68,11 @@ func RemoveBusCondition(status *v1alpha1.BusStatus, condType v1alpha1.BusConditi // ConsolidateBusCondition computes and sets the overall "Ready" condition of the bus // given all other sub-conditions. -func ConsolidateBusCondition(bus *v1alpha1.Bus) { - dispatching := GetBusCondition(bus.Status, v1alpha1.BusDispatching) - provisioning := GetBusCondition(bus.Status, v1alpha1.BusProvisioning) - serviceable := GetBusCondition(bus.Status, v1alpha1.BusServiceable) - needsProvitioner := bus.Spec.Provisioner != nil +func ConsolidateBusCondition(bus v1alpha1.GenericBus) { + dispatching := GetBusCondition(*bus.GetStatus(), v1alpha1.BusDispatching) + provisioning := GetBusCondition(*bus.GetStatus(), v1alpha1.BusProvisioning) + serviceable := GetBusCondition(*bus.GetStatus(), v1alpha1.BusServiceable) + needsProvitioner := bus.GetSpec().Provisioner != nil var cond *v1alpha1.BusCondition @@ -83,7 +83,7 @@ func ConsolidateBusCondition(bus *v1alpha1.Bus) { } else { cond = NewBusCondition(v1alpha1.BusReady, v1.ConditionFalse, "", "") } - SetBusCondition(&bus.Status, *cond) + SetBusCondition(bus.GetStatus(), *cond) } // IsBusReady returns whether all readiness conditions of a bus are met, as a boolean. From 136306c70c786d8fbbcababc4883a89f8a37ae6f Mon Sep 17 00:00:00 2001 From: Sabari Kumar Murugesan Date: Tue, 4 Sep 2018 19:24:27 -0700 Subject: [PATCH 3/7] Address review comments --- pkg/controller/bus/controller.go | 70 ++++++++++++------------- pkg/controller/clusterbus/controller.go | 69 ++++++++++++------------ 2 files changed, 66 insertions(+), 73 deletions(-) diff --git a/pkg/controller/bus/controller.go b/pkg/controller/bus/controller.go index b62d70c037a..3c9772312f5 100644 --- a/pkg/controller/bus/controller.go +++ b/pkg/controller/bus/controller.go @@ -309,20 +309,35 @@ func (c *Controller) syncHandler(key string) error { return err } + // NEVER modify objects from the store. It's a read-only, local cache. + // You can use DeepCopy() to make a deep copy of original object and modify this copy + // Or create a copy manually for better performance + busCopy := bus.DeepCopy() + // Sync Service derived from the Bus - err = c.syncDispatcherServiceBusStatus(bus) + err = c.syncDispatcherServiceBusStatus(busCopy) if err != nil { + c.compareAndUpdateBusStatus(bus, busCopy) return err } // Sync Deployment derived from the Bus - err = c.syncDispatcherDeploymentBusStatus(bus) + err = c.syncDispatcherDeploymentBusStatus(busCopy) if err != nil { + c.compareAndUpdateBusStatus(bus, busCopy) return err } // Sync Deployment derived from the Bus - err = c.syncProvisionerDeploymentBusStatus(bus) + err = c.syncProvisionerDeploymentBusStatus(busCopy) + if err != nil { + c.compareAndUpdateBusStatus(bus, busCopy) + return err + } + + // Finally, we update the status block of the Bus resource to reflect the + // current state of the world + err = c.compareAndUpdateBusStatus(bus, busCopy) if err != nil { return err } @@ -332,75 +347,56 @@ func (c *Controller) syncHandler(key string) error { } func (c *Controller) syncDispatcherServiceBusStatus(bus *channelsv1alpha1.Bus) error { - // NEVER modify objects from the store. It's a read-only, local cache. - // You can use DeepCopy() to make a deep copy of original object and modify this copy - // Or create a copy manually for better performance - busCopy := bus.DeepCopy() - dispatcherService, err := c.syncDispatcherService(busCopy) + dispatcherService, err := c.syncDispatcherService(bus) if err != nil { - busCopy.Status.Service = nil + bus.Status.Service = nil serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, err.Error()) - util.SetBusCondition(&busCopy.Status, *serviceCondition) - // Ignore error in updating bus status - c.compareAndUpdateBusStatus(bus, busCopy) + util.SetBusCondition(&bus.Status, *serviceCondition) return err } - busCopy.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} + bus.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced") - util.SetBusCondition(&busCopy.Status, *serviceCondition) + util.SetBusCondition(&bus.Status, *serviceCondition) - return c.compareAndUpdateBusStatus(bus, busCopy) + return nil } func (c *Controller) syncDispatcherDeploymentBusStatus(bus *channelsv1alpha1.Bus) error { - // NEVER modify objects from the store. It's a read-only, local cache. - // You can use DeepCopy() to make a deep copy of original object and modify this copy - // Or create a copy manually for better performance - busCopy := bus.DeepCopy() - - _, err := c.syncDispatcherDeployment(busCopy) + _, err := c.syncDispatcherDeployment(bus) if err != nil { dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, err.Error()) - util.SetBusCondition(&busCopy.Status, *dispatchCondition) - // Ignore error in updating bus status - c.compareAndUpdateBusStatus(bus, busCopy) + util.SetBusCondition(&bus.Status, *dispatchCondition) return err } dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&busCopy.Status, *dispatchCondition) + util.SetBusCondition(&bus.Status, *dispatchCondition) - return c.compareAndUpdateBusStatus(bus, busCopy) + return nil } func (c *Controller) syncProvisionerDeploymentBusStatus(bus *channelsv1alpha1.Bus) error { - // NEVER modify objects from the store. It's a read-only, local cache. - // You can use DeepCopy() to make a deep copy of original object and modify this copy - // Or create a copy manually for better performance - busCopy := bus.DeepCopy() - provisionerDeployment, err := c.syncProvisionerDeployment(busCopy) + provisionerDeployment, err := c.syncProvisionerDeployment(bus) if err != nil { provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, err.Error()) - util.SetBusCondition(&busCopy.Status, *provisionCondition) - // Ignore error in updating bus status - c.compareAndUpdateBusStatus(bus, busCopy) + util.SetBusCondition(&bus.Status, *provisionCondition) return err } if provisionerDeployment != nil { provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&busCopy.Status, *provisionCondition) + util.SetBusCondition(&bus.Status, *provisionCondition) } else { - util.RemoveBusCondition(&busCopy.Status, channelsv1alpha1.BusProvisioning) + util.RemoveBusCondition(&bus.Status, channelsv1alpha1.BusProvisioning) } - return c.compareAndUpdateBusStatus(bus, busCopy) + return nil } func (c *Controller) syncDispatcherService(bus *channelsv1alpha1.Bus) (*corev1.Service, error) { diff --git a/pkg/controller/clusterbus/controller.go b/pkg/controller/clusterbus/controller.go index ff696da2de4..16e9d9c1cc2 100644 --- a/pkg/controller/clusterbus/controller.go +++ b/pkg/controller/clusterbus/controller.go @@ -297,20 +297,35 @@ func (c *Controller) syncHandler(key string) error { return err } + // NEVER modify objects from the store. It's a read-only, local cache. + // You can use DeepCopy() to make a deep copy of original object and modify this copy + // Or create a copy manually for better performance + clusterBusCopy := clusterBus.DeepCopy() + // Sync Service derived from the ClusterBus - err = c.syncDispatcherServiceClusterBusStatus(clusterBus) + err = c.syncDispatcherServiceClusterBusStatus(clusterBusCopy) if err != nil { + c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) return err } // Sync Deployment derived from the ClusterBus - err = c.syncDispatcherDeploymentClusterBusStatus(clusterBus) + err = c.syncDispatcherDeploymentClusterBusStatus(clusterBusCopy) if err != nil { + c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) return err } // Sync Deployment derived from the ClusterBus - err = c.syncProvisionerDeploymentClusterBusStatus(clusterBus) + err = c.syncProvisionerDeploymentClusterBusStatus(clusterBusCopy) + if err != nil { + c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) + return err + } + + // Finally, we update the status block of the ClusterBus resource to reflect the + // current state of the world + err = c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) if err != nil { return err } @@ -320,75 +335,57 @@ func (c *Controller) syncHandler(key string) error { } func (c *Controller) syncDispatcherServiceClusterBusStatus(clusterBus *channelsv1alpha1.ClusterBus) error { - // NEVER modify objects from the store. It's a read-only, local cache. - // You can use DeepCopy() to make a deep copy of original object and modify this copy - // Or create a copy manually for better performance - clusterBusCopy := clusterBus.DeepCopy() - dispatcherService, err := c.syncDispatcherService(clusterBusCopy) + dispatcherService, err := c.syncDispatcherService(clusterBus) if err != nil { - clusterBusCopy.Status.Service = nil + clusterBus.Status.Service = nil serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, err.Error()) - util.SetBusCondition(&clusterBusCopy.Status, *serviceCondition) - // Ignore error in updating bus status - c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) + util.SetBusCondition(&clusterBus.Status, *serviceCondition) return err } - clusterBusCopy.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} + clusterBus.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced") - util.SetBusCondition(&clusterBusCopy.Status, *serviceCondition) + util.SetBusCondition(&clusterBus.Status, *serviceCondition) - return c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) + return nil } func (c *Controller) syncDispatcherDeploymentClusterBusStatus(clusterBus *channelsv1alpha1.ClusterBus) error { - // NEVER modify objects from the store. It's a read-only, local cache. - // You can use DeepCopy() to make a deep copy of original object and modify this copy - // Or create a copy manually for better performance - clusterBusCopy := clusterBus.DeepCopy() - _, err := c.syncDispatcherDeployment(clusterBusCopy) + _, err := c.syncDispatcherDeployment(clusterBus) if err != nil { dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, err.Error()) - util.SetBusCondition(&clusterBusCopy.Status, *dispatchCondition) - // Ignore error in updating bus status - c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) + util.SetBusCondition(&clusterBus.Status, *dispatchCondition) return err } dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&clusterBusCopy.Status, *dispatchCondition) + util.SetBusCondition(&clusterBus.Status, *dispatchCondition) - return c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) + return nil } func (c *Controller) syncProvisionerDeploymentClusterBusStatus(clusterBus *channelsv1alpha1.ClusterBus) error { - // NEVER modify objects from the store. It's a read-only, local cache. - // You can use DeepCopy() to make a deep copy of original object and modify this copy - // Or create a copy manually for better performance - clusterBusCopy := clusterBus.DeepCopy() - provisionerDeployment, err := c.syncProvisionerDeployment(clusterBusCopy) + provisionerDeployment, err := c.syncProvisionerDeployment(clusterBus) if err != nil { provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, err.Error()) - util.SetBusCondition(&clusterBusCopy.Status, *provisionCondition) - // Ignore error in updating cluster bus status - c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) + util.SetBusCondition(&clusterBus.Status, *provisionCondition) return err } if provisionerDeployment != nil { provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&clusterBusCopy.Status, *provisionCondition) + util.SetBusCondition(&clusterBus.Status, *provisionCondition) } else { - util.RemoveBusCondition(&clusterBusCopy.Status, channelsv1alpha1.BusProvisioning) + util.RemoveBusCondition(&clusterBus.Status, channelsv1alpha1.BusProvisioning) } - return c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) + return nil } func (c *Controller) syncDispatcherService(clusterBus *channelsv1alpha1.ClusterBus) (*corev1.Service, error) { From 013b692bdf80e30afa63625addc418bb1bd92042 Mon Sep 17 00:00:00 2001 From: Sabari Kumar Murugesan Date: Thu, 6 Sep 2018 11:33:28 -0700 Subject: [PATCH 4/7] Remove multiple sync handlers for bus status --- pkg/controller/bus/controller.go | 142 ++++++++++------------- pkg/controller/clusterbus/controller.go | 146 ++++++++++-------------- 2 files changed, 119 insertions(+), 169 deletions(-) diff --git a/pkg/controller/bus/controller.go b/pkg/controller/bus/controller.go index 3c9772312f5..3d13923eaea 100644 --- a/pkg/controller/bus/controller.go +++ b/pkg/controller/bus/controller.go @@ -309,35 +309,8 @@ func (c *Controller) syncHandler(key string) error { return err } - // NEVER modify objects from the store. It's a read-only, local cache. - // You can use DeepCopy() to make a deep copy of original object and modify this copy - // Or create a copy manually for better performance - busCopy := bus.DeepCopy() - - // Sync Service derived from the Bus - err = c.syncDispatcherServiceBusStatus(busCopy) - if err != nil { - c.compareAndUpdateBusStatus(bus, busCopy) - return err - } + err = c.updateBusStatus(bus) - // Sync Deployment derived from the Bus - err = c.syncDispatcherDeploymentBusStatus(busCopy) - if err != nil { - c.compareAndUpdateBusStatus(bus, busCopy) - return err - } - - // Sync Deployment derived from the Bus - err = c.syncProvisionerDeploymentBusStatus(busCopy) - if err != nil { - c.compareAndUpdateBusStatus(bus, busCopy) - return err - } - - // Finally, we update the status block of the Bus resource to reflect the - // current state of the world - err = c.compareAndUpdateBusStatus(bus, busCopy) if err != nil { return err } @@ -346,59 +319,6 @@ func (c *Controller) syncHandler(key string) error { return nil } -func (c *Controller) syncDispatcherServiceBusStatus(bus *channelsv1alpha1.Bus) error { - - dispatcherService, err := c.syncDispatcherService(bus) - - if err != nil { - bus.Status.Service = nil - serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, err.Error()) - util.SetBusCondition(&bus.Status, *serviceCondition) - return err - } - - bus.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} - serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced") - util.SetBusCondition(&bus.Status, *serviceCondition) - - return nil -} - -func (c *Controller) syncDispatcherDeploymentBusStatus(bus *channelsv1alpha1.Bus) error { - _, err := c.syncDispatcherDeployment(bus) - - if err != nil { - dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, err.Error()) - util.SetBusCondition(&bus.Status, *dispatchCondition) - return err - } - - dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&bus.Status, *dispatchCondition) - - return nil -} - -func (c *Controller) syncProvisionerDeploymentBusStatus(bus *channelsv1alpha1.Bus) error { - - provisionerDeployment, err := c.syncProvisionerDeployment(bus) - - if err != nil { - provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, err.Error()) - util.SetBusCondition(&bus.Status, *provisionCondition) - return err - } - - if provisionerDeployment != nil { - provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&bus.Status, *provisionCondition) - } else { - util.RemoveBusCondition(&bus.Status, channelsv1alpha1.BusProvisioning) - } - - return nil -} - func (c *Controller) syncDispatcherService(bus *channelsv1alpha1.Bus) (*corev1.Service, error) { // Get the service with the specified service name serviceName := controller.BusDispatcherServiceName(bus.Name, bus.Namespace) @@ -426,7 +346,7 @@ func (c *Controller) syncDispatcherService(bus *channelsv1alpha1.Bus) (*corev1.S return service, nil } -func (c *Controller) syncDispatcherDeployment(bus *channelsv1alpha1.Bus) (*appsv1.Deployment, error) { +func (c *Controller) syncBusDispatcherDeployment(bus *channelsv1alpha1.Bus) (*appsv1.Deployment, error) { // Get the deployment with the specified deployment name deploymentName := controller.BusDispatcherDeploymentName(bus.Name, bus.Namespace) deployment, err := c.deploymentsLister.Deployments(system.Namespace).Get(deploymentName) @@ -465,7 +385,7 @@ func (c *Controller) syncDispatcherDeployment(bus *channelsv1alpha1.Bus) (*appsv return deployment, nil } -func (c *Controller) syncProvisionerDeployment(bus *channelsv1alpha1.Bus) (*appsv1.Deployment, error) { +func (c *Controller) syncBusProvisionerDeployment(bus *channelsv1alpha1.Bus) (*appsv1.Deployment, error) { provisioner := bus.Spec.Provisioner // Get the deployment with the specified deployment name @@ -534,6 +454,62 @@ func (c *Controller) compareAndUpdateBusStatus(bus *channelsv1alpha1.Bus, busCop return nil } +func (c *Controller) updateBusStatus(bus *channelsv1alpha1.Bus) error { + // NEVER modify objects from the store. It's a read-only, local cache. + // You can use DeepCopy() to make a deep copy of original object and modify this copy + // Or create a copy manually for better performance + busCopy := bus.DeepCopy() + + // Sync Service derived from the Bus + dispatcherService, err := c.syncDispatcherService(bus) + + if err != nil { + bus.Status.Service = nil + serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, err.Error()) + util.SetBusCondition(&bus.Status, *serviceCondition) + c.compareAndUpdateBusStatus(bus, busCopy) + return err + } + + bus.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} + serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced") + util.SetBusCondition(&bus.Status, *serviceCondition) + + // Sync Deployment derived from the Bus + _, err = c.syncBusDispatcherDeployment(bus) + + if err != nil { + dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, err.Error()) + util.SetBusCondition(&bus.Status, *dispatchCondition) + c.compareAndUpdateBusStatus(bus, busCopy) + return err + } + + dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") + util.SetBusCondition(&bus.Status, *dispatchCondition) + + // Sync Deployment derived from the Bus + provisionerDeployment, err := c.syncBusProvisionerDeployment(bus) + + if err != nil { + provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, err.Error()) + util.SetBusCondition(&bus.Status, *provisionCondition) + c.compareAndUpdateBusStatus(bus, busCopy) + return err + } + + if provisionerDeployment != nil { + provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") + util.SetBusCondition(&bus.Status, *provisionCondition) + } else { + util.RemoveBusCondition(&bus.Status, channelsv1alpha1.BusProvisioning) + } + + // Finally, we update the status block of the Bus resource to reflect the + // current state of the world + return c.compareAndUpdateBusStatus(bus, busCopy) +} + // enqueueBus takes a Bus resource and converts it into a namespace/name // string which is then put onto the work queue. This method should *not* be // passed resources of any type other than Bus. diff --git a/pkg/controller/clusterbus/controller.go b/pkg/controller/clusterbus/controller.go index 16e9d9c1cc2..0867f312596 100644 --- a/pkg/controller/clusterbus/controller.go +++ b/pkg/controller/clusterbus/controller.go @@ -297,35 +297,7 @@ func (c *Controller) syncHandler(key string) error { return err } - // NEVER modify objects from the store. It's a read-only, local cache. - // You can use DeepCopy() to make a deep copy of original object and modify this copy - // Or create a copy manually for better performance - clusterBusCopy := clusterBus.DeepCopy() - - // Sync Service derived from the ClusterBus - err = c.syncDispatcherServiceClusterBusStatus(clusterBusCopy) - if err != nil { - c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) - return err - } - - // Sync Deployment derived from the ClusterBus - err = c.syncDispatcherDeploymentClusterBusStatus(clusterBusCopy) - if err != nil { - c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) - return err - } - - // Sync Deployment derived from the ClusterBus - err = c.syncProvisionerDeploymentClusterBusStatus(clusterBusCopy) - if err != nil { - c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) - return err - } - - // Finally, we update the status block of the ClusterBus resource to reflect the - // current state of the world - err = c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) + err = c.updateBusStatus(clusterBus) if err != nil { return err } @@ -334,61 +306,7 @@ func (c *Controller) syncHandler(key string) error { return nil } -func (c *Controller) syncDispatcherServiceClusterBusStatus(clusterBus *channelsv1alpha1.ClusterBus) error { - - dispatcherService, err := c.syncDispatcherService(clusterBus) - - if err != nil { - clusterBus.Status.Service = nil - serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, err.Error()) - util.SetBusCondition(&clusterBus.Status, *serviceCondition) - return err - } - - clusterBus.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} - serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced") - util.SetBusCondition(&clusterBus.Status, *serviceCondition) - - return nil -} - -func (c *Controller) syncDispatcherDeploymentClusterBusStatus(clusterBus *channelsv1alpha1.ClusterBus) error { - - _, err := c.syncDispatcherDeployment(clusterBus) - - if err != nil { - dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, err.Error()) - util.SetBusCondition(&clusterBus.Status, *dispatchCondition) - return err - } - - dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&clusterBus.Status, *dispatchCondition) - - return nil -} - -func (c *Controller) syncProvisionerDeploymentClusterBusStatus(clusterBus *channelsv1alpha1.ClusterBus) error { - - provisionerDeployment, err := c.syncProvisionerDeployment(clusterBus) - - if err != nil { - provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, err.Error()) - util.SetBusCondition(&clusterBus.Status, *provisionCondition) - return err - } - - if provisionerDeployment != nil { - provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&clusterBus.Status, *provisionCondition) - } else { - util.RemoveBusCondition(&clusterBus.Status, channelsv1alpha1.BusProvisioning) - } - - return nil -} - -func (c *Controller) syncDispatcherService(clusterBus *channelsv1alpha1.ClusterBus) (*corev1.Service, error) { +func (c *Controller) syncClusterBusDispatcherService(clusterBus *channelsv1alpha1.ClusterBus) (*corev1.Service, error) { // Get the service with the specified service name serviceName := controller.ClusterBusDispatcherServiceName(clusterBus.ObjectMeta.Name) service, err := c.servicesLister.Services(system.Namespace).Get(serviceName) @@ -415,7 +333,7 @@ func (c *Controller) syncDispatcherService(clusterBus *channelsv1alpha1.ClusterB return service, nil } -func (c *Controller) syncDispatcherDeployment(clusterBus *channelsv1alpha1.ClusterBus) (*appsv1.Deployment, error) { +func (c *Controller) syncClusterBusDispatcherDeployment(clusterBus *channelsv1alpha1.ClusterBus) (*appsv1.Deployment, error) { // Get the deployment with the specified deployment name deploymentName := controller.ClusterBusDispatcherDeploymentName(clusterBus.ObjectMeta.Name) deployment, err := c.deploymentsLister.Deployments(system.Namespace).Get(deploymentName) @@ -454,7 +372,7 @@ func (c *Controller) syncDispatcherDeployment(clusterBus *channelsv1alpha1.Clust return deployment, nil } -func (c *Controller) syncProvisionerDeployment(clusterBus *channelsv1alpha1.ClusterBus) (*appsv1.Deployment, error) { +func (c *Controller) syncClusterBusProvisionerDeployment(clusterBus *channelsv1alpha1.ClusterBus) (*appsv1.Deployment, error) { provisioner := clusterBus.Spec.Provisioner // Get the deployment with the specified deployment name @@ -522,6 +440,62 @@ func (c *Controller) compareAndUpdateBusStatus(clusterBus *channelsv1alpha1.Clus return nil } +func (c *Controller) updateBusStatus(clusterBus *channelsv1alpha1.ClusterBus) error { + // NEVER modify objects from the store. It's a read-only, local cache. + // You can use DeepCopy() to make a deep copy of original object and modify this copy + // Or create a copy manually for better performance + clusterBusCopy := clusterBus.DeepCopy() + + // Sync Service derived from the ClusterBus + dispatcherService, err := c.syncClusterBusDispatcherService(clusterBus) + + if err != nil { + clusterBus.Status.Service = nil + serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, err.Error()) + util.SetBusCondition(&clusterBus.Status, *serviceCondition) + c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) + return err + } + + clusterBus.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} + serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced") + util.SetBusCondition(&clusterBus.Status, *serviceCondition) + + // Sync Deployment derived from the ClusterBus + _, err = c.syncClusterBusDispatcherDeployment(clusterBus) + + if err != nil { + dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, err.Error()) + util.SetBusCondition(&clusterBus.Status, *dispatchCondition) + c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) + return err + } + + dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") + util.SetBusCondition(&clusterBus.Status, *dispatchCondition) + + // Sync Deployment derived from the ClusterBus + provisionerDeployment, err := c.syncClusterBusProvisionerDeployment(clusterBus) + + if err != nil { + provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, err.Error()) + util.SetBusCondition(&clusterBus.Status, *provisionCondition) + c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) + return err + } + + if provisionerDeployment != nil { + provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") + util.SetBusCondition(&clusterBus.Status, *provisionCondition) + } else { + util.RemoveBusCondition(&clusterBus.Status, channelsv1alpha1.BusProvisioning) + } + + // Finally, we update the status block of the ClusterBus resource to reflect the + // current state of the world + return c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) +} + // enqueueClusterBus takes a ClusterBus resource and converts it into a namespace/name // string which is then put onto the work queue. This method should *not* be // passed resources of any type other than ClusterBus. From 75ad097b68683d46c7b8753fcdfeade1f67bd760 Mon Sep 17 00:00:00 2001 From: Sabari Kumar Murugesan Date: Thu, 6 Sep 2018 12:14:13 -0700 Subject: [PATCH 5/7] Restore an unnecessary function name change --- pkg/controller/bus/controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/controller/bus/controller.go b/pkg/controller/bus/controller.go index 3d13923eaea..f6bd53bf80b 100644 --- a/pkg/controller/bus/controller.go +++ b/pkg/controller/bus/controller.go @@ -319,7 +319,7 @@ func (c *Controller) syncHandler(key string) error { return nil } -func (c *Controller) syncDispatcherService(bus *channelsv1alpha1.Bus) (*corev1.Service, error) { +func (c *Controller) syncBusDispatcherService(bus *channelsv1alpha1.Bus) (*corev1.Service, error) { // Get the service with the specified service name serviceName := controller.BusDispatcherServiceName(bus.Name, bus.Namespace) service, err := c.servicesLister.Services(system.Namespace).Get(serviceName) @@ -461,7 +461,7 @@ func (c *Controller) updateBusStatus(bus *channelsv1alpha1.Bus) error { busCopy := bus.DeepCopy() // Sync Service derived from the Bus - dispatcherService, err := c.syncDispatcherService(bus) + dispatcherService, err := c.syncBusDispatcherService(bus) if err != nil { bus.Status.Service = nil From 7201196c134cd9bc68e2ed907b2906213f620c84 Mon Sep 17 00:00:00 2001 From: Sabari Kumar Murugesan Date: Thu, 6 Sep 2018 12:30:52 -0700 Subject: [PATCH 6/7] Update Bus's Copy instead of the actual Bus object --- pkg/controller/bus/controller.go | 24 ++++++++++++------------ pkg/controller/clusterbus/controller.go | 24 ++++++++++++------------ 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/pkg/controller/bus/controller.go b/pkg/controller/bus/controller.go index f6bd53bf80b..c4f04af075a 100644 --- a/pkg/controller/bus/controller.go +++ b/pkg/controller/bus/controller.go @@ -461,48 +461,48 @@ func (c *Controller) updateBusStatus(bus *channelsv1alpha1.Bus) error { busCopy := bus.DeepCopy() // Sync Service derived from the Bus - dispatcherService, err := c.syncBusDispatcherService(bus) + dispatcherService, err := c.syncBusDispatcherService(busCopy) if err != nil { - bus.Status.Service = nil + busCopy.Status.Service = nil serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, err.Error()) - util.SetBusCondition(&bus.Status, *serviceCondition) + util.SetBusCondition(&busCopy.Status, *serviceCondition) c.compareAndUpdateBusStatus(bus, busCopy) return err } - bus.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} + busCopy.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced") - util.SetBusCondition(&bus.Status, *serviceCondition) + util.SetBusCondition(&busCopy.Status, *serviceCondition) // Sync Deployment derived from the Bus - _, err = c.syncBusDispatcherDeployment(bus) + _, err = c.syncBusDispatcherDeployment(busCopy) if err != nil { dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, err.Error()) - util.SetBusCondition(&bus.Status, *dispatchCondition) + util.SetBusCondition(&busCopy.Status, *dispatchCondition) c.compareAndUpdateBusStatus(bus, busCopy) return err } dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&bus.Status, *dispatchCondition) + util.SetBusCondition(&busCopy.Status, *dispatchCondition) // Sync Deployment derived from the Bus - provisionerDeployment, err := c.syncBusProvisionerDeployment(bus) + provisionerDeployment, err := c.syncBusProvisionerDeployment(busCopy) if err != nil { provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, err.Error()) - util.SetBusCondition(&bus.Status, *provisionCondition) + util.SetBusCondition(&busCopy.Status, *provisionCondition) c.compareAndUpdateBusStatus(bus, busCopy) return err } if provisionerDeployment != nil { provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&bus.Status, *provisionCondition) + util.SetBusCondition(&busCopy.Status, *provisionCondition) } else { - util.RemoveBusCondition(&bus.Status, channelsv1alpha1.BusProvisioning) + util.RemoveBusCondition(&busCopy.Status, channelsv1alpha1.BusProvisioning) } // Finally, we update the status block of the Bus resource to reflect the diff --git a/pkg/controller/clusterbus/controller.go b/pkg/controller/clusterbus/controller.go index 0867f312596..733e34573f1 100644 --- a/pkg/controller/clusterbus/controller.go +++ b/pkg/controller/clusterbus/controller.go @@ -447,48 +447,48 @@ func (c *Controller) updateBusStatus(clusterBus *channelsv1alpha1.ClusterBus) er clusterBusCopy := clusterBus.DeepCopy() // Sync Service derived from the ClusterBus - dispatcherService, err := c.syncClusterBusDispatcherService(clusterBus) + dispatcherService, err := c.syncClusterBusDispatcherService(clusterBusCopy) if err != nil { - clusterBus.Status.Service = nil + clusterBusCopy.Status.Service = nil serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, err.Error()) - util.SetBusCondition(&clusterBus.Status, *serviceCondition) + util.SetBusCondition(&clusterBusCopy.Status, *serviceCondition) c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) return err } - clusterBus.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} + clusterBusCopy.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced") - util.SetBusCondition(&clusterBus.Status, *serviceCondition) + util.SetBusCondition(&clusterBusCopy.Status, *serviceCondition) // Sync Deployment derived from the ClusterBus - _, err = c.syncClusterBusDispatcherDeployment(clusterBus) + _, err = c.syncClusterBusDispatcherDeployment(clusterBusCopy) if err != nil { dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, err.Error()) - util.SetBusCondition(&clusterBus.Status, *dispatchCondition) + util.SetBusCondition(&clusterBusCopy.Status, *dispatchCondition) c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) return err } dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&clusterBus.Status, *dispatchCondition) + util.SetBusCondition(&clusterBusCopy.Status, *dispatchCondition) // Sync Deployment derived from the ClusterBus - provisionerDeployment, err := c.syncClusterBusProvisionerDeployment(clusterBus) + provisionerDeployment, err := c.syncClusterBusProvisionerDeployment(clusterBusCopy) if err != nil { provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, err.Error()) - util.SetBusCondition(&clusterBus.Status, *provisionCondition) + util.SetBusCondition(&clusterBusCopy.Status, *provisionCondition) c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) return err } if provisionerDeployment != nil { provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&clusterBus.Status, *provisionCondition) + util.SetBusCondition(&clusterBusCopy.Status, *provisionCondition) } else { - util.RemoveBusCondition(&clusterBus.Status, channelsv1alpha1.BusProvisioning) + util.RemoveBusCondition(&clusterBusCopy.Status, channelsv1alpha1.BusProvisioning) } // Finally, we update the status block of the ClusterBus resource to reflect the From 4743fcc33d35c63a281cacab986bffc0c2435630 Mon Sep 17 00:00:00 2001 From: Sabari Kumar Murugesan Date: Thu, 6 Sep 2018 13:29:03 -0700 Subject: [PATCH 7/7] Unify login into syncHandler --- pkg/controller/bus/controller.go | 110 ++++++++++++------------ pkg/controller/clusterbus/controller.go | 110 ++++++++++++------------ 2 files changed, 106 insertions(+), 114 deletions(-) diff --git a/pkg/controller/bus/controller.go b/pkg/controller/bus/controller.go index c4f04af075a..6c7e764cb06 100644 --- a/pkg/controller/bus/controller.go +++ b/pkg/controller/bus/controller.go @@ -309,7 +309,59 @@ func (c *Controller) syncHandler(key string) error { return err } - err = c.updateBusStatus(bus) + // NEVER modify objects from the store. It's a read-only, local cache. + // You can use DeepCopy() to make a deep copy of original object and modify this copy + // Or create a copy manually for better performance + busCopy := bus.DeepCopy() + + // Sync Service derived from the Bus + dispatcherService, err := c.syncBusDispatcherService(busCopy) + + if err != nil { + busCopy.Status.Service = nil + serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, err.Error()) + util.SetBusCondition(&busCopy.Status, *serviceCondition) + c.compareAndUpdateBusStatus(bus, busCopy) + return err + } + + busCopy.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} + serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced") + util.SetBusCondition(&busCopy.Status, *serviceCondition) + + // Sync Deployment derived from the Bus + _, err = c.syncBusDispatcherDeployment(busCopy) + + if err != nil { + dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, err.Error()) + util.SetBusCondition(&busCopy.Status, *dispatchCondition) + c.compareAndUpdateBusStatus(bus, busCopy) + return err + } + + dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") + util.SetBusCondition(&busCopy.Status, *dispatchCondition) + + // Sync Deployment derived from the Bus + provisionerDeployment, err := c.syncBusProvisionerDeployment(busCopy) + + if err != nil { + provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, err.Error()) + util.SetBusCondition(&busCopy.Status, *provisionCondition) + c.compareAndUpdateBusStatus(bus, busCopy) + return err + } + + if provisionerDeployment != nil { + provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") + util.SetBusCondition(&busCopy.Status, *provisionCondition) + } else { + util.RemoveBusCondition(&busCopy.Status, channelsv1alpha1.BusProvisioning) + } + + // Finally, we update the status block of the Bus resource to reflect the + // current state of the world + err = c.compareAndUpdateBusStatus(bus, busCopy) if err != nil { return err @@ -454,62 +506,6 @@ func (c *Controller) compareAndUpdateBusStatus(bus *channelsv1alpha1.Bus, busCop return nil } -func (c *Controller) updateBusStatus(bus *channelsv1alpha1.Bus) error { - // NEVER modify objects from the store. It's a read-only, local cache. - // You can use DeepCopy() to make a deep copy of original object and modify this copy - // Or create a copy manually for better performance - busCopy := bus.DeepCopy() - - // Sync Service derived from the Bus - dispatcherService, err := c.syncBusDispatcherService(busCopy) - - if err != nil { - busCopy.Status.Service = nil - serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, err.Error()) - util.SetBusCondition(&busCopy.Status, *serviceCondition) - c.compareAndUpdateBusStatus(bus, busCopy) - return err - } - - busCopy.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} - serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced") - util.SetBusCondition(&busCopy.Status, *serviceCondition) - - // Sync Deployment derived from the Bus - _, err = c.syncBusDispatcherDeployment(busCopy) - - if err != nil { - dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, err.Error()) - util.SetBusCondition(&busCopy.Status, *dispatchCondition) - c.compareAndUpdateBusStatus(bus, busCopy) - return err - } - - dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&busCopy.Status, *dispatchCondition) - - // Sync Deployment derived from the Bus - provisionerDeployment, err := c.syncBusProvisionerDeployment(busCopy) - - if err != nil { - provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, err.Error()) - util.SetBusCondition(&busCopy.Status, *provisionCondition) - c.compareAndUpdateBusStatus(bus, busCopy) - return err - } - - if provisionerDeployment != nil { - provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&busCopy.Status, *provisionCondition) - } else { - util.RemoveBusCondition(&busCopy.Status, channelsv1alpha1.BusProvisioning) - } - - // Finally, we update the status block of the Bus resource to reflect the - // current state of the world - return c.compareAndUpdateBusStatus(bus, busCopy) -} - // enqueueBus takes a Bus resource and converts it into a namespace/name // string which is then put onto the work queue. This method should *not* be // passed resources of any type other than Bus. diff --git a/pkg/controller/clusterbus/controller.go b/pkg/controller/clusterbus/controller.go index 733e34573f1..0d977e97635 100644 --- a/pkg/controller/clusterbus/controller.go +++ b/pkg/controller/clusterbus/controller.go @@ -297,7 +297,59 @@ func (c *Controller) syncHandler(key string) error { return err } - err = c.updateBusStatus(clusterBus) + // NEVER modify objects from the store. It's a read-only, local cache. + // You can use DeepCopy() to make a deep copy of original object and modify this copy + // Or create a copy manually for better performance + clusterBusCopy := clusterBus.DeepCopy() + + // Sync Service derived from the ClusterBus + dispatcherService, err := c.syncClusterBusDispatcherService(clusterBusCopy) + + if err != nil { + clusterBusCopy.Status.Service = nil + serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, err.Error()) + util.SetBusCondition(&clusterBusCopy.Status, *serviceCondition) + c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) + return err + } + + clusterBusCopy.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} + serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced") + util.SetBusCondition(&clusterBusCopy.Status, *serviceCondition) + + // Sync Deployment derived from the ClusterBus + _, err = c.syncClusterBusDispatcherDeployment(clusterBusCopy) + + if err != nil { + dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, err.Error()) + util.SetBusCondition(&clusterBusCopy.Status, *dispatchCondition) + c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) + return err + } + + dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") + util.SetBusCondition(&clusterBusCopy.Status, *dispatchCondition) + + // Sync Deployment derived from the ClusterBus + provisionerDeployment, err := c.syncClusterBusProvisionerDeployment(clusterBusCopy) + + if err != nil { + provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, err.Error()) + util.SetBusCondition(&clusterBusCopy.Status, *provisionCondition) + c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) + return err + } + + if provisionerDeployment != nil { + provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") + util.SetBusCondition(&clusterBusCopy.Status, *provisionCondition) + } else { + util.RemoveBusCondition(&clusterBusCopy.Status, channelsv1alpha1.BusProvisioning) + } + + // Finally, we update the status block of the ClusterBus resource to reflect the + // current state of the world + err = c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) if err != nil { return err } @@ -440,62 +492,6 @@ func (c *Controller) compareAndUpdateBusStatus(clusterBus *channelsv1alpha1.Clus return nil } -func (c *Controller) updateBusStatus(clusterBus *channelsv1alpha1.ClusterBus) error { - // NEVER modify objects from the store. It's a read-only, local cache. - // You can use DeepCopy() to make a deep copy of original object and modify this copy - // Or create a copy manually for better performance - clusterBusCopy := clusterBus.DeepCopy() - - // Sync Service derived from the ClusterBus - dispatcherService, err := c.syncClusterBusDispatcherService(clusterBusCopy) - - if err != nil { - clusterBusCopy.Status.Service = nil - serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, err.Error()) - util.SetBusCondition(&clusterBusCopy.Status, *serviceCondition) - c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) - return err - } - - clusterBusCopy.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} - serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced") - util.SetBusCondition(&clusterBusCopy.Status, *serviceCondition) - - // Sync Deployment derived from the ClusterBus - _, err = c.syncClusterBusDispatcherDeployment(clusterBusCopy) - - if err != nil { - dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, err.Error()) - util.SetBusCondition(&clusterBusCopy.Status, *dispatchCondition) - c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) - return err - } - - dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&clusterBusCopy.Status, *dispatchCondition) - - // Sync Deployment derived from the ClusterBus - provisionerDeployment, err := c.syncClusterBusProvisionerDeployment(clusterBusCopy) - - if err != nil { - provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, err.Error()) - util.SetBusCondition(&clusterBusCopy.Status, *provisionCondition) - c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) - return err - } - - if provisionerDeployment != nil { - provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&clusterBusCopy.Status, *provisionCondition) - } else { - util.RemoveBusCondition(&clusterBusCopy.Status, channelsv1alpha1.BusProvisioning) - } - - // Finally, we update the status block of the ClusterBus resource to reflect the - // current state of the world - return c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) -} - // enqueueClusterBus takes a ClusterBus resource and converts it into a namespace/name // string which is then put onto the work queue. This method should *not* be // passed resources of any type other than ClusterBus.