diff --git a/pkg/apis/eventing/v1alpha1/broker_lifecycle.go b/pkg/apis/eventing/v1alpha1/broker_lifecycle.go index 31ae7140dca..d6fa93499a5 100644 --- a/pkg/apis/eventing/v1alpha1/broker_lifecycle.go +++ b/pkg/apis/eventing/v1alpha1/broker_lifecycle.go @@ -16,7 +16,11 @@ limitations under the License. package v1alpha1 -import duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" +import ( + v1 "k8s.io/api/apps/v1" + + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" +) var brokerCondSet = duckv1alpha1.NewLivingConditionSet( BrokerConditionIngress, @@ -57,44 +61,91 @@ func (bs *BrokerStatus) InitializeConditions() { brokerCondSet.Manage(bs).InitializeConditions() } -func (bs *BrokerStatus) MarkIngressReady() { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngress) +func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interface{}) { + brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...) } -func (bs *BrokerStatus) MarkIngressFailed(err error) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngress, "failed", "%v", err) +func (bs *BrokerStatus) PropagateIngressDeploymentAvailability(d *v1.Deployment) { + if deploymentIsAvailable(&d.Status) { + brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngress) + } else { + // I don't know how to propagate the status well, so just give the name of the Deployment + // for now. + bs.MarkIngressFailed("DeploymentUnavailable", "The Deployment '%s' is unavailable.", d.Name) + } } -func (bs *BrokerStatus) MarkTriggerChannelReady() { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionTriggerChannel) +func (bs *BrokerStatus) MarkTriggerChannelFailed(reason, format string, args ...interface{}) { + brokerCondSet.Manage(bs).MarkFalse(BrokerConditionTriggerChannel, reason, format, args...) } -func (bs *BrokerStatus) MarkTriggerChannelFailed(err error) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionTriggerChannel, "failed", "%v", err) +func (bs *BrokerStatus) PropagateTriggerChannelReadiness(cs *ChannelStatus) { + if cs.IsReady() { + brokerCondSet.Manage(bs).MarkTrue(BrokerConditionTriggerChannel) + } else { + msg := "nil" + if cc := chanCondSet.Manage(cs).GetCondition(ChannelConditionReady); cc != nil { + msg = cc.Message + } + bs.MarkTriggerChannelFailed("ChannelNotReady", "trigger Channel is not ready: %s", msg) + } } -func (bs *BrokerStatus) MarkIngressChannelReady() { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngressChannel) +func (bs *BrokerStatus) MarkIngressChannelFailed(reason, format string, args ...interface{}) { + brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngressChannel, reason, format, args...) } -func (bs *BrokerStatus) MarkIngressChannelFailed(err error) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngressChannel, "failed", "%v", err) +func (bs *BrokerStatus) PropagateIngressChannelReadiness(cs *ChannelStatus) { + if cs.IsReady() { + brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngressChannel) + } else { + msg := "nil" + if cc := chanCondSet.Manage(cs).GetCondition(ChannelConditionReady); cc != nil { + msg = cc.Message + } + bs.MarkIngressChannelFailed("ChannelNotReady", "ingress Channel is not ready: %s", msg) + } } -func (bs *BrokerStatus) MarkIngressSubscriptionReady() { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngressSubscription) +func (bs *BrokerStatus) MarkIngressSubscriptionFailed(reason, format string, args ...interface{}) { + brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngressSubscription, reason, format, args...) } -func (bs *BrokerStatus) MarkIngressSubscriptionFailed(err error) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngressSubscription, "failed", "%v", err) +func (bs *BrokerStatus) PropagateIngressSubscriptionReadiness(ss *SubscriptionStatus) { + if ss.IsReady() { + brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngressSubscription) + } else { + msg := "nil" + if sc := subCondSet.Manage(ss).GetCondition(SubscriptionConditionReady); sc != nil { + msg = sc.Message + } + bs.MarkIngressSubscriptionFailed("SubscriptionNotReady", "ingress Subscription is not ready: %s", msg) + } } -func (bs *BrokerStatus) MarkFilterReady() { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionFilter) +func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interface{}) { + brokerCondSet.Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...) } -func (bs *BrokerStatus) MarkFilterFailed(err error) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionFilter, "failed", "%v", err) +func (bs *BrokerStatus) PropagateFilterDeploymentAvailability(d *v1.Deployment) { + if deploymentIsAvailable(&d.Status) { + brokerCondSet.Manage(bs).MarkTrue(BrokerConditionFilter) + } else { + // I don't know how to propagate the status well, so just give the name of the Deployment + // for now. + bs.MarkFilterFailed("DeploymentUnavailable", "The Deployment '%s' is unavailable.", d.Name) + } +} + +func deploymentIsAvailable(d *v1.DeploymentStatus) bool { + // Check if the Deployment is available. + for _, cond := range d.Conditions { + if cond.Type == v1.DeploymentAvailable { + return cond.Status == "True" + } + } + // Unable to find the Available condition, fail open. + return true } // SetAddress makes this Broker addressable by setting the hostname. It also diff --git a/pkg/apis/eventing/v1alpha1/broker_lifecycle_test.go b/pkg/apis/eventing/v1alpha1/broker_lifecycle_test.go index 73fa5663e24..c5826b94659 100644 --- a/pkg/apis/eventing/v1alpha1/broker_lifecycle_test.go +++ b/pkg/apis/eventing/v1alpha1/broker_lifecycle_test.go @@ -17,19 +17,17 @@ limitations under the License. package v1alpha1 import ( - "errors" "testing" "github.com/google/go-cmp/cmp" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" ) var ( trueVal = true falseVal = false - - err = errors.New("foobar") ) var ( @@ -331,39 +329,49 @@ func TestBrokerIsReady(t *testing.T) { t.Run(test.name, func(t *testing.T) { bs := &BrokerStatus{} if test.markIngressReady != nil { + var d *v1.Deployment if *test.markIngressReady { - bs.MarkIngressReady() + d = availableDeployment() } else { - bs.MarkIngressFailed(err) + d = unavailableDeployment() } + bs.PropagateIngressDeploymentAvailability(d) } if test.markTriggerChannelReady != nil { + var c *ChannelStatus if *test.markTriggerChannelReady { - bs.MarkTriggerChannelReady() + c = readyChannelStatus() } else { - bs.MarkTriggerChannelFailed(err) + c = notReadyChannelStatus() } + bs.PropagateTriggerChannelReadiness(c) } if test.markIngressChannelReady != nil { + var c *ChannelStatus if *test.markIngressChannelReady { - bs.MarkIngressChannelReady() + c = readyChannelStatus() } else { - bs.MarkIngressChannelFailed(err) + c = notReadyChannelStatus() } + bs.PropagateIngressChannelReadiness(c) } if test.markIngressSubscriptionReady != nil { + var sub *SubscriptionStatus if *test.markIngressSubscriptionReady { - bs.MarkIngressSubscriptionReady() + sub = readySubscriptionStatus() } else { - bs.MarkIngressSubscriptionFailed(err) + sub = notReadySubscriptionStatus() } + bs.PropagateIngressSubscriptionReadiness(sub) } if test.markFilterReady != nil { + var d *v1.Deployment if *test.markFilterReady { - bs.MarkFilterReady() + d = availableDeployment() } else { - bs.MarkFilterFailed(err) + d = unavailableDeployment() } + bs.PropagateFilterDeploymentAvailability(d) } bs.SetAddress(test.address) @@ -374,3 +382,54 @@ func TestBrokerIsReady(t *testing.T) { }) } } + +func unavailableDeployment() *v1.Deployment { + d := &v1.Deployment{} + d.Name = "unavailable" + d.Status.Conditions = []v1.DeploymentCondition{ + { + Type: v1.DeploymentAvailable, + Status: "False", + }, + } + return d +} + +func availableDeployment() *v1.Deployment { + d := unavailableDeployment() + d.Name = "available" + d.Status.Conditions = []v1.DeploymentCondition{ + { + Type: v1.DeploymentAvailable, + Status: "True", + }, + } + return d +} + +func readyChannelStatus() *ChannelStatus { + cs := &ChannelStatus{} + cs.MarkProvisionerInstalled() + cs.MarkProvisioned() + cs.SetAddress("foo") + return cs +} + +func notReadyChannelStatus() *ChannelStatus { + cs := readyChannelStatus() + cs.MarkNotProvisioned("foo", "bar") + return cs +} + +func readySubscriptionStatus() *SubscriptionStatus { + ss := &SubscriptionStatus{} + ss.MarkChannelReady() + ss.MarkReferencesResolved() + return ss +} + +func notReadySubscriptionStatus() *SubscriptionStatus { + ss := &SubscriptionStatus{} + ss.MarkReferencesResolved() + return ss +} diff --git a/pkg/reconciler/v1alpha1/broker/broker.go b/pkg/reconciler/v1alpha1/broker/broker.go index 6a67152d253..6a1032b5189 100644 --- a/pkg/reconciler/v1alpha1/broker/broker.go +++ b/pkg/reconciler/v1alpha1/broker/broker.go @@ -19,7 +19,6 @@ package broker import ( "context" "fmt" - "time" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/logging" @@ -29,7 +28,6 @@ import ( v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" - "k8s.io/apimachinery/pkg/api/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -52,7 +50,8 @@ const ( controllerAgentName = "broker-controller" // Name of the corev1.Events emitted from the reconciliation process. - brokerReconciled = "BrokerReconciled" + brokerReadinessChanged = "BrokerReadinessChanged" + brokerReconcileError = "BrokerReconcileError" brokerUpdateStatusFailed = "BrokerUpdateStatusFailed" ingressSubscriptionDeleteFailed = "IngressSubscriptionDeleteFailed" ingressSubscriptionCreateFailed = "IngressSubscriptionCreateFailed" @@ -133,7 +132,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err broker := &v1alpha1.Broker{} err := r.client.Get(ctx, request.NamespacedName, broker) - if errors.IsNotFound(err) { + if k8serrors.IsNotFound(err) { logging.FromContext(ctx).Info("Could not find Broker") return reconcile.Result{}, nil } @@ -143,16 +142,19 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err return reconcile.Result{}, err } + originalReadiness := broker.Status.IsReady() + // Reconcile this copy of the Broker and then write back any status updates regardless of // whether the reconcile error out. - result, reconcileErr := r.reconcile(ctx, broker) + reconcileErr := r.reconcile(ctx, broker) if reconcileErr != nil { logging.FromContext(ctx).Error("Error reconciling Broker", zap.Error(reconcileErr)) - } else if result.Requeue || result.RequeueAfter > 0 { - logging.FromContext(ctx).Debug("Broker reconcile requeuing") + r.recorder.Event(broker, corev1.EventTypeWarning, brokerReconcileError, fmt.Sprintf("Broker reconcile error: %v", reconcileErr)) } else { logging.FromContext(ctx).Debug("Broker reconciled") - r.recorder.Event(broker, corev1.EventTypeNormal, brokerReconciled, "Broker reconciled") + if originalReadiness != broker.Status.IsReady() { + r.recorder.Event(broker, corev1.EventTypeNormal, brokerReadinessChanged, fmt.Sprintf("Broker readiness changed to %v", broker.Status.IsReady())) + } } if _, err = r.updateStatus(broker); err != nil { @@ -162,10 +164,10 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err } // Requeue if the resource is not ready: - return result, reconcileErr + return reconcile.Result{}, reconcileErr } -func (r *reconciler) reconcile(ctx context.Context, b *v1alpha1.Broker) (reconcile.Result, error) { +func (r *reconciler) reconcile(ctx context.Context, b *v1alpha1.Broker) error { b.Status.InitializeConditions() // 1. Trigger Channel is created for all events. Triggers will Subscribe to this Channel. @@ -181,72 +183,70 @@ func (r *reconciler) reconcile(ctx context.Context, b *v1alpha1.Broker) (reconci if b.DeletionTimestamp != nil { // Everything is cleaned up by the garbage collector. - return reconcile.Result{}, nil + return nil } triggerChan, err := r.reconcileTriggerChannel(ctx, b) if err != nil { logging.FromContext(ctx).Error("Problem reconciling the trigger channel", zap.Error(err)) - b.Status.MarkTriggerChannelFailed(err) - return reconcile.Result{}, err + b.Status.MarkTriggerChannelFailed("ChannelFailure", "%v", err) + return err } else if triggerChan.Status.Address.Hostname == "" { - logging.FromContext(ctx).Info("Trigger Channel is not yet ready", zap.Any("triggerChan", triggerChan)) - // Give the Channel some time to get its address. One second was chosen arbitrarily. - return reconcile.Result{RequeueAfter: time.Second}, nil + // We check the trigger Channel's address here because it is needed to create the Ingress + // Deployment. + logging.FromContext(ctx).Debug("Trigger Channel does not have an address", zap.Any("triggerChan", triggerChan)) + b.Status.MarkTriggerChannelFailed("NoAddress", "Channel does not have an address.") + return nil } - b.Status.MarkTriggerChannelReady() + b.Status.PropagateTriggerChannelReadiness(&triggerChan.Status) - _, err = r.reconcileFilterDeployment(ctx, b) + filterDeployment, err := r.reconcileFilterDeployment(ctx, b) if err != nil { logging.FromContext(ctx).Error("Problem reconciling filter Deployment", zap.Error(err)) - b.Status.MarkFilterFailed(err) - return reconcile.Result{}, err + b.Status.MarkFilterFailed("DeploymentFailure", "%v", err) + return err } _, err = r.reconcileFilterService(ctx, b) if err != nil { logging.FromContext(ctx).Error("Problem reconciling filter Service", zap.Error(err)) - b.Status.MarkFilterFailed(err) - return reconcile.Result{}, err + b.Status.MarkFilterFailed("ServiceFailure", "%v", err) + return err } - b.Status.MarkFilterReady() + b.Status.PropagateFilterDeploymentAvailability(filterDeployment) - _, err = r.reconcileIngressDeployment(ctx, b, triggerChan) + ingressDeployment, err := r.reconcileIngressDeployment(ctx, b, triggerChan) if err != nil { logging.FromContext(ctx).Error("Problem reconciling ingress Deployment", zap.Error(err)) - b.Status.MarkIngressFailed(err) - return reconcile.Result{}, err + b.Status.MarkIngressFailed("DeploymentFailure", "%v", err) + return err } svc, err := r.reconcileIngressService(ctx, b) if err != nil { logging.FromContext(ctx).Error("Problem reconciling ingress Service", zap.Error(err)) - b.Status.MarkIngressFailed(err) - return reconcile.Result{}, err + b.Status.MarkIngressFailed("ServiceFailure", "%v", err) + return err } - b.Status.MarkIngressReady() + b.Status.PropagateIngressDeploymentAvailability(ingressDeployment) b.Status.SetAddress(names.ServiceHostName(svc.Name, svc.Namespace)) ingressChan, err := r.reconcileIngressChannel(ctx, b) if err != nil { logging.FromContext(ctx).Error("Problem reconciling the ingress channel", zap.Error(err)) - b.Status.MarkIngressChannelFailed(err) - return reconcile.Result{}, err - } else if ingressChan.Status.Address.Hostname == "" { - logging.FromContext(ctx).Info("Ingress Channel is not yet ready", zap.Any("ingressChan", ingressChan)) - // Give the Channel some time to get its address. One second was chosen arbitrarily. - return reconcile.Result{RequeueAfter: time.Second}, nil + b.Status.MarkIngressChannelFailed("ChannelFailure", "%v", err) + return err } - b.Status.MarkIngressChannelReady() + b.Status.PropagateIngressChannelReadiness(&ingressChan.Status) - _, err = r.reconcileIngressSubscription(ctx, b, ingressChan, svc) + ingressSub, err := r.reconcileIngressSubscription(ctx, b, ingressChan, svc) if err != nil { logging.FromContext(ctx).Error("Problem reconciling the ingress subscription", zap.Error(err)) - b.Status.MarkIngressSubscriptionFailed(err) - return reconcile.Result{}, err + b.Status.MarkIngressSubscriptionFailed("SubscriptionFailure", "%v", err) + return err } - b.Status.MarkIngressSubscriptionReady() + b.Status.PropagateIngressSubscriptionReadiness(&ingressSub.Status) - return reconcile.Result{}, nil + return nil } // updateStatus may in fact update the broker's finalizers in addition to the status. diff --git a/pkg/reconciler/v1alpha1/broker/broker_test.go b/pkg/reconciler/v1alpha1/broker/broker_test.go index af53b9d1568..e5e37f13411 100644 --- a/pkg/reconciler/v1alpha1/broker/broker_test.go +++ b/pkg/reconciler/v1alpha1/broker/broker_test.go @@ -22,7 +22,6 @@ import ( "fmt" "strings" "testing" - "time" "github.com/google/go-cmp/cmp" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" @@ -32,6 +31,7 @@ import ( duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -39,7 +39,6 @@ import ( "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - "sigs.k8s.io/controller-runtime/pkg/reconcile" ) const ( @@ -72,7 +71,8 @@ var ( // Map of events to set test cases' expectations easier. events = map[string]corev1.Event{ - brokerReconciled: {Reason: brokerReconciled, Type: corev1.EventTypeNormal}, + brokerReadinessChanged: {Reason: brokerReadinessChanged, Type: corev1.EventTypeNormal}, + brokerReconcileError: {Reason: brokerReconcileError, Type: corev1.EventTypeWarning}, brokerUpdateStatusFailed: {Reason: brokerUpdateStatusFailed, Type: corev1.EventTypeWarning}, ingressSubscriptionDeleteFailed: {Reason: ingressSubscriptionDeleteFailed, Type: corev1.EventTypeWarning}, ingressSubscriptionCreateFailed: {Reason: ingressSubscriptionCreateFailed, Type: corev1.EventTypeWarning}, @@ -146,11 +146,6 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeDeletingBroker(), }, - WantEvent: []corev1.Event{ - { - Reason: brokerReconciled, Type: corev1.EventTypeNormal, - }, - }, }, { Name: "Trigger Channel.List error", @@ -172,6 +167,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error getting Trigger Channel", }, { @@ -192,6 +188,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error creating Trigger Channel", }, { @@ -208,11 +205,6 @@ func TestReconcile(t *testing.T) { // GenerateName. // makeDifferentTriggerChannel(), }, - WantEvent: []corev1.Event{ - { - Reason: brokerReconciled, Type: corev1.EventTypeNormal, - }, - }, }, { Name: "Trigger Channel is not yet Addressable", @@ -221,7 +213,6 @@ func TestReconcile(t *testing.T) { makeBroker(), makeNonAddressableTriggerChannel(), }, - WantResult: reconcile.Result{RequeueAfter: time.Second}, }, { Name: "Filter Deployment.Get error", @@ -242,6 +233,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error getting filter Deployment", }, { @@ -263,6 +255,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error creating filter Deployment", }, { @@ -285,6 +278,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error updating filter Deployment", }, { @@ -306,6 +300,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error getting filter Service", }, { @@ -327,6 +322,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error creating filter Service", }, { @@ -349,6 +345,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error updating filter Service", }, { @@ -370,6 +367,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error getting ingress Deployment", }, { @@ -391,6 +389,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error creating ingress Deployment", }, { @@ -413,6 +412,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error updating ingress Deployment", }, { @@ -434,6 +434,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error getting ingress Service", }, { @@ -455,6 +456,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error creating ingress Service", }, { @@ -477,6 +479,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error updating ingress Service", }, { @@ -500,6 +503,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error getting Ingress Channel", }, { @@ -537,6 +541,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error creating Ingress Channel", }, { @@ -573,11 +578,6 @@ func TestReconcile(t *testing.T) { // GenerateName. // makeDifferentIngressChannel(), }, - WantEvent: []corev1.Event{ - { - Reason: brokerReconciled, Type: corev1.EventTypeNormal, - }, - }, }, { Name: "Ingress Channel is not yet Addressable", @@ -606,7 +606,6 @@ func TestReconcile(t *testing.T) { }, }, }, - WantResult: reconcile.Result{RequeueAfter: time.Second}, }, { Name: "Subscription.List error", @@ -626,6 +625,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error getting Subscription", }, { @@ -646,6 +646,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error creating Subscription", }, { @@ -663,11 +664,6 @@ func TestReconcile(t *testing.T) { // GenerateName. // makeDifferentSubscription(), }, - WantEvent: []corev1.Event{ - { - Reason: brokerReconciled, Type: corev1.EventTypeNormal, - }, - }, }, { Name: "Subscription.Delete error", @@ -688,7 +684,7 @@ func TestReconcile(t *testing.T) { }, }, }, - WantEvent: []corev1.Event{events[ingressSubscriptionDeleteFailed]}, + WantEvent: []corev1.Event{events[ingressSubscriptionDeleteFailed], events[brokerReconcileError]}, WantErrMsg: "test error deleting Subscription", }, { @@ -710,7 +706,7 @@ func TestReconcile(t *testing.T) { }, }, }, - WantEvent: []corev1.Event{events[ingressSubscriptionCreateFailed]}, + WantEvent: []corev1.Event{events[ingressSubscriptionCreateFailed], events[brokerReconcileError]}, WantErrMsg: "test error creating Subscription", }, { @@ -740,14 +736,7 @@ func TestReconcile(t *testing.T) { }, }, WantErrMsg: "test error getting the Broker for status update", - WantEvent: []corev1.Event{ - { - Reason: brokerReconciled, Type: corev1.EventTypeNormal, - }, - { - Reason: brokerUpdateStatusFailed, Type: corev1.EventTypeWarning, - }, - }, + WantEvent: []corev1.Event{events[brokerUpdateStatusFailed]}, }, { Name: "Broker.Status.Update error", @@ -768,14 +757,7 @@ func TestReconcile(t *testing.T) { }, }, WantErrMsg: "test error updating the Broker status", - WantEvent: []corev1.Event{ - { - Reason: brokerReconciled, Type: corev1.EventTypeNormal, - }, - { - Reason: brokerUpdateStatusFailed, Type: corev1.EventTypeWarning, - }, - }, + WantEvent: []corev1.Event{events[brokerUpdateStatusFailed]}, }, { Name: "Successful reconcile", @@ -785,6 +767,7 @@ func TestReconcile(t *testing.T) { // The Channel needs to be addressable for the reconcile to succeed. makeTriggerChannel(), makeIngressChannel(), + makeTestSubscription(), }, Mocks: controllertesting.Mocks{ MockLists: []controllertesting.MockList{ @@ -815,13 +798,10 @@ func TestReconcile(t *testing.T) { makeIngressService(), // TODO Uncomment makeIngressChannel() when our test framework handles generateName. // makeIngressChannel(), - // Because the makeTestSubscription(), }, WantEvent: []corev1.Event{ - { - Reason: brokerReconciled, Type: corev1.EventTypeNormal, - }, + events[brokerReadinessChanged], }, }, } @@ -866,12 +846,12 @@ func makeBroker() *v1alpha1.Broker { func makeReadyBroker() *v1alpha1.Broker { b := makeBroker() b.Status.InitializeConditions() - b.Status.MarkIngressReady() - b.Status.MarkTriggerChannelReady() - b.Status.MarkIngressChannelReady() - b.Status.MarkFilterReady() + b.Status.PropagateIngressDeploymentAvailability(makeAvailableDeployment()) + b.Status.PropagateTriggerChannelReadiness(makeReadyChannelStatus()) + b.Status.PropagateIngressChannelReadiness(makeReadyChannelStatus()) + b.Status.PropagateFilterDeploymentAvailability(makeAvailableDeployment()) b.Status.SetAddress(fmt.Sprintf("%s-broker.%s.svc.%s", brokerName, testNS, utils.GetClusterDomainName())) - b.Status.MarkIngressSubscriptionReady() + b.Status.PropagateIngressSubscriptionReadiness(makeReadySubscriptionStatus()) return b } @@ -882,7 +862,7 @@ func makeDeletingBroker() *v1alpha1.Broker { } func makeTriggerChannel() *v1alpha1.Channel { - return &v1alpha1.Channel{ + c := &v1alpha1.Channel{ ObjectMeta: metav1.ObjectMeta{ Namespace: testNS, GenerateName: fmt.Sprintf("%s-broker-", brokerName), @@ -897,12 +877,11 @@ func makeTriggerChannel() *v1alpha1.Channel { Spec: v1alpha1.ChannelSpec{ Provisioner: channelProvisioner, }, - Status: v1alpha1.ChannelStatus{ - Address: duckv1alpha1.Addressable{ - Hostname: triggerChannelHostname, - }, - }, } + c.Status.MarkProvisionerInstalled() + c.Status.MarkProvisioned() + c.Status.SetAddress(triggerChannelHostname) + return c } func makeNonAddressableTriggerChannel() *v1alpha1.Channel { @@ -918,7 +897,7 @@ func makeDifferentTriggerChannel() *v1alpha1.Channel { } func makeIngressChannel() *v1alpha1.Channel { - return &v1alpha1.Channel{ + c := &v1alpha1.Channel{ ObjectMeta: metav1.ObjectMeta{ Namespace: testNS, GenerateName: fmt.Sprintf("%s-broker-ingress-", brokerName), @@ -936,12 +915,11 @@ func makeIngressChannel() *v1alpha1.Channel { Spec: v1alpha1.ChannelSpec{ Provisioner: channelProvisioner, }, - Status: v1alpha1.ChannelStatus{ - Address: duckv1alpha1.Addressable{ - Hostname: ingressChannelHostname, - }, - }, } + c.Status.MarkProvisionerInstalled() + c.Status.MarkProvisioned() + c.Status.SetAddress(ingressChannelHostname) + return c } func makeNonAddressableIngressChannel() *v1alpha1.Channel { @@ -1026,7 +1004,7 @@ func makeDifferentIngressService() *corev1.Service { } func makeTestSubscription() *v1alpha1.Subscription { - return &v1alpha1.Subscription{ + s := &v1alpha1.Subscription{ TypeMeta: metav1.TypeMeta{ APIVersion: "eventing.knative.dev/v1alpha1", Kind: "Subscription", @@ -1057,6 +1035,9 @@ func makeTestSubscription() *v1alpha1.Subscription { }, }, } + s.Status.MarkChannelReady() + s.Status.MarkReferencesResolved() + return s } func makeDifferentSubscription() *v1alpha1.Subscription { @@ -1076,3 +1057,30 @@ func getOwnerReference() metav1.OwnerReference { BlockOwnerDeletion: &trueVal, } } + +func makeAvailableDeployment() *v1.Deployment { + d := &v1.Deployment{} + d.Name = "deployment-name" + d.Status.Conditions = []v1.DeploymentCondition{ + { + Type: v1.DeploymentAvailable, + Status: "True", + }, + } + return d +} + +func makeReadyChannelStatus() *v1alpha1.ChannelStatus { + cs := &v1alpha1.ChannelStatus{} + cs.MarkProvisionerInstalled() + cs.MarkProvisioned() + cs.SetAddress("foo") + return cs +} + +func makeReadySubscriptionStatus() *v1alpha1.SubscriptionStatus { + ss := &v1alpha1.SubscriptionStatus{} + ss.MarkChannelReady() + ss.MarkReferencesResolved() + return ss +}