Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 35 additions & 55 deletions pkg/apis/eventing/v1/broker_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,11 @@ limitations under the License.
package v1

import (
corev1 "k8s.io/api/core/v1"
"sync"

"knative.dev/eventing/pkg/apis/duck"
duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/pkg/apis"
)

var brokerCondSet = apis.NewLivingConditionSet(
BrokerConditionIngress,
BrokerConditionTriggerChannel,
BrokerConditionFilter,
BrokerConditionAddressable,
)

const (
BrokerConditionReady = apis.ConditionReady
BrokerConditionIngress apis.ConditionType = "IngressReady"
Expand All @@ -39,76 +30,65 @@ const (
BrokerConditionAddressable apis.ConditionType = "Addressable"
)

var brokerCondSet = apis.NewLivingConditionSet(
BrokerConditionIngress,
BrokerConditionTriggerChannel,
BrokerConditionFilter,
BrokerConditionAddressable,
)
var brokerCondSetLock = sync.RWMutex{}

// RegisterAlternateBrokerConditionSet register a apis.ConditionSet for the given broker class.
func RegisterAlternateBrokerConditionSet(conditionSet apis.ConditionSet) {
brokerCondSetLock.Lock()
defer brokerCondSetLock.Unlock()

brokerCondSet = conditionSet
}

// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface.
func (*Broker) GetConditionSet() apis.ConditionSet {
func (b *Broker) GetConditionSet() apis.ConditionSet {
brokerCondSetLock.RLock()
defer brokerCondSetLock.RUnlock()

return brokerCondSet
}

// GetConditionSet retrieves the condition set for this resource.
func (bs *BrokerStatus) GetConditionSet() apis.ConditionSet {
brokerCondSetLock.RLock()
defer brokerCondSetLock.RUnlock()

return brokerCondSet
}

// GetTopLevelCondition returns the top level Condition.
func (bs *BrokerStatus) GetTopLevelCondition() *apis.Condition {
return brokerCondSet.Manage(bs).GetTopLevelCondition()
return bs.GetConditionSet().Manage(bs).GetTopLevelCondition()
}

// SetAddress makes this Broker addressable by setting the URI. It also
// sets the BrokerConditionAddressable to true.
func (bs *BrokerStatus) SetAddress(url *apis.URL) {
bs.Address.URL = url
if url != nil {
brokerCondSet.Manage(bs).MarkTrue(BrokerConditionAddressable)
bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionAddressable)
} else {
brokerCondSet.Manage(bs).MarkFalse(BrokerConditionAddressable, "nil URL", "URL is nil")
bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionAddressable, "nil URL", "URL is nil")
}
}

// GetCondition returns the condition currently associated with the given type, or nil.
func (bs *BrokerStatus) GetCondition(t apis.ConditionType) *apis.Condition {
return brokerCondSet.Manage(bs).GetCondition(t)
return bs.GetConditionSet().Manage(bs).GetCondition(t)
}

// IsReady returns true if the resource is ready overall.
func (bs *BrokerStatus) IsReady() bool {
return brokerCondSet.Manage(bs).IsHappy()
return bs.GetConditionSet().Manage(bs).IsHappy()
}

// InitializeConditions sets relevant unset conditions to Unknown state.
func (bs *BrokerStatus) InitializeConditions() {
brokerCondSet.Manage(bs).InitializeConditions()
}

func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interface{}) {
brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...)
}

func (bs *BrokerStatus) PropagateIngressAvailability(ep *corev1.Endpoints) {
if duck.EndpointsAreAvailable(ep) {
brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngress)
} else {
bs.MarkIngressFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name)
}
}

func (bs *BrokerStatus) MarkTriggerChannelFailed(reason, format string, args ...interface{}) {
brokerCondSet.Manage(bs).MarkFalse(BrokerConditionTriggerChannel, reason, format, args...)
}

func (bs *BrokerStatus) PropagateTriggerChannelReadiness(cs *duckv1.ChannelableStatus) {
// TODO: Once you can get a Ready status from Channelable in a generic way, use it here...
address := cs.AddressStatus.Address
if address != nil {
brokerCondSet.Manage(bs).MarkTrue(BrokerConditionTriggerChannel)
} else {
bs.MarkTriggerChannelFailed("ChannelNotReady", "trigger Channel is not ready: not addressable")
}
}

