diff --git a/pkg/apis/channels/v1alpha1/bus_types.go b/pkg/apis/channels/v1alpha1/bus_types.go index 821310f4124..85e80600588 100644 --- a/pkg/apis/channels/v1alpha1/bus_types.go +++ b/pkg/apis/channels/v1alpha1/bus_types.go @@ -142,6 +142,10 @@ func (b *Bus) GetSpec() *BusSpec { return &b.Spec } +func (b *Bus) GetStatus() *BusStatus { + return &b.Status +} + func (b *Bus) GetSpecJSON() ([]byte, error) { return json.Marshal(b.Spec) } @@ -161,6 +165,7 @@ type GenericBus interface { meta_v1.ObjectMetaAccessor BacksChannel(channel *Channel) bool GetSpec() *BusSpec + GetStatus() *BusStatus // Needed for generic webhook support apis.Defaultable diff --git a/pkg/apis/channels/v1alpha1/clusterbus_types.go b/pkg/apis/channels/v1alpha1/clusterbus_types.go index a9a6cbdf9f9..4d891400983 100644 --- a/pkg/apis/channels/v1alpha1/clusterbus_types.go +++ b/pkg/apis/channels/v1alpha1/clusterbus_types.go @@ -50,17 +50,20 @@ var _ webhook.GenericCRD = (*ClusterBus)(nil) type ClusterBusSpec = BusSpec // ClusterBusStatus (computed) for a clusterbus -type ClusterBusStatus struct { -} +type ClusterBusStatus = BusStatus func (b *ClusterBus) BacksChannel(channel *Channel) bool { return len(b.Namespace) == 0 && b.Name == channel.Spec.ClusterBus } -func (b *ClusterBus) GetSpec() *BusSpec { +func (b *ClusterBus) GetSpec() *ClusterBusSpec { return &b.Spec } +func (b *ClusterBus) GetStatus() *ClusterBusStatus { + return &b.Status +} + func (b *ClusterBus) GetSpecJSON() ([]byte, error) { return json.Marshal(b.Spec) } diff --git a/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go index e876226a49a..3f815bbcd0a 100644 --- a/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go @@ -403,7 +403,7 @@ func (in *ClusterBus) DeepCopyInto(out *ClusterBus) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) return } @@ -458,22 +458,6 @@ func (in *ClusterBusList) DeepCopyObject() runtime.Object { return nil } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *ClusterBusStatus) DeepCopyInto(out *ClusterBusStatus) { - *out = *in - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterBusStatus. -func (in *ClusterBusStatus) DeepCopy() *ClusterBusStatus { - if in == nil { - return nil - } - out := new(ClusterBusStatus) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Parameter) DeepCopyInto(out *Parameter) { *out = *in diff --git a/pkg/controller/bus/controller.go b/pkg/controller/bus/controller.go index 88cae16d3b4..6c7e764cb06 100644 --- a/pkg/controller/bus/controller.go +++ b/pkg/controller/bus/controller.go @@ -309,46 +309,60 @@ func (c *Controller) syncHandler(key string) error { return err } - var dispatcherService *corev1.Service - var dispatcherDeployment, provisionerDeployment *appsv1.Deployment - var dispatcherServiceErr, dispatcherDeplErr, provisionerDeplError error + // NEVER modify objects from the store. It's a read-only, local cache. + // You can use DeepCopy() to make a deep copy of original object and modify this copy + // Or create a copy manually for better performance + busCopy := bus.DeepCopy() // Sync Service derived from the Bus - dispatcherService, dispatcherServiceErr = c.syncBusDispatcherService(bus) - if dispatcherServiceErr != nil { - _ = c.updateBusStatus(bus, - dispatcherService, dispatcherServiceErr, - dispatcherDeployment, dispatcherDeplErr, - provisionerDeployment, provisionerDeplError) - return dispatcherServiceErr + dispatcherService, err := c.syncBusDispatcherService(busCopy) + + if err != nil { + busCopy.Status.Service = nil + serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, err.Error()) + util.SetBusCondition(&busCopy.Status, *serviceCondition) + c.compareAndUpdateBusStatus(bus, busCopy) + return err } + busCopy.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} + serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced") + util.SetBusCondition(&busCopy.Status, *serviceCondition) + // Sync Deployment derived from the Bus - dispatcherDeployment, dispatcherDeplErr = c.syncBusDispatcherDeployment(bus) - if dispatcherDeplErr != nil { - _ = c.updateBusStatus(bus, - dispatcherService, dispatcherServiceErr, - dispatcherDeployment, dispatcherDeplErr, - provisionerDeployment, provisionerDeplError) - return dispatcherDeplErr + _, err = c.syncBusDispatcherDeployment(busCopy) + + if err != nil { + dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, err.Error()) + util.SetBusCondition(&busCopy.Status, *dispatchCondition) + c.compareAndUpdateBusStatus(bus, busCopy) + return err } + dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") + util.SetBusCondition(&busCopy.Status, *dispatchCondition) + // Sync Deployment derived from the Bus - provisionerDeployment, provisionerDeplError = c.syncBusProvisionerDeployment(bus) - if provisionerDeplError != nil { - _ = c.updateBusStatus(bus, - dispatcherService, dispatcherServiceErr, - dispatcherDeployment, dispatcherDeplErr, - provisionerDeployment, provisionerDeplError) - return provisionerDeplError + provisionerDeployment, err := c.syncBusProvisionerDeployment(busCopy) + + if err != nil { + provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, err.Error()) + util.SetBusCondition(&busCopy.Status, *provisionCondition) + c.compareAndUpdateBusStatus(bus, busCopy) + return err + } + + if provisionerDeployment != nil { + provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") + util.SetBusCondition(&busCopy.Status, *provisionCondition) + } else { + util.RemoveBusCondition(&busCopy.Status, channelsv1alpha1.BusProvisioning) } // Finally, we update the status block of the Bus resource to reflect the // current state of the world - err = c.updateBusStatus(bus, - dispatcherService, dispatcherServiceErr, - dispatcherDeployment, dispatcherDeplErr, - provisionerDeployment, provisionerDeplError) + err = c.compareAndUpdateBusStatus(bus, busCopy) + if err != nil { return err } @@ -477,45 +491,7 @@ func (c *Controller) syncBusProvisionerDeployment(bus *channelsv1alpha1.Bus) (*a return deployment, nil } -func (c *Controller) updateBusStatus( - bus *channelsv1alpha1.Bus, - dispatcherService *corev1.Service, dispatcherServiceErr error, - dispatcherDeployment *appsv1.Deployment, dispatcherDeploymentErr error, - provisionerDeployment *appsv1.Deployment, provisionerDeploymentErr error, -) error { - // NEVER modify objects from the store. It's a read-only, local cache. - // You can use DeepCopy() to make a deep copy of original object and modify this copy - // Or create a copy manually for better performance - busCopy := bus.DeepCopy() - - if dispatcherService != nil { - busCopy.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} - serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced") - util.SetBusCondition(&busCopy.Status, *serviceCondition) - } else { - busCopy.Status.Service = nil - serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, dispatcherServiceErr.Error()) - util.SetBusCondition(&busCopy.Status, *serviceCondition) - } - - if dispatcherDeployment != nil { - dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&busCopy.Status, *dispatchCondition) - } else { - dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, dispatcherDeploymentErr.Error()) - util.SetBusCondition(&busCopy.Status, *dispatchCondition) - } - - if provisionerDeployment != nil { - provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") - util.SetBusCondition(&busCopy.Status, *provisionCondition) - } else if provisionerDeploymentErr != nil { - provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, provisionerDeploymentErr.Error()) - util.SetBusCondition(&busCopy.Status, *provisionCondition) - } else { - util.RemoveBusCondition(&busCopy.Status, channelsv1alpha1.BusProvisioning) - } - +func (c *Controller) compareAndUpdateBusStatus(bus *channelsv1alpha1.Bus, busCopy *channelsv1alpha1.Bus) error { util.ConsolidateBusCondition(busCopy) // Only update if status has changed diff --git a/pkg/controller/clusterbus/controller.go b/pkg/controller/clusterbus/controller.go index 8d0fe2cba34..0d977e97635 100644 --- a/pkg/controller/clusterbus/controller.go +++ b/pkg/controller/clusterbus/controller.go @@ -48,6 +48,7 @@ import ( channelscheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" informers "github.com/knative/eventing/pkg/client/informers/externalversions" listers "github.com/knative/eventing/pkg/client/listers/channels/v1alpha1" + "github.com/knative/eventing/pkg/controller/util" "github.com/knative/eventing/pkg/system" sharedclientset "github.com/knative/pkg/client/clientset/versioned" sharedinformers "github.com/knative/pkg/client/informers/externalversions" @@ -73,6 +74,17 @@ const ( MessageResourceSynced = "ClusterBus synced successfully" ) +const ( + // ServiceSynced is used as part of the condition reason when the bus (k8s) service is successfully created. + ServiceSynced = "ServiceSynced" + // ServiceError is used as part of the condition reason when the bus (k8s) service creation failed. + ServiceError = "ServiceError" + // DeploymentSynced is used as part of the condition reason when a bus deployment is successfully created. + DeploymentSynced = "DeploymentSynced" + // DeploymentError is used as part of the condition reason when a bus deployment creation failed. + DeploymentError = "DeploymentError" +) + // Controller is the controller implementation for ClusterBus resources type Controller struct { // kubeclientset is a standard kubernetes clientset @@ -285,27 +297,59 @@ func (c *Controller) syncHandler(key string) error { return err } + // NEVER modify objects from the store. It's a read-only, local cache. + // You can use DeepCopy() to make a deep copy of original object and modify this copy + // Or create a copy manually for better performance + clusterBusCopy := clusterBus.DeepCopy() + // Sync Service derived from the ClusterBus - dispatcherService, err := c.syncClusterBusDispatcherService(clusterBus) + dispatcherService, err := c.syncClusterBusDispatcherService(clusterBusCopy) + if err != nil { + clusterBusCopy.Status.Service = nil + serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, err.Error()) + util.SetBusCondition(&clusterBusCopy.Status, *serviceCondition) + c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) return err } + clusterBusCopy.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name} + serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced") + util.SetBusCondition(&clusterBusCopy.Status, *serviceCondition) + // Sync Deployment derived from the ClusterBus - dispatcherDeployment, err := c.syncClusterBusDispatcherDeployment(clusterBus) + _, err = c.syncClusterBusDispatcherDeployment(clusterBusCopy) + if err != nil { + dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, err.Error()) + util.SetBusCondition(&clusterBusCopy.Status, *dispatchCondition) + c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) return err } + dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") + util.SetBusCondition(&clusterBusCopy.Status, *dispatchCondition) + // Sync Deployment derived from the ClusterBus - provisionerDeployment, err := c.syncClusterBusProvisionerDeployment(clusterBus) + provisionerDeployment, err := c.syncClusterBusProvisionerDeployment(clusterBusCopy) + if err != nil { + provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, err.Error()) + util.SetBusCondition(&clusterBusCopy.Status, *provisionCondition) + c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) return err } + if provisionerDeployment != nil { + provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced") + util.SetBusCondition(&clusterBusCopy.Status, *provisionCondition) + } else { + util.RemoveBusCondition(&clusterBusCopy.Status, channelsv1alpha1.BusProvisioning) + } + // Finally, we update the status block of the ClusterBus resource to reflect the // current state of the world - err = c.updateClusterBusStatus(clusterBus, dispatcherService, dispatcherDeployment, provisionerDeployment) + err = c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy) if err != nil { return err } @@ -434,16 +478,8 @@ func (c *Controller) syncClusterBusProvisionerDeployment(clusterBus *channelsv1a return deployment, nil } -func (c *Controller) updateClusterBusStatus( - clusterBus *channelsv1alpha1.ClusterBus, - dispatcherService *corev1.Service, - dispatcherDeployment *appsv1.Deployment, - provisionerDeployment *appsv1.Deployment, -) error { - // NEVER modify objects from the store. It's a read-only, local cache. - // You can use DeepCopy() to make a deep copy of original object and modify this copy - // Or create a copy manually for better performance - clusterBusCopy := clusterBus.DeepCopy() +func (c *Controller) compareAndUpdateBusStatus(clusterBus *channelsv1alpha1.ClusterBus, clusterBusCopy *channelsv1alpha1.ClusterBus) error { + util.ConsolidateBusCondition(clusterBusCopy) // Only update if status has changed if !equality.Semantic.DeepEqual(clusterBus.Status, clusterBusCopy.Status) { // If the CustomResourceSubresources feature gate is not enabled, diff --git a/pkg/controller/util/bus_util.go b/pkg/controller/util/bus_util.go index 39b9272a713..0b6f72b9dbd 100644 --- a/pkg/controller/util/bus_util.go +++ b/pkg/controller/util/bus_util.go @@ -68,11 +68,11 @@ func RemoveBusCondition(status *v1alpha1.BusStatus, condType v1alpha1.BusConditi // ConsolidateBusCondition computes and sets the overall "Ready" condition of the bus // given all other sub-conditions. -func ConsolidateBusCondition(bus *v1alpha1.Bus) { - dispatching := GetBusCondition(bus.Status, v1alpha1.BusDispatching) - provisioning := GetBusCondition(bus.Status, v1alpha1.BusProvisioning) - serviceable := GetBusCondition(bus.Status, v1alpha1.BusServiceable) - needsProvitioner := bus.Spec.Provisioner != nil +func ConsolidateBusCondition(bus v1alpha1.GenericBus) { + dispatching := GetBusCondition(*bus.GetStatus(), v1alpha1.BusDispatching) + provisioning := GetBusCondition(*bus.GetStatus(), v1alpha1.BusProvisioning) + serviceable := GetBusCondition(*bus.GetStatus(), v1alpha1.BusServiceable) + needsProvitioner := bus.GetSpec().Provisioner != nil var cond *v1alpha1.BusCondition @@ -83,7 +83,7 @@ func ConsolidateBusCondition(bus *v1alpha1.Bus) { } else { cond = NewBusCondition(v1alpha1.BusReady, v1.ConditionFalse, "", "") } - SetBusCondition(&bus.Status, *cond) + SetBusCondition(bus.GetStatus(), *cond) } // IsBusReady returns whether all readiness conditions of a bus are met, as a boolean.