Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/apis/channels/v1alpha1/bus_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ func (b *Bus) GetSpec() *BusSpec {
return &b.Spec
}

func (b *Bus) GetStatus() *BusStatus {
return &b.Status
}

func (b *Bus) GetSpecJSON() ([]byte, error) {
return json.Marshal(b.Spec)
}
Expand All @@ -161,6 +165,7 @@ type GenericBus interface {
meta_v1.ObjectMetaAccessor
BacksChannel(channel *Channel) bool
GetSpec() *BusSpec
GetStatus() *BusStatus

// Needed for generic webhook support
apis.Defaultable
Expand Down
9 changes: 6 additions & 3 deletions pkg/apis/channels/v1alpha1/clusterbus_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,20 @@ var _ webhook.GenericCRD = (*ClusterBus)(nil)
type ClusterBusSpec = BusSpec

// ClusterBusStatus (computed) for a clusterbus
type ClusterBusStatus struct {
}
type ClusterBusStatus = BusStatus
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bus and ClusterBus may not always have the same status, but the they do at the moment and I don't foresee that changing in the short term.

Copy link
Copy Markdown
Contributor

@greghaynes greghaynes Sep 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see we have this pattern already with ClusterBusSpec but I think we'd want a new type rather than an alias here (e.g. type ClusterBusStatus BusStatus). I'm curious if there's a reason for the alias?

No need to block this PR on that, can fix it later on, but interested to know :)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not using an alias is a good idea as it will be easier to fork the two types in the future should a distinction arise. Right now someone could do something silly like:

bus := Bus{
    Spec: ClusterBusSpec{},
}

I also agree it doesn't need to block this PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was also done to re-use some of the bus_util functions that deals with Status for ClusterBus. I agree ClusterBusStatus can be its own type when it diverges from BusStatus.


func (b *ClusterBus) BacksChannel(channel *Channel) bool {
return len(b.Namespace) == 0 && b.Name == channel.Spec.ClusterBus
}

func (b *ClusterBus) GetSpec() *BusSpec {
func (b *ClusterBus) GetSpec() *ClusterBusSpec {
return &b.Spec
}

func (b *ClusterBus) GetStatus() *ClusterBusStatus {
return &b.Status
}

func (b *ClusterBus) GetSpecJSON() ([]byte, error) {
return json.Marshal(b.Spec)
}
Expand Down
18 changes: 1 addition & 17 deletions pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

110 changes: 43 additions & 67 deletions pkg/controller/bus/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,46 +309,60 @@ func (c *Controller) syncHandler(key string) error {
return err
}

var dispatcherService *corev1.Service
var dispatcherDeployment, provisionerDeployment *appsv1.Deployment
var dispatcherServiceErr, dispatcherDeplErr, provisionerDeplError error
// NEVER modify objects from the store. It's a read-only, local cache.
// You can use DeepCopy() to make a deep copy of original object and modify this copy
// Or create a copy manually for better performance
busCopy := bus.DeepCopy()

// Sync Service derived from the Bus
dispatcherService, dispatcherServiceErr = c.syncBusDispatcherService(bus)
if dispatcherServiceErr != nil {
_ = c.updateBusStatus(bus,
dispatcherService, dispatcherServiceErr,
dispatcherDeployment, dispatcherDeplErr,
provisionerDeployment, provisionerDeplError)
return dispatcherServiceErr
dispatcherService, err := c.syncBusDispatcherService(busCopy)

if err != nil {
busCopy.Status.Service = nil
serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, err.Error())
util.SetBusCondition(&busCopy.Status, *serviceCondition)
c.compareAndUpdateBusStatus(bus, busCopy)
return err
}

busCopy.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name}
serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced")
util.SetBusCondition(&busCopy.Status, *serviceCondition)

// Sync Deployment derived from the Bus
dispatcherDeployment, dispatcherDeplErr = c.syncBusDispatcherDeployment(bus)
if dispatcherDeplErr != nil {
_ = c.updateBusStatus(bus,
dispatcherService, dispatcherServiceErr,
dispatcherDeployment, dispatcherDeplErr,
provisionerDeployment, provisionerDeplError)
return dispatcherDeplErr
_, err = c.syncBusDispatcherDeployment(busCopy)

if err != nil {
dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, err.Error())
util.SetBusCondition(&busCopy.Status, *dispatchCondition)
c.compareAndUpdateBusStatus(bus, busCopy)
return err
}

dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced")
util.SetBusCondition(&busCopy.Status, *dispatchCondition)

// Sync Deployment derived from the Bus
provisionerDeployment, provisionerDeplError = c.syncBusProvisionerDeployment(bus)
if provisionerDeplError != nil {
_ = c.updateBusStatus(bus,
dispatcherService, dispatcherServiceErr,
dispatcherDeployment, dispatcherDeplErr,
provisionerDeployment, provisionerDeplError)
return provisionerDeplError
provisionerDeployment, err := c.syncBusProvisionerDeployment(busCopy)

if err != nil {
provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, err.Error())
util.SetBusCondition(&busCopy.Status, *provisionCondition)
c.compareAndUpdateBusStatus(bus, busCopy)
return err
}