func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interface{}) {
brokerCondSet.Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...)
}

func (bs *BrokerStatus) PropagateFilterAvailability(ep *corev1.Endpoints) {
if duck.EndpointsAreAvailable(ep) {
brokerCondSet.Manage(bs).MarkTrue(BrokerConditionFilter)
} else {
bs.MarkFilterFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name)
}
bs.GetConditionSet().Manage(bs).InitializeConditions()
}
62 changes: 62 additions & 0 deletions pkg/apis/eventing/v1/broker_lifecycle_mt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had actually been thinking about moving them away from our API directory alltogether and into the pkg/reconciler. Does not have to be part of this PR obviously :) But again, just sharing my thoughts.

* Copyright 2020 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package v1

import (
corev1 "k8s.io/api/core/v1"

"knative.dev/eventing/pkg/apis/duck"
duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
)

func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interface{}) {
bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...)
}

func (bs *BrokerStatus) PropagateIngressAvailability(ep *corev1.Endpoints) {
if duck.EndpointsAreAvailable(ep) {
bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionIngress)
} else {
bs.MarkIngressFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name)
}
}

func (bs *BrokerStatus) MarkTriggerChannelFailed(reason, format string, args ...interface{}) {
bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionTriggerChannel, reason, format, args...)
}

func (bs *BrokerStatus) PropagateTriggerChannelReadiness(cs *duckv1.ChannelableStatus) {
// TODO: Once you can get a Ready status from Channelable in a generic way, use it here...
address := cs.AddressStatus.Address
if address != nil {
bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionTriggerChannel)
} else {
bs.MarkTriggerChannelFailed("ChannelNotReady", "trigger Channel is not ready: not addressable")
}
}

func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interface{}) {
bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...)
}

func (bs *BrokerStatus) PropagateFilterAvailability(ep *corev1.Endpoints) {
if duck.EndpointsAreAvailable(ep) {
bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionFilter)
} else {
bs.MarkFilterFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name)
}
}
50 changes: 50 additions & 0 deletions pkg/apis/eventing/v1/broker_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (

"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/eventing"

"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
Expand Down Expand Up @@ -57,6 +59,54 @@ var (
}
)

func TestBrokerGetConditionSet(t *testing.T) {

customCondition := apis.NewLivingConditionSet(
apis.ConditionReady,
"ConditionGolangReady",
)
brokerClass := "Golang"

tt := []struct {
name string
broker Broker
expectedConditionSet apis.ConditionSet
}{
{
name: "default condition set",
broker: Broker{},
expectedConditionSet: brokerCondSet,
},
{
name: "custom condition set",
broker: Broker{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
eventing.BrokerClassKey: brokerClass,
},
},
},
expectedConditionSet: customCondition,
},
}

for _, tc := range tt {
tc := tc
t.Run(tc.name, func(t *testing.T) {
defer RegisterAlternateBrokerConditionSet(brokerCondSet) // reset to default condition set

RegisterAlternateBrokerConditionSet(tc.expectedConditionSet)

if diff := cmp.Diff(tc.expectedConditionSet, tc.broker.GetConditionSet(), cmp.AllowUnexported(apis.ConditionSet{})); diff != "" {
t.Errorf("unexpected conditions (-want, +got) %s", diff)
}
if diff := cmp.Diff(tc.expectedConditionSet, tc.broker.Status.GetConditionSet(), cmp.AllowUnexported(apis.ConditionSet{})); diff != "" {
t.Errorf("unexpected conditions (-want, +got) %s", diff)
}
})
}
}

