From 1e561033ebc2b99ff5ff5b4f7a695f3bed04207e Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Mon, 15 Apr 2019 16:03:18 -0700 Subject: [PATCH 1/4] Broker is not ready until its constituent pieces are ready. --- .../eventing/v1alpha1/broker_lifecycle.go | 93 ++++++++++++++----- pkg/reconciler/v1alpha1/broker/broker.go | 46 +++++---- 2 files changed, 93 insertions(+), 46 deletions(-) diff --git a/pkg/apis/eventing/v1alpha1/broker_lifecycle.go b/pkg/apis/eventing/v1alpha1/broker_lifecycle.go index 31ae7140dca..d6fa93499a5 100644 --- a/pkg/apis/eventing/v1alpha1/broker_lifecycle.go +++ b/pkg/apis/eventing/v1alpha1/broker_lifecycle.go @@ -16,7 +16,11 @@ limitations under the License. package v1alpha1 -import duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" +import ( + v1 "k8s.io/api/apps/v1" + + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" +) var brokerCondSet = duckv1alpha1.NewLivingConditionSet( BrokerConditionIngress, @@ -57,44 +61,91 @@ func (bs *BrokerStatus) InitializeConditions() { brokerCondSet.Manage(bs).InitializeConditions() } -func (bs *BrokerStatus) MarkIngressReady() { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngress) +func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interface{}) { + brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...) } -func (bs *BrokerStatus) MarkIngressFailed(err error) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngress, "failed", "%v", err) +func (bs *BrokerStatus) PropagateIngressDeploymentAvailability(d *v1.Deployment) { + if deploymentIsAvailable(&d.Status) { + brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngress) + } else { + // I don't know how to propagate the status well, so just give the name of the Deployment + // for now. + bs.MarkIngressFailed("DeploymentUnavailable", "The Deployment '%s' is unavailable.", d.Name) + } } -func (bs *BrokerStatus) MarkTriggerChannelReady() { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionTriggerChannel) +func (bs *BrokerStatus) MarkTriggerChannelFailed(reason, format string, args ...interface{}) { + brokerCondSet.Manage(bs).MarkFalse(BrokerConditionTriggerChannel, reason, format, args...) } -func (bs *BrokerStatus) MarkTriggerChannelFailed(err error) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionTriggerChannel, "failed", "%v", err) +func (bs *BrokerStatus) PropagateTriggerChannelReadiness(cs *ChannelStatus) { + if cs.IsReady() { + brokerCondSet.Manage(bs).MarkTrue(BrokerConditionTriggerChannel) + } else { + msg := "nil" + if cc := chanCondSet.Manage(cs).GetCondition(ChannelConditionReady); cc != nil { + msg = cc.Message + } + bs.MarkTriggerChannelFailed("ChannelNotReady", "trigger Channel is not ready: %s", msg) + } } -func (bs *BrokerStatus) MarkIngressChannelReady() { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngressChannel) +func (bs *BrokerStatus) MarkIngressChannelFailed(reason, format string, args ...interface{}) { + brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngressChannel, reason, format, args...) } -func (bs *BrokerStatus) MarkIngressChannelFailed(err error) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngressChannel, "failed", "%v", err) +func (bs *BrokerStatus) PropagateIngressChannelReadiness(cs *ChannelStatus) { + if cs.IsReady() { + brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngressChannel) + } else { + msg := "nil" + if cc := chanCondSet.Manage(cs).GetCondition(ChannelConditionReady); cc != nil { + msg = cc.Message + } + bs.MarkIngressChannelFailed("ChannelNotReady", "ingress Channel is not ready: %s", msg) + } } -func (bs *BrokerStatus) MarkIngressSubscriptionReady() { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngressSubscription) +func (bs *BrokerStatus) MarkIngressSubscriptionFailed(reason, format string, args ...interface{}) { + brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngressSubscription, reason, format, args...) } -func (bs *BrokerStatus) MarkIngressSubscriptionFailed(err error) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngressSubscription, "failed", "%v", err) +func (bs *BrokerStatus) PropagateIngressSubscriptionReadiness(ss *SubscriptionStatus) { + if ss.IsReady() { + brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngressSubscription) + } else { + msg := "nil" + if sc := subCondSet.Manage(ss).GetCondition(SubscriptionConditionReady); sc != nil { + msg = sc.Message + } + bs.MarkIngressSubscriptionFailed("SubscriptionNotReady", "ingress Subscription is not ready: %s", msg) + } } -func (bs *BrokerStatus) MarkFilterReady() { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionFilter) +func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interface{}) { + brokerCondSet.Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...) } -func (bs *BrokerStatus) MarkFilterFailed(err error) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionFilter, "failed", "%v", err) +func (bs *BrokerStatus) PropagateFilterDeploymentAvailability(d *v1.Deployment) { + if deploymentIsAvailable(&d.Status) { + brokerCondSet.Manage(bs).MarkTrue(BrokerConditionFilter) + } else { + // I don't know how to propagate the status well, so just give the name of the Deployment + // for now. + bs.MarkFilterFailed("DeploymentUnavailable", "The Deployment '%s' is unavailable.", d.Name) + } +} + +func deploymentIsAvailable(d *v1.DeploymentStatus) bool { + // Check if the Deployment is available. + for _, cond := range d.Conditions { + if cond.Type == v1.DeploymentAvailable { + return cond.Status == "True" + } + } + // Unable to find the Available condition, fail open. + return true } // SetAddress makes this Broker addressable by setting the hostname. It also diff --git a/pkg/reconciler/v1alpha1/broker/broker.go b/pkg/reconciler/v1alpha1/broker/broker.go index 6a67152d253..afd0fc3729a 100644 --- a/pkg/reconciler/v1alpha1/broker/broker.go +++ b/pkg/reconciler/v1alpha1/broker/broker.go @@ -19,7 +19,6 @@ package broker import ( "context" "fmt" - "time" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/logging" @@ -29,7 +28,6 @@ import ( v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" - "k8s.io/apimachinery/pkg/api/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -133,7 +131,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err broker := &v1alpha1.Broker{} err := r.client.Get(ctx, request.NamespacedName, broker) - if errors.IsNotFound(err) { + if k8serrors.IsNotFound(err) { logging.FromContext(ctx).Info("Could not find Broker") return reconcile.Result{}, nil } @@ -187,64 +185,62 @@ func (r *reconciler) reconcile(ctx context.Context, b *v1alpha1.Broker) (reconci triggerChan, err := r.reconcileTriggerChannel(ctx, b) if err != nil { logging.FromContext(ctx).Error("Problem reconciling the trigger channel", zap.Error(err)) - b.Status.MarkTriggerChannelFailed(err) + b.Status.MarkTriggerChannelFailed("ChannelFailure", "%v", err) return reconcile.Result{}, err } else if triggerChan.Status.Address.Hostname == "" { - logging.FromContext(ctx).Info("Trigger Channel is not yet ready", zap.Any("triggerChan", triggerChan)) - // Give the Channel some time to get its address. One second was chosen arbitrarily. - return reconcile.Result{RequeueAfter: time.Second}, nil + // We check the trigger Channel's address here because it is needed to create the Ingress + // Deployment. + logging.FromContext(ctx).Debug("Trigger Channel does not have an address", zap.Any("triggerChan", triggerChan)) + b.Status.MarkTriggerChannelFailed("NoAddress", "Channel does not have an address.") + return reconcile.Result{}, nil } - b.Status.MarkTriggerChannelReady() + b.Status.PropagateTriggerChannelReadiness(&triggerChan.Status) - _, err = r.reconcileFilterDeployment(ctx, b) + filterDeployment, err := r.reconcileFilterDeployment(ctx, b) if err != nil { logging.FromContext(ctx).Error("Problem reconciling filter Deployment", zap.Error(err)) - b.Status.MarkFilterFailed(err) + b.Status.MarkFilterFailed("DeploymentFailure", "%v", err) return reconcile.Result{}, err } _, err = r.reconcileFilterService(ctx, b) if err != nil { logging.FromContext(ctx).Error("Problem reconciling filter Service", zap.Error(err)) - b.Status.MarkFilterFailed(err) + b.Status.MarkFilterFailed("ServiceFailure", "%v", err) return reconcile.Result{}, err } - b.Status.MarkFilterReady() + b.Status.PropagateFilterDeploymentAvailability(filterDeployment) - _, err = r.reconcileIngressDeployment(ctx, b, triggerChan) + ingressDeployment, err := r.reconcileIngressDeployment(ctx, b, triggerChan) if err != nil { logging.FromContext(ctx).Error("Problem reconciling ingress Deployment", zap.Error(err)) - b.Status.MarkIngressFailed(err) + b.Status.MarkIngressFailed("DeploymentFailure", "%v", err) return reconcile.Result{}, err } svc, err := r.reconcileIngressService(ctx, b) if err != nil { logging.FromContext(ctx).Error("Problem reconciling ingress Service", zap.Error(err)) - b.Status.MarkIngressFailed(err) + b.Status.MarkIngressFailed("ServiceFailure", "%v", err) return reconcile.Result{}, err } - b.Status.MarkIngressReady() + b.Status.PropagateIngressDeploymentAvailability(ingressDeployment) b.Status.SetAddress(names.ServiceHostName(svc.Name, svc.Namespace)) ingressChan, err := r.reconcileIngressChannel(ctx, b) if err != nil { logging.FromContext(ctx).Error("Problem reconciling the ingress channel", zap.Error(err)) - b.Status.MarkIngressChannelFailed(err) + b.Status.MarkIngressChannelFailed("ChannelFailure", "%v", err) return reconcile.Result{}, err - } else if ingressChan.Status.Address.Hostname == "" { - logging.FromContext(ctx).Info("Ingress Channel is not yet ready", zap.Any("ingressChan", ingressChan)) - // Give the Channel some time to get its address. One second was chosen arbitrarily. - return reconcile.Result{RequeueAfter: time.Second}, nil } - b.Status.MarkIngressChannelReady() + b.Status.PropagateIngressChannelReadiness(&ingressChan.Status) - _, err = r.reconcileIngressSubscription(ctx, b, ingressChan, svc) + ingressSub, err := r.reconcileIngressSubscription(ctx, b, ingressChan, svc) if err != nil { logging.FromContext(ctx).Error("Problem reconciling the ingress subscription", zap.Error(err)) - b.Status.MarkIngressSubscriptionFailed(err) + b.Status.MarkIngressSubscriptionFailed("SubscriptionFailure", "%v", err) return reconcile.Result{}, err } - b.Status.MarkIngressSubscriptionReady() + b.Status.PropagateIngressSubscriptionReadiness(&ingressSub.Status) return reconcile.Result{}, nil } From 8ccb32a3bc37657cf5b7cffacff118888291fed4 Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Wed, 17 Apr 2019 15:41:38 -0700 Subject: [PATCH 2/4] Fix broker_lifecycle_test.go. --- .../v1alpha1/broker_lifecycle_test.go | 51 ++++++++++++++----- 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/pkg/apis/eventing/v1alpha1/broker_lifecycle_test.go b/pkg/apis/eventing/v1alpha1/broker_lifecycle_test.go index 73fa5663e24..0636a3ec3db 100644 --- a/pkg/apis/eventing/v1alpha1/broker_lifecycle_test.go +++ b/pkg/apis/eventing/v1alpha1/broker_lifecycle_test.go @@ -17,19 +17,17 @@ limitations under the License. package v1alpha1 import ( - "errors" "testing" "github.com/google/go-cmp/cmp" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" ) var ( trueVal = true falseVal = false - - err = errors.New("foobar") ) var ( @@ -332,37 +330,37 @@ func TestBrokerIsReady(t *testing.T) { bs := &BrokerStatus{} if test.markIngressReady != nil { if *test.markIngressReady { - bs.MarkIngressReady() + bs.PropagateIngressDeploymentAvailability(availableDeployment()) } else { - bs.MarkIngressFailed(err) + bs.MarkIngressFailed("failed", "err") } } if test.markTriggerChannelReady != nil { if *test.markTriggerChannelReady { - bs.MarkTriggerChannelReady() + bs.PropagateTriggerChannelReadiness(readyChannelStatus()) } else { - bs.MarkTriggerChannelFailed(err) + bs.MarkTriggerChannelFailed("failed", "err") } } if test.markIngressChannelReady != nil { if *test.markIngressChannelReady { - bs.MarkIngressChannelReady() + bs.PropagateIngressChannelReadiness(readyChannelStatus()) } else { - bs.MarkIngressChannelFailed(err) + bs.MarkIngressChannelFailed("failed", "err") } } if test.markIngressSubscriptionReady != nil { if *test.markIngressSubscriptionReady { - bs.MarkIngressSubscriptionReady() + bs.PropagateIngressSubscriptionReadiness(readySubscriptionStatus()) } else { - bs.MarkIngressSubscriptionFailed(err) + bs.MarkIngressSubscriptionFailed("failed", "err") } } if test.markFilterReady != nil { if *test.markFilterReady { - bs.MarkFilterReady() + bs.PropagateFilterDeploymentAvailability(availableDeployment()) } else { - bs.MarkFilterFailed(err) + bs.MarkFilterFailed("failed", "err") } } bs.SetAddress(test.address) @@ -374,3 +372,30 @@ func TestBrokerIsReady(t *testing.T) { }) } } + +func availableDeployment() *v1.Deployment { + d := &v1.Deployment{} + d.Name = "deployment-name" + d.Status.Conditions = []v1.DeploymentCondition{ + { + Type: v1.DeploymentAvailable, + Status: "True", + }, + } + return d +} + +func readyChannelStatus() *ChannelStatus { + cs := &ChannelStatus{} + cs.MarkProvisionerInstalled() + cs.MarkProvisioned() + cs.SetAddress("foo") + return cs +} + +func readySubscriptionStatus() *SubscriptionStatus { + ss := &SubscriptionStatus{} + ss.MarkChannelReady() + ss.MarkReferencesResolved() + return ss +} From 69570bcbb680f4b1d570ae93a6a9a892fa9d7c86 Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Wed, 17 Apr 2019 16:06:15 -0700 Subject: [PATCH 3/4] Fix broker_test.go. --- pkg/reconciler/v1alpha1/broker/broker.go | 38 ++--- pkg/reconciler/v1alpha1/broker/broker_test.go | 138 +++++++++--------- 2 files changed, 94 insertions(+), 82 deletions(-) diff --git a/pkg/reconciler/v1alpha1/broker/broker.go b/pkg/reconciler/v1alpha1/broker/broker.go index afd0fc3729a..6a1032b5189 100644 --- a/pkg/reconciler/v1alpha1/broker/broker.go +++ b/pkg/reconciler/v1alpha1/broker/broker.go @@ -50,7 +50,8 @@ const ( controllerAgentName = "broker-controller" // Name of the corev1.Events emitted from the reconciliation process. - brokerReconciled = "BrokerReconciled" + brokerReadinessChanged = "BrokerReadinessChanged" + brokerReconcileError = "BrokerReconcileError" brokerUpdateStatusFailed = "BrokerUpdateStatusFailed" ingressSubscriptionDeleteFailed = "IngressSubscriptionDeleteFailed" ingressSubscriptionCreateFailed = "IngressSubscriptionCreateFailed" @@ -141,16 +142,19 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err return reconcile.Result{}, err } + originalReadiness := broker.Status.IsReady() + // Reconcile this copy of the Broker and then write back any status updates regardless of // whether the reconcile error out. - result, reconcileErr := r.reconcile(ctx, broker) + reconcileErr := r.reconcile(ctx, broker) if reconcileErr != nil { logging.FromContext(ctx).Error("Error reconciling Broker", zap.Error(reconcileErr)) - } else if result.Requeue || result.RequeueAfter > 0 { - logging.FromContext(ctx).Debug("Broker reconcile requeuing") + r.recorder.Event(broker, corev1.EventTypeWarning, brokerReconcileError, fmt.Sprintf("Broker reconcile error: %v", reconcileErr)) } else { logging.FromContext(ctx).Debug("Broker reconciled") - r.recorder.Event(broker, corev1.EventTypeNormal, brokerReconciled, "Broker reconciled") + if originalReadiness != broker.Status.IsReady() { + r.recorder.Event(broker, corev1.EventTypeNormal, brokerReadinessChanged, fmt.Sprintf("Broker readiness changed to %v", broker.Status.IsReady())) + } } if _, err = r.updateStatus(broker); err != nil { @@ -160,10 +164,10 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err } // Requeue if the resource is not ready: - return result, reconcileErr + return reconcile.Result{}, reconcileErr } -func (r *reconciler) reconcile(ctx context.Context, b *v1alpha1.Broker) (reconcile.Result, error) { +func (r *reconciler) reconcile(ctx context.Context, b *v1alpha1.Broker) error { b.Status.InitializeConditions() // 1. Trigger Channel is created for all events. Triggers will Subscribe to this Channel. @@ -179,20 +183,20 @@ func (r *reconciler) reconcile(ctx context.Context, b *v1alpha1.Broker) (reconci if b.DeletionTimestamp != nil { // Everything is cleaned up by the garbage collector. - return reconcile.Result{}, nil + return nil } triggerChan, err := r.reconcileTriggerChannel(ctx, b) if err != nil { logging.FromContext(ctx).Error("Problem reconciling the trigger channel", zap.Error(err)) b.Status.MarkTriggerChannelFailed("ChannelFailure", "%v", err) - return reconcile.Result{}, err + return err } else if triggerChan.Status.Address.Hostname == "" { // We check the trigger Channel's address here because it is needed to create the Ingress // Deployment. logging.FromContext(ctx).Debug("Trigger Channel does not have an address", zap.Any("triggerChan", triggerChan)) b.Status.MarkTriggerChannelFailed("NoAddress", "Channel does not have an address.") - return reconcile.Result{}, nil + return nil } b.Status.PropagateTriggerChannelReadiness(&triggerChan.Status) @@ -200,13 +204,13 @@ func (r *reconciler) reconcile(ctx context.Context, b *v1alpha1.Broker) (reconci if err != nil { logging.FromContext(ctx).Error("Problem reconciling filter Deployment", zap.Error(err)) b.Status.MarkFilterFailed("DeploymentFailure", "%v", err) - return reconcile.Result{}, err + return err } _, err = r.reconcileFilterService(ctx, b) if err != nil { logging.FromContext(ctx).Error("Problem reconciling filter Service", zap.Error(err)) b.Status.MarkFilterFailed("ServiceFailure", "%v", err) - return reconcile.Result{}, err + return err } b.Status.PropagateFilterDeploymentAvailability(filterDeployment) @@ -214,14 +218,14 @@ func (r *reconciler) reconcile(ctx context.Context, b *v1alpha1.Broker) (reconci if err != nil { logging.FromContext(ctx).Error("Problem reconciling ingress Deployment", zap.Error(err)) b.Status.MarkIngressFailed("DeploymentFailure", "%v", err) - return reconcile.Result{}, err + return err } svc, err := r.reconcileIngressService(ctx, b) if err != nil { logging.FromContext(ctx).Error("Problem reconciling ingress Service", zap.Error(err)) b.Status.MarkIngressFailed("ServiceFailure", "%v", err) - return reconcile.Result{}, err + return err } b.Status.PropagateIngressDeploymentAvailability(ingressDeployment) b.Status.SetAddress(names.ServiceHostName(svc.Name, svc.Namespace)) @@ -230,7 +234,7 @@ func (r *reconciler) reconcile(ctx context.Context, b *v1alpha1.Broker) (reconci if err != nil { logging.FromContext(ctx).Error("Problem reconciling the ingress channel", zap.Error(err)) b.Status.MarkIngressChannelFailed("ChannelFailure", "%v", err) - return reconcile.Result{}, err + return err } b.Status.PropagateIngressChannelReadiness(&ingressChan.Status) @@ -238,11 +242,11 @@ func (r *reconciler) reconcile(ctx context.Context, b *v1alpha1.Broker) (reconci if err != nil { logging.FromContext(ctx).Error("Problem reconciling the ingress subscription", zap.Error(err)) b.Status.MarkIngressSubscriptionFailed("SubscriptionFailure", "%v", err) - return reconcile.Result{}, err + return err } b.Status.PropagateIngressSubscriptionReadiness(&ingressSub.Status) - return reconcile.Result{}, nil + return nil } // updateStatus may in fact update the broker's finalizers in addition to the status. diff --git a/pkg/reconciler/v1alpha1/broker/broker_test.go b/pkg/reconciler/v1alpha1/broker/broker_test.go index af53b9d1568..e5e37f13411 100644 --- a/pkg/reconciler/v1alpha1/broker/broker_test.go +++ b/pkg/reconciler/v1alpha1/broker/broker_test.go @@ -22,7 +22,6 @@ import ( "fmt" "strings" "testing" - "time" "github.com/google/go-cmp/cmp" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" @@ -32,6 +31,7 @@ import ( duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -39,7 +39,6 @@ import ( "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - "sigs.k8s.io/controller-runtime/pkg/reconcile" ) const ( @@ -72,7 +71,8 @@ var ( // Map of events to set test cases' expectations easier. events = map[string]corev1.Event{ - brokerReconciled: {Reason: brokerReconciled, Type: corev1.EventTypeNormal}, + brokerReadinessChanged: {Reason: brokerReadinessChanged, Type: corev1.EventTypeNormal}, + brokerReconcileError: {Reason: brokerReconcileError, Type: corev1.EventTypeWarning}, brokerUpdateStatusFailed: {Reason: brokerUpdateStatusFailed, Type: corev1.EventTypeWarning}, ingressSubscriptionDeleteFailed: {Reason: ingressSubscriptionDeleteFailed, Type: corev1.EventTypeWarning}, ingressSubscriptionCreateFailed: {Reason: ingressSubscriptionCreateFailed, Type: corev1.EventTypeWarning}, @@ -146,11 +146,6 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeDeletingBroker(), }, - WantEvent: []corev1.Event{ - { - Reason: brokerReconciled, Type: corev1.EventTypeNormal, - }, - }, }, { Name: "Trigger Channel.List error", @@ -172,6 +167,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error getting Trigger Channel", }, { @@ -192,6 +188,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error creating Trigger Channel", }, { @@ -208,11 +205,6 @@ func TestReconcile(t *testing.T) { // GenerateName. // makeDifferentTriggerChannel(), }, - WantEvent: []corev1.Event{ - { - Reason: brokerReconciled, Type: corev1.EventTypeNormal, - }, - }, }, { Name: "Trigger Channel is not yet Addressable", @@ -221,7 +213,6 @@ func TestReconcile(t *testing.T) { makeBroker(), makeNonAddressableTriggerChannel(), }, - WantResult: reconcile.Result{RequeueAfter: time.Second}, }, { Name: "Filter Deployment.Get error", @@ -242,6 +233,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error getting filter Deployment", }, { @@ -263,6 +255,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error creating filter Deployment", }, { @@ -285,6 +278,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error updating filter Deployment", }, { @@ -306,6 +300,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error getting filter Service", }, { @@ -327,6 +322,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error creating filter Service", }, { @@ -349,6 +345,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error updating filter Service", }, { @@ -370,6 +367,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error getting ingress Deployment", }, { @@ -391,6 +389,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error creating ingress Deployment", }, { @@ -413,6 +412,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error updating ingress Deployment", }, { @@ -434,6 +434,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error getting ingress Service", }, { @@ -455,6 +456,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error creating ingress Service", }, { @@ -477,6 +479,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error updating ingress Service", }, { @@ -500,6 +503,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error getting Ingress Channel", }, { @@ -537,6 +541,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error creating Ingress Channel", }, { @@ -573,11 +578,6 @@ func TestReconcile(t *testing.T) { // GenerateName. // makeDifferentIngressChannel(), }, - WantEvent: []corev1.Event{ - { - Reason: brokerReconciled, Type: corev1.EventTypeNormal, - }, - }, }, { Name: "Ingress Channel is not yet Addressable", @@ -606,7 +606,6 @@ func TestReconcile(t *testing.T) { }, }, }, - WantResult: reconcile.Result{RequeueAfter: time.Second}, }, { Name: "Subscription.List error", @@ -626,6 +625,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error getting Subscription", }, { @@ -646,6 +646,7 @@ func TestReconcile(t *testing.T) { }, }, }, + WantEvent: []corev1.Event{events[brokerReconcileError]}, WantErrMsg: "test error creating Subscription", }, { @@ -663,11 +664,6 @@ func TestReconcile(t *testing.T) { // GenerateName. // makeDifferentSubscription(), }, - WantEvent: []corev1.Event{ - { - Reason: brokerReconciled, Type: corev1.EventTypeNormal, - }, - }, }, { Name: "Subscription.Delete error", @@ -688,7 +684,7 @@ func TestReconcile(t *testing.T) { }, }, }, - WantEvent: []corev1.Event{events[ingressSubscriptionDeleteFailed]}, + WantEvent: []corev1.Event{events[ingressSubscriptionDeleteFailed], events[brokerReconcileError]}, WantErrMsg: "test error deleting Subscription", }, { @@ -710,7 +706,7 @@ func TestReconcile(t *testing.T) { }, }, }, - WantEvent: []corev1.Event{events[ingressSubscriptionCreateFailed]}, + WantEvent: []corev1.Event{events[ingressSubscriptionCreateFailed], events[brokerReconcileError]}, WantErrMsg: "test error creating Subscription", }, { @@ -740,14 +736,7 @@ func TestReconcile(t *testing.T) { }, }, WantErrMsg: "test error getting the Broker for status update", - WantEvent: []corev1.Event{ - { - Reason: brokerReconciled, Type: corev1.EventTypeNormal, - }, - { - Reason: brokerUpdateStatusFailed, Type: corev1.EventTypeWarning, - }, - }, + WantEvent: []corev1.Event{events[brokerUpdateStatusFailed]}, }, { Name: "Broker.Status.Update error", @@ -768,14 +757,7 @@ func TestReconcile(t *testing.T) { }, }, WantErrMsg: "test error updating the Broker status", - WantEvent: []corev1.Event{ - { - Reason: brokerReconciled, Type: corev1.EventTypeNormal, - }, - { - Reason: brokerUpdateStatusFailed, Type: corev1.EventTypeWarning, - }, - }, + WantEvent: []corev1.Event{events[brokerUpdateStatusFailed]}, }, { Name: "Successful reconcile", @@ -785,6 +767,7 @@ func TestReconcile(t *testing.T) { // The Channel needs to be addressable for the reconcile to succeed. makeTriggerChannel(), makeIngressChannel(), + makeTestSubscription(), }, Mocks: controllertesting.Mocks{ MockLists: []controllertesting.MockList{ @@ -815,13 +798,10 @@ func TestReconcile(t *testing.T) { makeIngressService(), // TODO Uncomment makeIngressChannel() when our test framework handles generateName. // makeIngressChannel(), - // Because the makeTestSubscription(), }, WantEvent: []corev1.Event{ - { - Reason: brokerReconciled, Type: corev1.EventTypeNormal, - }, + events[brokerReadinessChanged], }, }, } @@ -866,12 +846,12 @@ func makeBroker() *v1alpha1.Broker { func makeReadyBroker() *v1alpha1.Broker { b := makeBroker() b.Status.InitializeConditions() - b.Status.MarkIngressReady() - b.Status.MarkTriggerChannelReady() - b.Status.MarkIngressChannelReady() - b.Status.MarkFilterReady() + b.Status.PropagateIngressDeploymentAvailability(makeAvailableDeployment()) + b.Status.PropagateTriggerChannelReadiness(makeReadyChannelStatus()) + b.Status.PropagateIngressChannelReadiness(makeReadyChannelStatus()) + b.Status.PropagateFilterDeploymentAvailability(makeAvailableDeployment()) b.Status.SetAddress(fmt.Sprintf("%s-broker.%s.svc.%s", brokerName, testNS, utils.GetClusterDomainName())) - b.Status.MarkIngressSubscriptionReady() + b.Status.PropagateIngressSubscriptionReadiness(makeReadySubscriptionStatus()) return b } @@ -882,7 +862,7 @@ func makeDeletingBroker() *v1alpha1.Broker { } func makeTriggerChannel() *v1alpha1.Channel { - return &v1alpha1.Channel{ + c := &v1alpha1.Channel{ ObjectMeta: metav1.ObjectMeta{ Namespace: testNS, GenerateName: fmt.Sprintf("%s-broker-", brokerName), @@ -897,12 +877,11 @@ func makeTriggerChannel() *v1alpha1.Channel { Spec: v1alpha1.ChannelSpec{ Provisioner: channelProvisioner, }, - Status: v1alpha1.ChannelStatus{ - Address: duckv1alpha1.Addressable{ - Hostname: triggerChannelHostname, - }, - }, } + c.Status.MarkProvisionerInstalled() + c.Status.MarkProvisioned() + c.Status.SetAddress(triggerChannelHostname) + return c } func makeNonAddressableTriggerChannel() *v1alpha1.Channel { @@ -918,7 +897,7 @@ func makeDifferentTriggerChannel() *v1alpha1.Channel { } func makeIngressChannel() *v1alpha1.Channel { - return &v1alpha1.Channel{ + c := &v1alpha1.Channel{ ObjectMeta: metav1.ObjectMeta{ Namespace: testNS, GenerateName: fmt.Sprintf("%s-broker-ingress-", brokerName), @@ -936,12 +915,11 @@ func makeIngressChannel() *v1alpha1.Channel { Spec: v1alpha1.ChannelSpec{ Provisioner: channelProvisioner, }, - Status: v1alpha1.ChannelStatus{ - Address: duckv1alpha1.Addressable{ - Hostname: ingressChannelHostname, - }, - }, } + c.Status.MarkProvisionerInstalled() + c.Status.MarkProvisioned() + c.Status.SetAddress(ingressChannelHostname) + return c } func makeNonAddressableIngressChannel() *v1alpha1.Channel { @@ -1026,7 +1004,7 @@ func makeDifferentIngressService() *corev1.Service { } func makeTestSubscription() *v1alpha1.Subscription { - return &v1alpha1.Subscription{ + s := &v1alpha1.Subscription{ TypeMeta: metav1.TypeMeta{ APIVersion: "eventing.knative.dev/v1alpha1", Kind: "Subscription", @@ -1057,6 +1035,9 @@ func makeTestSubscription() *v1alpha1.Subscription { }, }, } + s.Status.MarkChannelReady() + s.Status.MarkReferencesResolved() + return s } func makeDifferentSubscription() *v1alpha1.Subscription { @@ -1076,3 +1057,30 @@ func getOwnerReference() metav1.OwnerReference { BlockOwnerDeletion: &trueVal, } } + +func makeAvailableDeployment() *v1.Deployment { + d := &v1.Deployment{} + d.Name = "deployment-name" + d.Status.Conditions = []v1.DeploymentCondition{ + { + Type: v1.DeploymentAvailable, + Status: "True", + }, + } + return d +} + +func makeReadyChannelStatus() *v1alpha1.ChannelStatus { + cs := &v1alpha1.ChannelStatus{} + cs.MarkProvisionerInstalled() + cs.MarkProvisioned() + cs.SetAddress("foo") + return cs +} + +func makeReadySubscriptionStatus() *v1alpha1.SubscriptionStatus { + ss := &v1alpha1.SubscriptionStatus{} + ss.MarkChannelReady() + ss.MarkReferencesResolved() + return ss +} From 7c9bfd95c2d9108f737d54a977a6b41eb96ada6c Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Wed, 17 Apr 2019 16:43:28 -0700 Subject: [PATCH 4/4] Increase coverage. --- .../v1alpha1/broker_lifecycle_test.go | 58 +++++++++++++++---- 1 file changed, 46 insertions(+), 12 deletions(-) diff --git a/pkg/apis/eventing/v1alpha1/broker_lifecycle_test.go b/pkg/apis/eventing/v1alpha1/broker_lifecycle_test.go index 0636a3ec3db..c5826b94659 100644 --- a/pkg/apis/eventing/v1alpha1/broker_lifecycle_test.go +++ b/pkg/apis/eventing/v1alpha1/broker_lifecycle_test.go @@ -329,39 +329,49 @@ func TestBrokerIsReady(t *testing.T) { t.Run(test.name, func(t *testing.T) { bs := &BrokerStatus{} if test.markIngressReady != nil { + var d *v1.Deployment if *test.markIngressReady { - bs.PropagateIngressDeploymentAvailability(availableDeployment()) + d = availableDeployment() } else { - bs.MarkIngressFailed("failed", "err") + d = unavailableDeployment() } + bs.PropagateIngressDeploymentAvailability(d) } if test.markTriggerChannelReady != nil { + var c *ChannelStatus if *test.markTriggerChannelReady { - bs.PropagateTriggerChannelReadiness(readyChannelStatus()) + c = readyChannelStatus() } else { - bs.MarkTriggerChannelFailed("failed", "err") + c = notReadyChannelStatus() } + bs.PropagateTriggerChannelReadiness(c) } if test.markIngressChannelReady != nil { + var c *ChannelStatus if *test.markIngressChannelReady { - bs.PropagateIngressChannelReadiness(readyChannelStatus()) + c = readyChannelStatus() } else { - bs.MarkIngressChannelFailed("failed", "err") + c = notReadyChannelStatus() } + bs.PropagateIngressChannelReadiness(c) } if test.markIngressSubscriptionReady != nil { + var sub *SubscriptionStatus if *test.markIngressSubscriptionReady { - bs.PropagateIngressSubscriptionReadiness(readySubscriptionStatus()) + sub = readySubscriptionStatus() } else { - bs.MarkIngressSubscriptionFailed("failed", "err") + sub = notReadySubscriptionStatus() } + bs.PropagateIngressSubscriptionReadiness(sub) } if test.markFilterReady != nil { + var d *v1.Deployment if *test.markFilterReady { - bs.PropagateFilterDeploymentAvailability(availableDeployment()) + d = availableDeployment() } else { - bs.MarkFilterFailed("failed", "err") + d = unavailableDeployment() } + bs.PropagateFilterDeploymentAvailability(d) } bs.SetAddress(test.address) @@ -373,9 +383,21 @@ func TestBrokerIsReady(t *testing.T) { } } -func availableDeployment() *v1.Deployment { +func unavailableDeployment() *v1.Deployment { d := &v1.Deployment{} - d.Name = "deployment-name" + d.Name = "unavailable" + d.Status.Conditions = []v1.DeploymentCondition{ + { + Type: v1.DeploymentAvailable, + Status: "False", + }, + } + return d +} + +func availableDeployment() *v1.Deployment { + d := unavailableDeployment() + d.Name = "available" d.Status.Conditions = []v1.DeploymentCondition{ { Type: v1.DeploymentAvailable, @@ -393,9 +415,21 @@ func readyChannelStatus() *ChannelStatus { return cs } +func notReadyChannelStatus() *ChannelStatus { + cs := readyChannelStatus() + cs.MarkNotProvisioned("foo", "bar") + return cs +} + func readySubscriptionStatus() *SubscriptionStatus { ss := &SubscriptionStatus{} ss.MarkChannelReady() ss.MarkReferencesResolved() return ss } + +func notReadySubscriptionStatus() *SubscriptionStatus { + ss := &SubscriptionStatus{} + ss.MarkReferencesResolved() + return ss +}