if provisionerDeployment != nil {
provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced")
util.SetBusCondition(&busCopy.Status, *provisionCondition)
} else {
util.RemoveBusCondition(&busCopy.Status, channelsv1alpha1.BusProvisioning)
}

// Finally, we update the status block of the Bus resource to reflect the
// current state of the world
err = c.updateBusStatus(bus,
dispatcherService, dispatcherServiceErr,
dispatcherDeployment, dispatcherDeplErr,
provisionerDeployment, provisionerDeplError)
err = c.compareAndUpdateBusStatus(bus, busCopy)

if err != nil {
return err
}
Expand Down Expand Up @@ -477,45 +491,7 @@ func (c *Controller) syncBusProvisionerDeployment(bus *channelsv1alpha1.Bus) (*a
return deployment, nil
}

func (c *Controller) updateBusStatus(
bus *channelsv1alpha1.Bus,
dispatcherService *corev1.Service, dispatcherServiceErr error,
dispatcherDeployment *appsv1.Deployment, dispatcherDeploymentErr error,
provisionerDeployment *appsv1.Deployment, provisionerDeploymentErr error,
) error {
// NEVER modify objects from the store. It's a read-only, local cache.
// You can use DeepCopy() to make a deep copy of original object and modify this copy
// Or create a copy manually for better performance
busCopy := bus.DeepCopy()

if dispatcherService != nil {
busCopy.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name}
serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced")
util.SetBusCondition(&busCopy.Status, *serviceCondition)
} else {
busCopy.Status.Service = nil
serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, dispatcherServiceErr.Error())
util.SetBusCondition(&busCopy.Status, *serviceCondition)
}

if dispatcherDeployment != nil {
dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced")
util.SetBusCondition(&busCopy.Status, *dispatchCondition)
} else {
dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, dispatcherDeploymentErr.Error())
util.SetBusCondition(&busCopy.Status, *dispatchCondition)
}

if provisionerDeployment != nil {
provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced")
util.SetBusCondition(&busCopy.Status, *provisionCondition)
} else if provisionerDeploymentErr != nil {
provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, provisionerDeploymentErr.Error())
util.SetBusCondition(&busCopy.Status, *provisionCondition)
} else {
util.RemoveBusCondition(&busCopy.Status, channelsv1alpha1.BusProvisioning)
}

func (c *Controller) compareAndUpdateBusStatus(bus *channelsv1alpha1.Bus, busCopy *channelsv1alpha1.Bus) error {
util.ConsolidateBusCondition(busCopy)

// Only update if status has changed
Expand Down
64 changes: 50 additions & 14 deletions pkg/controller/clusterbus/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
channelscheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme"
informers "github.com/knative/eventing/pkg/client/informers/externalversions"
listers "github.com/knative/eventing/pkg/client/listers/channels/v1alpha1"
"github.com/knative/eventing/pkg/controller/util"
"github.com/knative/eventing/pkg/system"
sharedclientset "github.com/knative/pkg/client/clientset/versioned"
sharedinformers "github.com/knative/pkg/client/informers/externalversions"
Expand All @@ -73,6 +74,17 @@ const (
MessageResourceSynced = "ClusterBus synced successfully"
)

const (
// ServiceSynced is used as part of the condition reason when the bus (k8s) service is successfully created.
ServiceSynced = "ServiceSynced"
// ServiceError is used as part of the condition reason when the bus (k8s) service creation failed.
ServiceError = "ServiceError"
// DeploymentSynced is used as part of the condition reason when a bus deployment is successfully created.
DeploymentSynced = "DeploymentSynced"
// DeploymentError is used as part of the condition reason when a bus deployment creation failed.
DeploymentError = "DeploymentError"
)

// Controller is the controller implementation for ClusterBus resources
type Controller struct {
// kubeclientset is a standard kubernetes clientset
Expand Down Expand Up @@ -285,27 +297,59 @@ func (c *Controller) syncHandler(key string) error {
return err
}

// NEVER modify objects from the store. It's a read-only, local cache.
// You can use DeepCopy() to make a deep copy of original object and modify this copy
// Or create a copy manually for better performance
clusterBusCopy := clusterBus.DeepCopy()

// Sync Service derived from the ClusterBus
dispatcherService, err := c.syncClusterBusDispatcherService(clusterBus)
dispatcherService, err := c.syncClusterBusDispatcherService(clusterBusCopy)

if err != nil {
clusterBusCopy.Status.Service = nil
serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionFalse, ServiceError, err.Error())
util.SetBusCondition(&clusterBusCopy.Status, *serviceCondition)
c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy)
return err
}