func TestBrokerGetCondition(t *testing.T) {
tests := []struct {
name string
Expand Down
90 changes: 35 additions & 55 deletions pkg/apis/eventing/v1beta1/broker_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,11 @@ limitations under the License.
package v1beta1

import (
corev1 "k8s.io/api/core/v1"
"sync"

"knative.dev/eventing/pkg/apis/duck"
duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/pkg/apis"
)

var brokerCondSet = apis.NewLivingConditionSet(
BrokerConditionIngress,
BrokerConditionTriggerChannel,
BrokerConditionFilter,
BrokerConditionAddressable,
)

const (
BrokerConditionReady = apis.ConditionReady
BrokerConditionIngress apis.ConditionType = "IngressReady"
Expand All @@ -39,76 +30,65 @@ const (
BrokerConditionAddressable apis.ConditionType = "Addressable"
)

var brokerCondSet = apis.NewLivingConditionSet(
BrokerConditionIngress,
BrokerConditionTriggerChannel,
BrokerConditionFilter,
BrokerConditionAddressable,
)
var brokerCondSetLock = sync.RWMutex{}

// RegisterAlternateBrokerConditionSet register a apis.ConditionSet for the given broker class.
func RegisterAlternateBrokerConditionSet(conditionSet apis.ConditionSet) {
brokerCondSetLock.Lock()
defer brokerCondSetLock.Unlock()

brokerCondSet = conditionSet
}

// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface.
func (*Broker) GetConditionSet() apis.ConditionSet {
func (b *Broker) GetConditionSet() apis.ConditionSet {
brokerCondSetLock.RLock()
defer brokerCondSetLock.RUnlock()

return brokerCondSet
}

// GetConditionSet retrieves the condition set for this resource.
func (bs *BrokerStatus) GetConditionSet() apis.ConditionSet {
brokerCondSetLock.RLock()
defer brokerCondSetLock.RUnlock()

return brokerCondSet
}

// GetTopLevelCondition returns the top level Condition.
func (bs *BrokerStatus) GetTopLevelCondition() *apis.Condition {
return brokerCondSet.Manage(bs).GetTopLevelCondition()
return bs.GetConditionSet().Manage(bs).GetTopLevelCondition()
}

// SetAddress makes this Broker addressable by setting the URI. It also
// sets the BrokerConditionAddressable to true.
func (bs *BrokerStatus) SetAddress(url *apis.URL) {
bs.Address.URL = url
if url != nil {
brokerCondSet.Manage(bs).MarkTrue(BrokerConditionAddressable)
bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionAddressable)
} else {
brokerCondSet.Manage(bs).MarkFalse(BrokerConditionAddressable, "nil URL", "URL is nil")
bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionAddressable, "nil URL", "URL is nil")
}
}

// GetCondition returns the condition currently associated with the given type, or nil.
func (bs *BrokerStatus) GetCondition(t apis.ConditionType) *apis.Condition {
return brokerCondSet.Manage(bs).GetCondition(t)
return bs.GetConditionSet().Manage(bs).GetCondition(t)
}

// IsReady returns true if the resource is ready overall.
func (bs *BrokerStatus) IsReady() bool {
return brokerCondSet.Manage(bs).IsHappy()
return bs.GetConditionSet().Manage(bs).IsHappy()
}

// InitializeConditions sets relevant unset conditions to Unknown state.
func (bs *BrokerStatus) InitializeConditions() {
brokerCondSet.Manage(bs).InitializeConditions()
}

func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interface{}) {
brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...)
}

func (bs *BrokerStatus) PropagateIngressAvailability(ep *corev1.Endpoints) {
if duck.EndpointsAreAvailable(ep) {
brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngress)
} else {
bs.MarkIngressFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name)
}
}

func (bs *BrokerStatus) MarkTriggerChannelFailed(reason, format string, args ...interface{}) {
brokerCondSet.Manage(bs).MarkFalse(BrokerConditionTriggerChannel, reason, format, args...)
}

func (bs *BrokerStatus) PropagateTriggerChannelReadiness(cs *duckv1beta1.ChannelableStatus) {
// TODO: Once you can get a Ready status from Channelable in a generic way, use it here...
address := cs.AddressStatus.Address
if address != nil {
brokerCondSet.Manage(bs).MarkTrue(BrokerConditionTriggerChannel)
} else {
bs.MarkTriggerChannelFailed("ChannelNotReady", "trigger Channel is not ready: not addressable")
}
}

func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interface{}) {
brokerCondSet.Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...)
}

func (bs *BrokerStatus) PropagateFilterAvailability(ep *corev1.Endpoints) {
if duck.EndpointsAreAvailable(ep) {
brokerCondSet.Manage(bs).MarkTrue(BrokerConditionFilter)
} else {
bs.MarkFilterFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name)
}
bs.GetConditionSet().Manage(bs).InitializeConditions()
}
Loading