From 5f5974820bf9aed24abe2ea739812f4a9bd87132 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Wed, 8 Jul 2020 21:03:51 +0200 Subject: [PATCH 1/2] Allow registering custom conditions for the Broker Signed-off-by: Pierangelo Di Pilato --- pkg/apis/eventing/v1/broker_lifecycle.go | 61 ++++++++++++---- pkg/apis/eventing/v1/broker_lifecycle_test.go | 69 +++++++++++++++++++ pkg/apis/eventing/v1beta1/broker_lifecycle.go | 61 ++++++++++++---- .../eventing/v1beta1/broker_lifecycle_test.go | 69 +++++++++++++++++++ .../eventing/v1beta1/eventtype_lifecycle.go | 2 +- 5 files changed, 235 insertions(+), 27 deletions(-) diff --git a/pkg/apis/eventing/v1/broker_lifecycle.go b/pkg/apis/eventing/v1/broker_lifecycle.go index ebd71cde536..bbe672c3ff9 100644 --- a/pkg/apis/eventing/v1/broker_lifecycle.go +++ b/pkg/apis/eventing/v1/broker_lifecycle.go @@ -21,6 +21,8 @@ import ( "knative.dev/eventing/pkg/apis/duck" duckv1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/pkg/apis/eventing" + "knative.dev/pkg/apis" ) @@ -31,6 +33,18 @@ var brokerCondSet = apis.NewLivingConditionSet( BrokerConditionAddressable, ) +var customConditionSet = map[string]apis.ConditionSet{ + "": brokerCondSet, + eventing.MTChannelBrokerClassValue: brokerCondSet, +} + +// RegisterAlternateBrokerConditionSet register a apis.ConditionSet for the given broker class. +// +// Calls to this function need to be synchronized by the caller (not thread-safe). +func RegisterAlternateBrokerConditionSet(brokerClass string, conditionSet apis.ConditionSet) { + customConditionSet[brokerClass] = conditionSet +} + const ( BrokerConditionReady = apis.ConditionReady BrokerConditionIngress apis.ConditionType = "IngressReady" @@ -40,13 +54,34 @@ const ( ) // GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface. -func (*Broker) GetConditionSet() apis.ConditionSet { +func (b *Broker) GetConditionSet() apis.ConditionSet { + + annotations := b.GetAnnotations() + if annotations != nil { + if brokerClass, ok := annotations[eventing.BrokerClassKey]; ok && brokerClass != eventing.MTChannelBrokerClassValue { + + // Set broker class as annotation of the status, so that we can use it. + if b.Status.Annotations == nil { + b.Status.Annotations = map[string]string{eventing.BrokerClassKey: brokerClass} + } else { + b.Status.Annotations[eventing.BrokerClassKey] = brokerClass + } + + return customConditionSet[brokerClass] + } + } + return brokerCondSet } +// GetConditionSet retrieves the condition set for this resource. +func (bs *BrokerStatus) GetConditionSet() apis.ConditionSet { + return customConditionSet[bs.Annotations[eventing.BrokerClassKey]] +} + // GetTopLevelCondition returns the top level Condition. func (bs *BrokerStatus) GetTopLevelCondition() *apis.Condition { - return brokerCondSet.Manage(bs).GetTopLevelCondition() + return bs.GetConditionSet().Manage(bs).GetTopLevelCondition() } // SetAddress makes this Broker addressable by setting the URI. It also @@ -54,60 +89,60 @@ func (bs *BrokerStatus) GetTopLevelCondition() *apis.Condition { func (bs *BrokerStatus) SetAddress(url *apis.URL) { bs.Address.URL = url if url != nil { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionAddressable) + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionAddressable) } else { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionAddressable, "nil URL", "URL is nil") + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionAddressable, "nil URL", "URL is nil") } } // GetCondition returns the condition currently associated with the given type, or nil. func (bs *BrokerStatus) GetCondition(t apis.ConditionType) *apis.Condition { - return brokerCondSet.Manage(bs).GetCondition(t) + return bs.GetConditionSet().Manage(bs).GetCondition(t) } // IsReady returns true if the resource is ready overall. func (bs *BrokerStatus) IsReady() bool { - return brokerCondSet.Manage(bs).IsHappy() + return bs.GetConditionSet().Manage(bs).IsHappy() } // InitializeConditions sets relevant unset conditions to Unknown state. func (bs *BrokerStatus) InitializeConditions() { - brokerCondSet.Manage(bs).InitializeConditions() + bs.GetConditionSet().Manage(bs).InitializeConditions() } func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interface{}) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...) + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...) } func (bs *BrokerStatus) PropagateIngressAvailability(ep *corev1.Endpoints) { if duck.EndpointsAreAvailable(ep) { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngress) + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionIngress) } else { bs.MarkIngressFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) } } func (bs *BrokerStatus) MarkTriggerChannelFailed(reason, format string, args ...interface{}) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionTriggerChannel, reason, format, args...) + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionTriggerChannel, reason, format, args...) } func (bs *BrokerStatus) PropagateTriggerChannelReadiness(cs *duckv1.ChannelableStatus) { // TODO: Once you can get a Ready status from Channelable in a generic way, use it here... address := cs.AddressStatus.Address if address != nil { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionTriggerChannel) + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionTriggerChannel) } else { bs.MarkTriggerChannelFailed("ChannelNotReady", "trigger Channel is not ready: not addressable") } } func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interface{}) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...) + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...) } func (bs *BrokerStatus) PropagateFilterAvailability(ep *corev1.Endpoints) { if duck.EndpointsAreAvailable(ep) { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionFilter) + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionFilter) } else { bs.MarkFilterFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) } diff --git a/pkg/apis/eventing/v1/broker_lifecycle_test.go b/pkg/apis/eventing/v1/broker_lifecycle_test.go index 5c6fd43c112..e487e8007a2 100644 --- a/pkg/apis/eventing/v1/broker_lifecycle_test.go +++ b/pkg/apis/eventing/v1/broker_lifecycle_test.go @@ -18,8 +18,10 @@ import ( "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/pkg/apis/eventing" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -57,6 +59,73 @@ var ( } ) +func TestBrokerGetConditionSet(t *testing.T) { + + customCondition := apis.NewLivingConditionSet( + apis.ConditionReady, + "ConditionGolangReady", + ) + brokerClass := "Golang" + + RegisterAlternateBrokerConditionSet(brokerClass, customCondition) + + tt := []struct { + name string + broker Broker + expectedConditionSet apis.ConditionSet + expectedBrokerStatus duckv1.Status + }{ + { + name: "default condition set", + broker: Broker{}, + expectedConditionSet: brokerCondSet, + expectedBrokerStatus: duckv1.Status{}, + }, + { + name: "custom condition set", + broker: Broker{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + eventing.BrokerClassKey: brokerClass, + }, + }, + }, + expectedConditionSet: customCondition, + expectedBrokerStatus: duckv1.Status{ + Annotations: map[string]string{ + eventing.BrokerClassKey: brokerClass, + }, + }, + }, + { + name: "no status updates for " + eventing.MTChannelBrokerClassValue, + broker: Broker{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + eventing.BrokerClassKey: eventing.MTChannelBrokerClassValue, + }, + }, + }, + expectedConditionSet: brokerCondSet, + expectedBrokerStatus: duckv1.Status{}, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + if diff := cmp.Diff(tc.expectedConditionSet, tc.broker.GetConditionSet(), cmp.AllowUnexported(apis.ConditionSet{})); diff != "" { + t.Errorf("unexpected conditions (-want, +got) %s", diff) + } + if diff := cmp.Diff(tc.expectedConditionSet, tc.broker.Status.GetConditionSet(), cmp.AllowUnexported(apis.ConditionSet{})); diff != "" { + t.Errorf("unexpected conditions (-want, +got) %s", diff) + } + if diff := cmp.Diff(tc.expectedBrokerStatus, tc.broker.Status.Status); diff != "" { + t.Errorf("unexpected duck status (-want, +got) %s", diff) + } + }) + } +} + func TestBrokerGetCondition(t *testing.T) { tests := []struct { name string diff --git a/pkg/apis/eventing/v1beta1/broker_lifecycle.go b/pkg/apis/eventing/v1beta1/broker_lifecycle.go index 6095195a125..503c173d7f0 100644 --- a/pkg/apis/eventing/v1beta1/broker_lifecycle.go +++ b/pkg/apis/eventing/v1beta1/broker_lifecycle.go @@ -21,6 +21,8 @@ import ( "knative.dev/eventing/pkg/apis/duck" duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" + "knative.dev/eventing/pkg/apis/eventing" + "knative.dev/pkg/apis" ) @@ -31,6 +33,18 @@ var brokerCondSet = apis.NewLivingConditionSet( BrokerConditionAddressable, ) +var customConditionSet = map[string]apis.ConditionSet{ + "": brokerCondSet, + eventing.MTChannelBrokerClassValue: brokerCondSet, +} + +// RegisterAlternateBrokerConditionSet register a apis.ConditionSet for the given broker class. +// +// Calls to this function need to be synchronized by the caller (not thread-safe). +func RegisterAlternateBrokerConditionSet(brokerClass string, conditionSet apis.ConditionSet) { + customConditionSet[brokerClass] = conditionSet +} + const ( BrokerConditionReady = apis.ConditionReady BrokerConditionIngress apis.ConditionType = "IngressReady" @@ -40,13 +54,34 @@ const ( ) // GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface. -func (*Broker) GetConditionSet() apis.ConditionSet { +func (b *Broker) GetConditionSet() apis.ConditionSet { + + annotations := b.GetAnnotations() + if annotations != nil { + if brokerClass, ok := annotations[eventing.BrokerClassKey]; ok && brokerClass != eventing.MTChannelBrokerClassValue { + + // Set broker class as annotation of the status, so that we can use it. + if b.Status.Annotations == nil { + b.Status.Annotations = map[string]string{eventing.BrokerClassKey: brokerClass} + } else { + b.Status.Annotations[eventing.BrokerClassKey] = brokerClass + } + + return customConditionSet[brokerClass] + } + } + return brokerCondSet } +// GetConditionSet retrieves the condition set for this resource. +func (bs *BrokerStatus) GetConditionSet() apis.ConditionSet { + return customConditionSet[bs.Annotations[eventing.BrokerClassKey]] +} + // GetTopLevelCondition returns the top level Condition. func (bs *BrokerStatus) GetTopLevelCondition() *apis.Condition { - return brokerCondSet.Manage(bs).GetTopLevelCondition() + return bs.GetConditionSet().Manage(bs).GetTopLevelCondition() } // SetAddress makes this Broker addressable by setting the URI. It also @@ -54,60 +89,60 @@ func (bs *BrokerStatus) GetTopLevelCondition() *apis.Condition { func (bs *BrokerStatus) SetAddress(url *apis.URL) { bs.Address.URL = url if url != nil { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionAddressable) + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionAddressable) } else { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionAddressable, "nil URL", "URL is nil") + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionAddressable, "nil URL", "URL is nil") } } // GetCondition returns the condition currently associated with the given type, or nil. func (bs *BrokerStatus) GetCondition(t apis.ConditionType) *apis.Condition { - return brokerCondSet.Manage(bs).GetCondition(t) + return bs.GetConditionSet().Manage(bs).GetCondition(t) } // IsReady returns true if the resource is ready overall. func (bs *BrokerStatus) IsReady() bool { - return brokerCondSet.Manage(bs).IsHappy() + return bs.GetConditionSet().Manage(bs).IsHappy() } // InitializeConditions sets relevant unset conditions to Unknown state. func (bs *BrokerStatus) InitializeConditions() { - brokerCondSet.Manage(bs).InitializeConditions() + bs.GetConditionSet().Manage(bs).InitializeConditions() } func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interface{}) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...) + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...) } func (bs *BrokerStatus) PropagateIngressAvailability(ep *corev1.Endpoints) { if duck.EndpointsAreAvailable(ep) { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngress) + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionIngress) } else { bs.MarkIngressFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) } } func (bs *BrokerStatus) MarkTriggerChannelFailed(reason, format string, args ...interface{}) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionTriggerChannel, reason, format, args...) + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionTriggerChannel, reason, format, args...) } func (bs *BrokerStatus) PropagateTriggerChannelReadiness(cs *duckv1beta1.ChannelableStatus) { // TODO: Once you can get a Ready status from Channelable in a generic way, use it here... address := cs.AddressStatus.Address if address != nil { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionTriggerChannel) + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionTriggerChannel) } else { bs.MarkTriggerChannelFailed("ChannelNotReady", "trigger Channel is not ready: not addressable") } } func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interface{}) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...) + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...) } func (bs *BrokerStatus) PropagateFilterAvailability(ep *corev1.Endpoints) { if duck.EndpointsAreAvailable(ep) { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionFilter) + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionFilter) } else { bs.MarkFilterFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) } diff --git a/pkg/apis/eventing/v1beta1/broker_lifecycle_test.go b/pkg/apis/eventing/v1beta1/broker_lifecycle_test.go index eb1cfe81e3f..18f7ad3ae6c 100644 --- a/pkg/apis/eventing/v1beta1/broker_lifecycle_test.go +++ b/pkg/apis/eventing/v1beta1/broker_lifecycle_test.go @@ -18,8 +18,10 @@ import ( "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" + "knative.dev/eventing/pkg/apis/eventing" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -57,6 +59,73 @@ var ( } ) +func TestBrokerGetConditionSet(t *testing.T) { + + customCondition := apis.NewLivingConditionSet( + apis.ConditionReady, + "ConditionGolangReady", + ) + brokerClass := "Golang" + + RegisterAlternateBrokerConditionSet(brokerClass, customCondition) + + tt := []struct { + name string + broker Broker + expectedConditionSet apis.ConditionSet + expectedBrokerStatus duckv1.Status + }{ + { + name: "default condition set", + broker: Broker{}, + expectedConditionSet: brokerCondSet, + expectedBrokerStatus: duckv1.Status{}, + }, + { + name: "custom condition set", + broker: Broker{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + eventing.BrokerClassKey: brokerClass, + }, + }, + }, + expectedConditionSet: customCondition, + expectedBrokerStatus: duckv1.Status{ + Annotations: map[string]string{ + eventing.BrokerClassKey: brokerClass, + }, + }, + }, + { + name: "no status updates for " + eventing.MTChannelBrokerClassValue, + broker: Broker{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + eventing.BrokerClassKey: eventing.MTChannelBrokerClassValue, + }, + }, + }, + expectedConditionSet: brokerCondSet, + expectedBrokerStatus: duckv1.Status{}, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + if diff := cmp.Diff(tc.expectedConditionSet, tc.broker.GetConditionSet(), cmp.AllowUnexported(apis.ConditionSet{})); diff != "" { + t.Errorf("unexpected conditions (-want, +got) %s", diff) + } + if diff := cmp.Diff(tc.expectedConditionSet, tc.broker.Status.GetConditionSet(), cmp.AllowUnexported(apis.ConditionSet{})); diff != "" { + t.Errorf("unexpected conditions (-want, +got) %s", diff) + } + if diff := cmp.Diff(tc.expectedBrokerStatus, tc.broker.Status.Status); diff != "" { + t.Errorf("unexpected duck status (-want, +got) %s", diff) + } + }) + } +} + func TestBrokerGetCondition(t *testing.T) { tests := []struct { name string diff --git a/pkg/apis/eventing/v1beta1/eventtype_lifecycle.go b/pkg/apis/eventing/v1beta1/eventtype_lifecycle.go index 0b906823f94..0d5f6f95203 100644 --- a/pkg/apis/eventing/v1beta1/eventtype_lifecycle.go +++ b/pkg/apis/eventing/v1beta1/eventtype_lifecycle.go @@ -85,7 +85,7 @@ func (et *EventTypeStatus) MarkBrokerNotConfigured() { } func (et *EventTypeStatus) PropagateBrokerStatus(bs *BrokerStatus) { - bc := brokerCondSet.Manage(bs).GetTopLevelCondition() + bc := bs.GetConditionSet().Manage(bs).GetTopLevelCondition() if bc == nil { et.MarkBrokerNotConfigured() return From 37f5cdb35889185e51fb7ad57ca74cffd45dac4f Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Thu, 9 Jul 2020 20:32:53 +0200 Subject: [PATCH 2/2] Move MT broker lifecycle and handle only one broker class Signed-off-by: Pierangelo Di Pilato --- pkg/apis/eventing/v1/broker_lifecycle.go | 97 ++++--------------- pkg/apis/eventing/v1/broker_lifecycle_mt.go | 62 ++++++++++++ pkg/apis/eventing/v1/broker_lifecycle_test.go | 29 +----- pkg/apis/eventing/v1beta1/broker_lifecycle.go | 97 ++++--------------- .../eventing/v1beta1/broker_lifecycle_mt.go | 62 ++++++++++++ .../eventing/v1beta1/broker_lifecycle_test.go | 29 +----- pkg/reconciler/mtbroker/controller.go | 16 +++ 7 files changed, 192 insertions(+), 200 deletions(-) create mode 100644 pkg/apis/eventing/v1/broker_lifecycle_mt.go create mode 100644 pkg/apis/eventing/v1beta1/broker_lifecycle_mt.go diff --git a/pkg/apis/eventing/v1/broker_lifecycle.go b/pkg/apis/eventing/v1/broker_lifecycle.go index bbe672c3ff9..0dc966323b4 100644 --- a/pkg/apis/eventing/v1/broker_lifecycle.go +++ b/pkg/apis/eventing/v1/broker_lifecycle.go @@ -17,66 +17,49 @@ limitations under the License. package v1 import ( - corev1 "k8s.io/api/core/v1" - - "knative.dev/eventing/pkg/apis/duck" - duckv1 "knative.dev/eventing/pkg/apis/duck/v1" - "knative.dev/eventing/pkg/apis/eventing" + "sync" "knative.dev/pkg/apis" ) +const ( + BrokerConditionReady = apis.ConditionReady + BrokerConditionIngress apis.ConditionType = "IngressReady" + BrokerConditionTriggerChannel apis.ConditionType = "TriggerChannelReady" + BrokerConditionFilter apis.ConditionType = "FilterReady" + BrokerConditionAddressable apis.ConditionType = "Addressable" +) + var brokerCondSet = apis.NewLivingConditionSet( BrokerConditionIngress, BrokerConditionTriggerChannel, BrokerConditionFilter, BrokerConditionAddressable, ) - -var customConditionSet = map[string]apis.ConditionSet{ - "": brokerCondSet, - eventing.MTChannelBrokerClassValue: brokerCondSet, -} +var brokerCondSetLock = sync.RWMutex{} // RegisterAlternateBrokerConditionSet register a apis.ConditionSet for the given broker class. -// -// Calls to this function need to be synchronized by the caller (not thread-safe). -func RegisterAlternateBrokerConditionSet(brokerClass string, conditionSet apis.ConditionSet) { - customConditionSet[brokerClass] = conditionSet -} +func RegisterAlternateBrokerConditionSet(conditionSet apis.ConditionSet) { + brokerCondSetLock.Lock() + defer brokerCondSetLock.Unlock() -const ( - BrokerConditionReady = apis.ConditionReady - BrokerConditionIngress apis.ConditionType = "IngressReady" - BrokerConditionTriggerChannel apis.ConditionType = "TriggerChannelReady" - BrokerConditionFilter apis.ConditionType = "FilterReady" - BrokerConditionAddressable apis.ConditionType = "Addressable" -) + brokerCondSet = conditionSet +} // GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface. func (b *Broker) GetConditionSet() apis.ConditionSet { - - annotations := b.GetAnnotations() - if annotations != nil { - if brokerClass, ok := annotations[eventing.BrokerClassKey]; ok && brokerClass != eventing.MTChannelBrokerClassValue { - - // Set broker class as annotation of the status, so that we can use it. - if b.Status.Annotations == nil { - b.Status.Annotations = map[string]string{eventing.BrokerClassKey: brokerClass} - } else { - b.Status.Annotations[eventing.BrokerClassKey] = brokerClass - } - - return customConditionSet[brokerClass] - } - } + brokerCondSetLock.RLock() + defer brokerCondSetLock.RUnlock() return brokerCondSet } // GetConditionSet retrieves the condition set for this resource. func (bs *BrokerStatus) GetConditionSet() apis.ConditionSet { - return customConditionSet[bs.Annotations[eventing.BrokerClassKey]] + brokerCondSetLock.RLock() + defer brokerCondSetLock.RUnlock() + + return brokerCondSet } // GetTopLevelCondition returns the top level Condition. @@ -109,41 +92,3 @@ func (bs *BrokerStatus) IsReady() bool { func (bs *BrokerStatus) InitializeConditions() { bs.GetConditionSet().Manage(bs).InitializeConditions() } - -func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interface{}) { - bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...) -} - -func (bs *BrokerStatus) PropagateIngressAvailability(ep *corev1.Endpoints) { - if duck.EndpointsAreAvailable(ep) { - bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionIngress) - } else { - bs.MarkIngressFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) - } -} - -func (bs *BrokerStatus) MarkTriggerChannelFailed(reason, format string, args ...interface{}) { - bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionTriggerChannel, reason, format, args...) -} - -func (bs *BrokerStatus) PropagateTriggerChannelReadiness(cs *duckv1.ChannelableStatus) { - // TODO: Once you can get a Ready status from Channelable in a generic way, use it here... - address := cs.AddressStatus.Address - if address != nil { - bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionTriggerChannel) - } else { - bs.MarkTriggerChannelFailed("ChannelNotReady", "trigger Channel is not ready: not addressable") - } -} - -func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interface{}) { - bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...) -} - -func (bs *BrokerStatus) PropagateFilterAvailability(ep *corev1.Endpoints) { - if duck.EndpointsAreAvailable(ep) { - bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionFilter) - } else { - bs.MarkFilterFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) - } -} diff --git a/pkg/apis/eventing/v1/broker_lifecycle_mt.go b/pkg/apis/eventing/v1/broker_lifecycle_mt.go new file mode 100644 index 00000000000..ce1cf4fa4b0 --- /dev/null +++ b/pkg/apis/eventing/v1/broker_lifecycle_mt.go @@ -0,0 +1,62 @@ +/* + * Copyright 2020 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1 + +import ( + corev1 "k8s.io/api/core/v1" + + "knative.dev/eventing/pkg/apis/duck" + duckv1 "knative.dev/eventing/pkg/apis/duck/v1" +) + +func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interface{}) { + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...) +} + +func (bs *BrokerStatus) PropagateIngressAvailability(ep *corev1.Endpoints) { + if duck.EndpointsAreAvailable(ep) { + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionIngress) + } else { + bs.MarkIngressFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) + } +} + +func (bs *BrokerStatus) MarkTriggerChannelFailed(reason, format string, args ...interface{}) { + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionTriggerChannel, reason, format, args...) +} + +func (bs *BrokerStatus) PropagateTriggerChannelReadiness(cs *duckv1.ChannelableStatus) { + // TODO: Once you can get a Ready status from Channelable in a generic way, use it here... + address := cs.AddressStatus.Address + if address != nil { + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionTriggerChannel) + } else { + bs.MarkTriggerChannelFailed("ChannelNotReady", "trigger Channel is not ready: not addressable") + } +} + +func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interface{}) { + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...) +} + +func (bs *BrokerStatus) PropagateFilterAvailability(ep *corev1.Endpoints) { + if duck.EndpointsAreAvailable(ep) { + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionFilter) + } else { + bs.MarkFilterFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) + } +} diff --git a/pkg/apis/eventing/v1/broker_lifecycle_test.go b/pkg/apis/eventing/v1/broker_lifecycle_test.go index e487e8007a2..5b3e87cc554 100644 --- a/pkg/apis/eventing/v1/broker_lifecycle_test.go +++ b/pkg/apis/eventing/v1/broker_lifecycle_test.go @@ -67,19 +67,15 @@ func TestBrokerGetConditionSet(t *testing.T) { ) brokerClass := "Golang" - RegisterAlternateBrokerConditionSet(brokerClass, customCondition) - tt := []struct { name string broker Broker expectedConditionSet apis.ConditionSet - expectedBrokerStatus duckv1.Status }{ { name: "default condition set", broker: Broker{}, expectedConditionSet: brokerCondSet, - expectedBrokerStatus: duckv1.Status{}, }, { name: "custom condition set", @@ -91,37 +87,22 @@ func TestBrokerGetConditionSet(t *testing.T) { }, }, expectedConditionSet: customCondition, - expectedBrokerStatus: duckv1.Status{ - Annotations: map[string]string{ - eventing.BrokerClassKey: brokerClass, - }, - }, - }, - { - name: "no status updates for " + eventing.MTChannelBrokerClassValue, - broker: Broker{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{ - eventing.BrokerClassKey: eventing.MTChannelBrokerClassValue, - }, - }, - }, - expectedConditionSet: brokerCondSet, - expectedBrokerStatus: duckv1.Status{}, }, } for _, tc := range tt { + tc := tc t.Run(tc.name, func(t *testing.T) { + defer RegisterAlternateBrokerConditionSet(brokerCondSet) // reset to default condition set + + RegisterAlternateBrokerConditionSet(tc.expectedConditionSet) + if diff := cmp.Diff(tc.expectedConditionSet, tc.broker.GetConditionSet(), cmp.AllowUnexported(apis.ConditionSet{})); diff != "" { t.Errorf("unexpected conditions (-want, +got) %s", diff) } if diff := cmp.Diff(tc.expectedConditionSet, tc.broker.Status.GetConditionSet(), cmp.AllowUnexported(apis.ConditionSet{})); diff != "" { t.Errorf("unexpected conditions (-want, +got) %s", diff) } - if diff := cmp.Diff(tc.expectedBrokerStatus, tc.broker.Status.Status); diff != "" { - t.Errorf("unexpected duck status (-want, +got) %s", diff) - } }) } } diff --git a/pkg/apis/eventing/v1beta1/broker_lifecycle.go b/pkg/apis/eventing/v1beta1/broker_lifecycle.go index 503c173d7f0..b4169bccab9 100644 --- a/pkg/apis/eventing/v1beta1/broker_lifecycle.go +++ b/pkg/apis/eventing/v1beta1/broker_lifecycle.go @@ -17,66 +17,49 @@ limitations under the License. package v1beta1 import ( - corev1 "k8s.io/api/core/v1" - - "knative.dev/eventing/pkg/apis/duck" - duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" - "knative.dev/eventing/pkg/apis/eventing" + "sync" "knative.dev/pkg/apis" ) +const ( + BrokerConditionReady = apis.ConditionReady + BrokerConditionIngress apis.ConditionType = "IngressReady" + BrokerConditionTriggerChannel apis.ConditionType = "TriggerChannelReady" + BrokerConditionFilter apis.ConditionType = "FilterReady" + BrokerConditionAddressable apis.ConditionType = "Addressable" +) + var brokerCondSet = apis.NewLivingConditionSet( BrokerConditionIngress, BrokerConditionTriggerChannel, BrokerConditionFilter, BrokerConditionAddressable, ) - -var customConditionSet = map[string]apis.ConditionSet{ - "": brokerCondSet, - eventing.MTChannelBrokerClassValue: brokerCondSet, -} +var brokerCondSetLock = sync.RWMutex{} // RegisterAlternateBrokerConditionSet register a apis.ConditionSet for the given broker class. -// -// Calls to this function need to be synchronized by the caller (not thread-safe). -func RegisterAlternateBrokerConditionSet(brokerClass string, conditionSet apis.ConditionSet) { - customConditionSet[brokerClass] = conditionSet -} +func RegisterAlternateBrokerConditionSet(conditionSet apis.ConditionSet) { + brokerCondSetLock.Lock() + defer brokerCondSetLock.Unlock() -const ( - BrokerConditionReady = apis.ConditionReady - BrokerConditionIngress apis.ConditionType = "IngressReady" - BrokerConditionTriggerChannel apis.ConditionType = "TriggerChannelReady" - BrokerConditionFilter apis.ConditionType = "FilterReady" - BrokerConditionAddressable apis.ConditionType = "Addressable" -) + brokerCondSet = conditionSet +} // GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface. func (b *Broker) GetConditionSet() apis.ConditionSet { - - annotations := b.GetAnnotations() - if annotations != nil { - if brokerClass, ok := annotations[eventing.BrokerClassKey]; ok && brokerClass != eventing.MTChannelBrokerClassValue { - - // Set broker class as annotation of the status, so that we can use it. - if b.Status.Annotations == nil { - b.Status.Annotations = map[string]string{eventing.BrokerClassKey: brokerClass} - } else { - b.Status.Annotations[eventing.BrokerClassKey] = brokerClass - } - - return customConditionSet[brokerClass] - } - } + brokerCondSetLock.RLock() + defer brokerCondSetLock.RUnlock() return brokerCondSet } // GetConditionSet retrieves the condition set for this resource. func (bs *BrokerStatus) GetConditionSet() apis.ConditionSet { - return customConditionSet[bs.Annotations[eventing.BrokerClassKey]] + brokerCondSetLock.RLock() + defer brokerCondSetLock.RUnlock() + + return brokerCondSet } // GetTopLevelCondition returns the top level Condition. @@ -109,41 +92,3 @@ func (bs *BrokerStatus) IsReady() bool { func (bs *BrokerStatus) InitializeConditions() { bs.GetConditionSet().Manage(bs).InitializeConditions() } - -func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interface{}) { - bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...) -} - -func (bs *BrokerStatus) PropagateIngressAvailability(ep *corev1.Endpoints) { - if duck.EndpointsAreAvailable(ep) { - bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionIngress) - } else { - bs.MarkIngressFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) - } -} - -func (bs *BrokerStatus) MarkTriggerChannelFailed(reason, format string, args ...interface{}) { - bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionTriggerChannel, reason, format, args...) -} - -func (bs *BrokerStatus) PropagateTriggerChannelReadiness(cs *duckv1beta1.ChannelableStatus) { - // TODO: Once you can get a Ready status from Channelable in a generic way, use it here... - address := cs.AddressStatus.Address - if address != nil { - bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionTriggerChannel) - } else { - bs.MarkTriggerChannelFailed("ChannelNotReady", "trigger Channel is not ready: not addressable") - } -} - -func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interface{}) { - bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...) -} - -func (bs *BrokerStatus) PropagateFilterAvailability(ep *corev1.Endpoints) { - if duck.EndpointsAreAvailable(ep) { - bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionFilter) - } else { - bs.MarkFilterFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) - } -} diff --git a/pkg/apis/eventing/v1beta1/broker_lifecycle_mt.go b/pkg/apis/eventing/v1beta1/broker_lifecycle_mt.go new file mode 100644 index 00000000000..2a39ed71a18 --- /dev/null +++ b/pkg/apis/eventing/v1beta1/broker_lifecycle_mt.go @@ -0,0 +1,62 @@ +/* + * Copyright 2020 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1beta1 + +import ( + corev1 "k8s.io/api/core/v1" + + "knative.dev/eventing/pkg/apis/duck" + duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" +) + +func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interface{}) { + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...) +} + +func (bs *BrokerStatus) PropagateIngressAvailability(ep *corev1.Endpoints) { + if duck.EndpointsAreAvailable(ep) { + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionIngress) + } else { + bs.MarkIngressFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) + } +} + +func (bs *BrokerStatus) MarkTriggerChannelFailed(reason, format string, args ...interface{}) { + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionTriggerChannel, reason, format, args...) +} + +func (bs *BrokerStatus) PropagateTriggerChannelReadiness(cs *duckv1beta1.ChannelableStatus) { + // TODO: Once you can get a Ready status from Channelable in a generic way, use it here... + address := cs.AddressStatus.Address + if address != nil { + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionTriggerChannel) + } else { + bs.MarkTriggerChannelFailed("ChannelNotReady", "trigger Channel is not ready: not addressable") + } +} + +func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interface{}) { + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...) +} + +func (bs *BrokerStatus) PropagateFilterAvailability(ep *corev1.Endpoints) { + if duck.EndpointsAreAvailable(ep) { + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionFilter) + } else { + bs.MarkFilterFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) + } +} diff --git a/pkg/apis/eventing/v1beta1/broker_lifecycle_test.go b/pkg/apis/eventing/v1beta1/broker_lifecycle_test.go index 18f7ad3ae6c..5f7fb702da1 100644 --- a/pkg/apis/eventing/v1beta1/broker_lifecycle_test.go +++ b/pkg/apis/eventing/v1beta1/broker_lifecycle_test.go @@ -67,19 +67,15 @@ func TestBrokerGetConditionSet(t *testing.T) { ) brokerClass := "Golang" - RegisterAlternateBrokerConditionSet(brokerClass, customCondition) - tt := []struct { name string broker Broker expectedConditionSet apis.ConditionSet - expectedBrokerStatus duckv1.Status }{ { name: "default condition set", broker: Broker{}, expectedConditionSet: brokerCondSet, - expectedBrokerStatus: duckv1.Status{}, }, { name: "custom condition set", @@ -91,37 +87,22 @@ func TestBrokerGetConditionSet(t *testing.T) { }, }, expectedConditionSet: customCondition, - expectedBrokerStatus: duckv1.Status{ - Annotations: map[string]string{ - eventing.BrokerClassKey: brokerClass, - }, - }, - }, - { - name: "no status updates for " + eventing.MTChannelBrokerClassValue, - broker: Broker{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{ - eventing.BrokerClassKey: eventing.MTChannelBrokerClassValue, - }, - }, - }, - expectedConditionSet: brokerCondSet, - expectedBrokerStatus: duckv1.Status{}, }, } for _, tc := range tt { + tc := tc t.Run(tc.name, func(t *testing.T) { + defer RegisterAlternateBrokerConditionSet(brokerCondSet) // reset to default condition set + + RegisterAlternateBrokerConditionSet(tc.expectedConditionSet) + if diff := cmp.Diff(tc.expectedConditionSet, tc.broker.GetConditionSet(), cmp.AllowUnexported(apis.ConditionSet{})); diff != "" { t.Errorf("unexpected conditions (-want, +got) %s", diff) } if diff := cmp.Diff(tc.expectedConditionSet, tc.broker.Status.GetConditionSet(), cmp.AllowUnexported(apis.ConditionSet{})); diff != "" { t.Errorf("unexpected conditions (-want, +got) %s", diff) } - if diff := cmp.Diff(tc.expectedBrokerStatus, tc.broker.Status.Status); diff != "" { - t.Errorf("unexpected duck status (-want, +got) %s", diff) - } }) } } diff --git a/pkg/reconciler/mtbroker/controller.go b/pkg/reconciler/mtbroker/controller.go index 40486b7ba46..0f3366941ca 100644 --- a/pkg/reconciler/mtbroker/controller.go +++ b/pkg/reconciler/mtbroker/controller.go @@ -36,6 +36,7 @@ import ( brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1beta1/broker" "knative.dev/eventing/pkg/duck" "knative.dev/eventing/pkg/reconciler/names" + "knative.dev/pkg/apis" "knative.dev/pkg/client/injection/ducks/duck/v1/addressable" "knative.dev/pkg/client/injection/ducks/duck/v1/conditions" client "knative.dev/pkg/client/injection/kube/client" @@ -54,6 +55,14 @@ import ( // when creating events. const controllerAgentName = "mt-broker-controller" +const ( + BrokerConditionReady = apis.ConditionReady + BrokerConditionIngress apis.ConditionType = "IngressReady" + BrokerConditionTriggerChannel apis.ConditionType = "TriggerChannelReady" + BrokerConditionFilter apis.ConditionType = "FilterReady" + BrokerConditionAddressable apis.ConditionType = "Addressable" +) + // NewController initializes the controller and is called by the generated code // Registers event handlers to enqueue events func NewController( @@ -85,6 +94,13 @@ func NewController( }() } + v1beta1.RegisterAlternateBrokerConditionSet(apis.NewLivingConditionSet( + BrokerConditionIngress, + BrokerConditionTriggerChannel, + BrokerConditionFilter, + BrokerConditionAddressable, + )) + r := &Reconciler{ eventingClientSet: eventingclient.Get(ctx), dynamicClientSet: dynamicclient.Get(ctx),