clusterBusCopy.Status.Service = &corev1.LocalObjectReference{Name: dispatcherService.Name}
serviceCondition := util.NewBusCondition(channelsv1alpha1.BusServiceable, corev1.ConditionTrue, ServiceSynced, "service successfully synced")
util.SetBusCondition(&clusterBusCopy.Status, *serviceCondition)

// Sync Deployment derived from the ClusterBus
dispatcherDeployment, err := c.syncClusterBusDispatcherDeployment(clusterBus)
_, err = c.syncClusterBusDispatcherDeployment(clusterBusCopy)

if err != nil {
dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionFalse, DeploymentError, err.Error())
util.SetBusCondition(&clusterBusCopy.Status, *dispatchCondition)
c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy)
return err
}

dispatchCondition := util.NewBusCondition(channelsv1alpha1.BusDispatching, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced")
util.SetBusCondition(&clusterBusCopy.Status, *dispatchCondition)

// Sync Deployment derived from the ClusterBus
provisionerDeployment, err := c.syncClusterBusProvisionerDeployment(clusterBus)
provisionerDeployment, err := c.syncClusterBusProvisionerDeployment(clusterBusCopy)

if err != nil {
provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionFalse, DeploymentError, err.Error())
util.SetBusCondition(&clusterBusCopy.Status, *provisionCondition)
c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy)
return err
}

if provisionerDeployment != nil {
provisionCondition := util.NewBusCondition(channelsv1alpha1.BusProvisioning, corev1.ConditionTrue, DeploymentSynced, "deployment successfully synced")
util.SetBusCondition(&clusterBusCopy.Status, *provisionCondition)
} else {
util.RemoveBusCondition(&clusterBusCopy.Status, channelsv1alpha1.BusProvisioning)
}

// Finally, we update the status block of the ClusterBus resource to reflect the
// current state of the world
err = c.updateClusterBusStatus(clusterBus, dispatcherService, dispatcherDeployment, provisionerDeployment)
err = c.compareAndUpdateBusStatus(clusterBus, clusterBusCopy)
if err != nil {
return err
}
Expand Down Expand Up @@ -434,16 +478,8 @@ func (c *Controller) syncClusterBusProvisionerDeployment(clusterBus *channelsv1a
return deployment, nil
}

func (c *Controller) updateClusterBusStatus(
clusterBus *channelsv1alpha1.ClusterBus,
dispatcherService *corev1.Service,
dispatcherDeployment *appsv1.Deployment,
provisionerDeployment *appsv1.Deployment,
) error {
// NEVER modify objects from the store. It's a read-only, local cache.
// You can use DeepCopy() to make a deep copy of original object and modify this copy
// Or create a copy manually for better performance
clusterBusCopy := clusterBus.DeepCopy()
func (c *Controller) compareAndUpdateBusStatus(clusterBus *channelsv1alpha1.ClusterBus, clusterBusCopy *channelsv1alpha1.ClusterBus) error {
util.ConsolidateBusCondition(clusterBusCopy)
// Only update if status has changed
if !equality.Semantic.DeepEqual(clusterBus.Status, clusterBusCopy.Status) {
// If the CustomResourceSubresources feature gate is not enabled,
Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/util/bus_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ func RemoveBusCondition(status *v1alpha1.BusStatus, condType v1alpha1.BusConditi

// ConsolidateBusCondition computes and sets the overall "Ready" condition of the bus
// given all other sub-conditions.
func ConsolidateBusCondition(bus *v1alpha1.Bus) {
dispatching := GetBusCondition(bus.Status, v1alpha1.BusDispatching)
provisioning := GetBusCondition(bus.Status, v1alpha1.BusProvisioning)
serviceable := GetBusCondition(bus.Status, v1alpha1.BusServiceable)
needsProvitioner := bus.Spec.Provisioner != nil
func ConsolidateBusCondition(bus v1alpha1.GenericBus) {
dispatching := GetBusCondition(*bus.GetStatus(), v1alpha1.BusDispatching)
provisioning := GetBusCondition(*bus.GetStatus(), v1alpha1.BusProvisioning)
serviceable := GetBusCondition(*bus.GetStatus(), v1alpha1.BusServiceable)
needsProvitioner := bus.GetSpec().Provisioner != nil

var cond *v1alpha1.BusCondition

Expand All @@ -83,7 +83,7 @@ func ConsolidateBusCondition(bus *v1alpha1.Bus) {
} else {
cond = NewBusCondition(v1alpha1.BusReady, v1.ConditionFalse, "", "")
}
SetBusCondition(&bus.Status, *cond)
SetBusCondition(bus.GetStatus(), *cond)
}

// IsBusReady returns whether all readiness conditions of a bus are met, as a boolean.
Expand Down