Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/reconciler/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,15 @@ func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, t *v1alpha1.T
// If the resource doesn't exist, we'll create it.
if apierrs.IsNotFound(err) {
sub = expected
logging.FromContext(ctx).Info("Creating subscription")
newSub, err := r.EventingClientSet.EventingV1alpha1().Subscriptions(sub.Namespace).Create(sub)
if err != nil {
r.Recorder.Eventf(t, corev1.EventTypeWarning, subscriptionCreateFailed, "Create Trigger's subscription failed: %v", err)
return nil, err
}
return newSub, nil
} else if err != nil {
logging.FromContext(ctx).Error("Failed to get subscription", zap.Error(err))
r.Recorder.Eventf(t, corev1.EventTypeWarning, subscriptionCreateFailed, "Create Trigger's subscription failed: %v", err)
return nil, err
}
Expand All @@ -369,13 +371,15 @@ func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, t *v1alpha1.T
if !equality.Semantic.DeepDerivative(expected.Spec, sub.Spec) {
// Given that spec.channel is immutable, we cannot just update the Subscription. We delete
// it and re-create it instead.
logging.FromContext(ctx).Info("Deleting subscription", zap.String("namespace", sub.Namespace), zap.String("name", sub.Name))
err = r.EventingClientSet.EventingV1alpha1().Subscriptions(sub.Namespace).Delete(sub.Name, &metav1.DeleteOptions{})
if err != nil {
logging.FromContext(ctx).Info("Cannot delete subscription", zap.Error(err))
r.Recorder.Eventf(t, corev1.EventTypeWarning, subscriptionDeleteFailed, "Delete Trigger's subscription failed: %v", err)
return nil, err
}
sub = expected
logging.FromContext(ctx).Info("Creating subscription")
newSub, err := r.EventingClientSet.EventingV1alpha1().Subscriptions(sub.Namespace).Create(sub)
if err != nil {
logging.FromContext(ctx).Info("Cannot create subscription", zap.Error(err))
Expand Down
225 changes: 216 additions & 9 deletions pkg/reconciler/trigger/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package trigger

import (
// "context"
// "errors"
"fmt"
"net/url"
"testing"
Expand Down Expand Up @@ -115,6 +113,28 @@ func TestAllCases(t *testing.T) {
reconciletesting.WithTriggerBrokerFailed("DoesNotExist", "Broker does not exist"),
),
}},
}, {
Name: "Broker get failure",
Key: triggerKey,
Objects: []runtime.Object{
reconciletesting.NewTrigger(triggerName, testNS, brokerName,
reconciletesting.WithTriggerSubscriberURI(subscriberURI)),
},
WantErr: true,
WantEvents: []string{
Eventf(corev1.EventTypeWarning, "TriggerReconcileFailed", "Trigger reconciliation failed: broker.eventing.knative.dev \"test-broker\" not found"),
},
WithReactors: []clientgotesting.ReactionFunc{
InduceFailure("get", "brokers"),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: reconciletesting.NewTrigger(triggerName, testNS, brokerName,
reconciletesting.WithTriggerSubscriberURI(subscriberURI),
// The first reconciliation will initialize the status conditions.
reconciletesting.WithInitTriggerConditions,
reconciletesting.WithTriggerBrokerFailed("DoesNotExist", "Broker does not exist"),
),
}},
}, {
Name: "Trigger being deleted",
Key: triggerKey,
Expand Down Expand Up @@ -201,15 +221,124 @@ func TestAllCases(t *testing.T) {
),
}},
}, {
Name: "Subscription Created, not ready",
Name: "Subscription create fail",
Key: triggerKey,
Objects: []runtime.Object{
makeReadyBroker(),
makeTriggerChannel(),
makeIngressChannel(),
makeBrokerFilterService(),
reconciletesting.NewTrigger(triggerName, testNS, brokerName,
reconciletesting.WithTriggerSubscriberURI(subscriberURI),
reconciletesting.WithInitTriggerConditions,
),
},
WantErr: true,
WithReactors: []clientgotesting.ReactionFunc{
InduceFailure("create", "subscriptions"),
},
WantEvents: []string{
Eventf(corev1.EventTypeWarning, "SubscriptionCreateFailed", "Create Trigger's subscription failed: inducing failure for create subscriptions"),
Eventf(corev1.EventTypeWarning, "TriggerReconcileFailed", "Trigger reconciliation failed: inducing failure for create subscriptions")},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: reconciletesting.NewTrigger(triggerName, testNS, brokerName,
reconciletesting.WithTriggerSubscriberURI(subscriberURI),
// The first reconciliation will initialize the status conditions.
reconciletesting.WithInitTriggerConditions,
reconciletesting.WithTriggerBrokerReady(),
reconciletesting.WithTriggerNotSubscribed("NotSubscribed", "inducing failure for create subscriptions"),
reconciletesting.WithTriggerStatusSubscriberURI(subscriberURI),
),
}},
WantCreates: []metav1.Object{
makeIngressSubscription(),
},
}, {
Name: "Subscription delete fail",
Key: triggerKey,
Objects: []runtime.Object{
makeReadyBroker(),
makeTriggerChannel(),
makeIngressChannel(),
makeBrokerFilterService(),
makeDifferentReadySubscription(),
reconciletesting.NewTrigger(triggerName, testNS, brokerName,
reconciletesting.WithTriggerSubscriberURI(subscriberURI),
reconciletesting.WithInitTriggerConditions,
),
},
WantErr: true,
WithReactors: []clientgotesting.ReactionFunc{
InduceFailure("delete", "subscriptions"),
},
WantEvents: []string{
Eventf(corev1.EventTypeWarning, "SubscriptionDeleteFailed", "Delete Trigger's subscription failed: inducing failure for delete subscriptions"),
Eventf(corev1.EventTypeWarning, "TriggerReconcileFailed", "Trigger reconciliation failed: inducing failure for delete subscriptions")},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: reconciletesting.NewTrigger(triggerName, testNS, brokerName,
reconciletesting.WithTriggerSubscriberURI(subscriberURI),
// The first reconciliation will initialize the status conditions.
reconciletesting.WithInitTriggerConditions,
reconciletesting.WithTriggerBrokerReady(),
reconciletesting.WithTriggerNotSubscribed("NotSubscribed", "inducing failure for delete subscriptions"),
reconciletesting.WithTriggerStatusSubscriberURI(subscriberURI),
),
}},
// Name being "" is NOT a bug. Because we use generate name, the object created
// does not have a name...
WantDeletes: []clientgotesting.DeleteActionImpl{{
Name: "",
}},
}, {
Name: "Subscription create after delete fail",
Key: triggerKey,
Objects: []runtime.Object{
makeReadyBroker(),
makeTriggerChannel(),
makeIngressChannel(),
makeBrokerFilterService(),
makeDifferentReadySubscription(),
reconciletesting.NewTrigger(triggerName, testNS, brokerName,
reconciletesting.WithTriggerSubscriberURI(subscriberURI),
reconciletesting.WithInitTriggerConditions,
),
},
WantErr: true,
WithReactors: []clientgotesting.ReactionFunc{
InduceFailure("create", "subscriptions"),
},
WantEvents: []string{
Eventf(corev1.EventTypeWarning, "SubscriptionCreateFailed", "Create Trigger's subscription failed: inducing failure for create subscriptions"),
Eventf(corev1.EventTypeWarning, "TriggerReconcileFailed", "Trigger reconciliation failed: inducing failure for create subscriptions")},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: reconciletesting.NewTrigger(triggerName, testNS, brokerName,
reconciletesting.WithTriggerSubscriberURI(subscriberURI),
// The first reconciliation will initialize the status conditions.
reconciletesting.WithInitTriggerConditions,
reconciletesting.WithTriggerBrokerReady(),
reconciletesting.WithTriggerNotSubscribed("NotSubscribed", "inducing failure for create subscriptions"),
reconciletesting.WithTriggerStatusSubscriberURI(subscriberURI),
),
}},
// Name being "" is NOT a bug. Because we use generate name, the object created
// does not have a name...
WantDeletes: []clientgotesting.DeleteActionImpl{{
Name: "",
}},
WantCreates: []metav1.Object{
makeIngressSubscription(),
},
}, {
Name: "Subscription updated works",
Key: triggerKey,
Objects: []runtime.Object{
makeReadyBroker(),
makeTriggerChannel(),
makeIngressChannel(),
makeBrokerFilterService(),
makeDifferentReadySubscription(),
reconciletesting.NewTrigger(triggerName, testNS, brokerName,
reconciletesting.WithTriggerSubscriberURI(makeServiceURI().String()),
reconciletesting.WithTriggerSubscriberURI(subscriberURI),
reconciletesting.WithInitTriggerConditions,
),
},
Expand All @@ -219,28 +348,91 @@ func TestAllCases(t *testing.T) {
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: reconciletesting.NewTrigger(triggerName, testNS, brokerName,
reconciletesting.WithTriggerSubscriberURI(makeServiceURI().String()),
reconciletesting.WithTriggerSubscriberURI(subscriberURI),
// The first reconciliation will initialize the status conditions.
reconciletesting.WithInitTriggerConditions,
reconciletesting.WithTriggerBrokerReady(),
reconciletesting.WithTriggerNotSubscribed("SubscriptionNotReady", "Subscription is not ready: nil"),
reconciletesting.WithTriggerStatusSubscriberURI(makeServiceURI().String()),
reconciletesting.WithTriggerStatusSubscriberURI(subscriberURI),
),
}},
// Name being "" is NOT a bug. Because we use generate name, the object created
// does not have a name...
WantDeletes: []clientgotesting.DeleteActionImpl{{
Name: "",
}},
WantCreates: []metav1.Object{
makeIngressSubscription(),
},
}, {
Name: "Subscription Created, not ready",
Key: triggerKey,
Objects: []runtime.Object{
makeReadyBroker(),
makeTriggerChannel(),
makeIngressChannel(),
makeBrokerFilterService(),
reconciletesting.NewTrigger(triggerName, testNS, brokerName,
reconciletesting.WithTriggerSubscriberURI(subscriberURI),
reconciletesting.WithInitTriggerConditions,
),
},
WantErr: false,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "TriggerReconciled", "Trigger reconciled"),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: reconciletesting.NewTrigger(triggerName, testNS, brokerName,
reconciletesting.WithTriggerSubscriberURI(subscriberURI),
// The first reconciliation will initialize the status conditions.
reconciletesting.WithInitTriggerConditions,
reconciletesting.WithTriggerBrokerReady(),
reconciletesting.WithTriggerNotSubscribed("SubscriptionNotReady", "Subscription is not ready: nil"),
reconciletesting.WithTriggerStatusSubscriberURI(subscriberURI),
),
}},
WantCreates: []metav1.Object{
makeIngressSubscription(),
},
}, {
Name: "Subscription not ready, trigger marked not ready",
Key: triggerKey,
Objects: []runtime.Object{
makeReadyBroker(),
makeTriggerChannel(),
makeIngressChannel(),
makeBrokerFilterService(),
makeNotReadySubscription(),
reconciletesting.NewTrigger(triggerName, testNS, brokerName,
reconciletesting.WithTriggerSubscriberURI(subscriberURI),
reconciletesting.WithInitTriggerConditions,
),
},
WantErr: false,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "TriggerReconciled", "Trigger reconciled"),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: reconciletesting.NewTrigger(triggerName, testNS, brokerName,
reconciletesting.WithTriggerSubscriberURI(subscriberURI),
// The first reconciliation will initialize the status conditions.
reconciletesting.WithInitTriggerConditions,
reconciletesting.WithTriggerBrokerReady(),
reconciletesting.WithTriggerNotSubscribed("SubscriptionNotReady", "Subscription is not ready: nil"),
reconciletesting.WithTriggerStatusSubscriberURI(subscriberURI),
),
}},
}, {
Name: "Subscription ready, trigger marked ready",
Key: triggerKey,
Objects: []runtime.Object{
makeReadyBroker(),
makeTriggerChannel(),
makeIngressChannel(),
makeBrokerFilterService(),
makeReadySubscription(),
reconciletesting.NewTrigger(triggerName, testNS, brokerName,
reconciletesting.WithTriggerSubscriberURI(makeServiceURI().String()),
reconciletesting.WithTriggerSubscriberURI(subscriberURI),
reconciletesting.WithInitTriggerConditions,
),
},
Expand All @@ -250,12 +442,12 @@ func TestAllCases(t *testing.T) {
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: reconciletesting.NewTrigger(triggerName, testNS, brokerName,
reconciletesting.WithTriggerSubscriberURI(makeServiceURI().String()),
reconciletesting.WithTriggerSubscriberURI(subscriberURI),
// The first reconciliation will initialize the status conditions.
reconciletesting.WithInitTriggerConditions,
reconciletesting.WithTriggerBrokerReady(),
reconciletesting.WithTriggerSubscribed(),
reconciletesting.WithTriggerStatusSubscriberURI(makeServiceURI().String()),
reconciletesting.WithTriggerStatusSubscriberURI(subscriberURI),
),
}},
},
Expand Down Expand Up @@ -424,12 +616,27 @@ func makeIngressSubscription() *v1alpha1.Subscription {
return resources.NewSubscription(makeTrigger(), makeTriggerChannel(), makeIngressChannel(), makeServiceURI())
}

// Just so we can test subscription updates
func makeDifferentReadySubscription() *v1alpha1.Subscription {
uri := "http://example.com/differenturi"
s := makeIngressSubscription()
s.Spec.Subscriber.URI = &uri
s.Status = *v1alpha1.TestHelper.ReadySubscriptionStatus()
return s
}

func makeReadySubscription() *v1alpha1.Subscription {
s := makeIngressSubscription()
s.Status = *v1alpha1.TestHelper.ReadySubscriptionStatus()
return s
}

func makeNotReadySubscription() *v1alpha1.Subscription {
s := makeIngressSubscription()
s.Status = *v1alpha1.TestHelper.NotReadySubscriptionStatus()
return s
}

func getOwnerReference() metav1.OwnerReference {
return metav1.OwnerReference{
APIVersion: v1alpha1.SchemeGroupVersion.String(),
Expand Down