diff --git a/pkg/apis/eventing/v1alpha1/trigger_lifecycle.go b/pkg/apis/eventing/v1alpha1/trigger_lifecycle.go index dee7a8cf8c9..07c20ffb99c 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_lifecycle.go +++ b/pkg/apis/eventing/v1alpha1/trigger_lifecycle.go @@ -18,17 +18,13 @@ package v1alpha1 import duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" -var triggerCondSet = duckv1alpha1.NewLivingConditionSet(TriggerConditionBrokerExists, TriggerConditionKubernetesService, TriggerConditionVirtualService, TriggerConditionSubscribed) +var triggerCondSet = duckv1alpha1.NewLivingConditionSet(TriggerConditionBrokerExists, TriggerConditionSubscribed) const ( // TriggerConditionReady has status True when all subconditions below have been set to True. TriggerConditionReady = duckv1alpha1.ConditionReady TriggerConditionBrokerExists duckv1alpha1.ConditionType = "BrokerExists" - TriggerConditionKubernetesService duckv1alpha1.ConditionType = "KubernetesServiceReady" - - TriggerConditionVirtualService duckv1alpha1.ConditionType = "VirtualServiceReady" - TriggerConditionSubscribed duckv1alpha1.ConditionType = "Subscribed" // TriggerAnyFilter Constant to represent that we should allow anything. @@ -58,14 +54,6 @@ func (ts *TriggerStatus) MarkBrokerDoesNotExist() { triggerCondSet.Manage(ts).MarkFalse(TriggerConditionBrokerExists, "doesNotExist", "Broker does not exist") } -func (ts *TriggerStatus) MarkKubernetesServiceExists() { - triggerCondSet.Manage(ts).MarkTrue(TriggerConditionKubernetesService) -} - -func (ts *TriggerStatus) MarkVirtualServiceExists() { - triggerCondSet.Manage(ts).MarkTrue(TriggerConditionVirtualService) -} - func (ts *TriggerStatus) MarkSubscribed() { triggerCondSet.Manage(ts).MarkTrue(TriggerConditionSubscribed) } diff --git a/pkg/apis/eventing/v1alpha1/trigger_lifecycle_test.go b/pkg/apis/eventing/v1alpha1/trigger_lifecycle_test.go index 0b37671b77c..86759afbd12 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_lifecycle_test.go +++ b/pkg/apis/eventing/v1alpha1/trigger_lifecycle_test.go @@ -35,16 +35,6 @@ var ( Status: corev1.ConditionTrue, } - triggerConditionKubernetesService = duckv1alpha1.Condition{ - Type: TriggerConditionKubernetesService, - Status: corev1.ConditionTrue, - } - - triggerConditionVirtualService = duckv1alpha1.Condition{ - Type: TriggerConditionVirtualService, - Status: corev1.ConditionTrue, - } - triggerConditionSubscribed = duckv1alpha1.Condition{ Type: TriggerConditionSubscribed, Status: corev1.ConditionFalse, @@ -74,19 +64,18 @@ func TestTriggerGetCondition(t *testing.T) { Status: duckv1alpha1.Status{ Conditions: []duckv1alpha1.Condition{ triggerConditionBrokerExists, - triggerConditionKubernetesService, + triggerConditionSubscribed, }, }, }, - condQuery: TriggerConditionKubernetesService, - want: &triggerConditionKubernetesService, + condQuery: TriggerConditionSubscribed, + want: &triggerConditionSubscribed, }, { name: "multiple conditions, condition false", ts: &TriggerStatus{ Status: duckv1alpha1.Status{ Conditions: []duckv1alpha1.Condition{ triggerConditionBrokerExists, - triggerConditionKubernetesService, triggerConditionSubscribed, }, }, @@ -98,7 +87,6 @@ func TestTriggerGetCondition(t *testing.T) { ts: &TriggerStatus{ Status: duckv1alpha1.Status{ Conditions: []duckv1alpha1.Condition{ - triggerConditionVirtualService, triggerConditionSubscribed, }, }, @@ -130,18 +118,12 @@ func TestTriggerInitializeConditions(t *testing.T) { Conditions: []duckv1alpha1.Condition{{ Type: TriggerConditionBrokerExists, Status: corev1.ConditionUnknown, - }, { - Type: TriggerConditionKubernetesService, - Status: corev1.ConditionUnknown, }, { Type: TriggerConditionReady, Status: corev1.ConditionUnknown, }, { Type: TriggerConditionSubscribed, Status: corev1.ConditionUnknown, - }, { - Type: TriggerConditionVirtualService, - Status: corev1.ConditionUnknown, }}, }, }, @@ -150,7 +132,7 @@ func TestTriggerInitializeConditions(t *testing.T) { ts: &TriggerStatus{ Status: duckv1alpha1.Status{ Conditions: []duckv1alpha1.Condition{{ - Type: TriggerConditionVirtualService, + Type: TriggerConditionBrokerExists, Status: corev1.ConditionFalse, }}, }, @@ -159,19 +141,13 @@ func TestTriggerInitializeConditions(t *testing.T) { Status: duckv1alpha1.Status{ Conditions: []duckv1alpha1.Condition{{ Type: TriggerConditionBrokerExists, - Status: corev1.ConditionUnknown, - }, { - Type: TriggerConditionKubernetesService, - Status: corev1.ConditionUnknown, + Status: corev1.ConditionFalse, }, { Type: TriggerConditionReady, Status: corev1.ConditionUnknown, }, { Type: TriggerConditionSubscribed, Status: corev1.ConditionUnknown, - }, { - Type: TriggerConditionVirtualService, - Status: corev1.ConditionFalse, }}, }, }, @@ -190,18 +166,12 @@ func TestTriggerInitializeConditions(t *testing.T) { Conditions: []duckv1alpha1.Condition{{ Type: TriggerConditionBrokerExists, Status: corev1.ConditionUnknown, - }, { - Type: TriggerConditionKubernetesService, - Status: corev1.ConditionUnknown, }, { Type: TriggerConditionReady, Status: corev1.ConditionUnknown, }, { Type: TriggerConditionSubscribed, Status: corev1.ConditionTrue, - }, { - Type: TriggerConditionVirtualService, - Status: corev1.ConditionUnknown, }}, }, }, @@ -239,20 +209,6 @@ func TestTriggerIsReady(t *testing.T) { markVirtualServiceExists: true, markSubscribed: true, wantReady: false, - }, { - name: "k8s service sad", - markBrokerExists: true, - markKubernetesServiceExists: false, - markVirtualServiceExists: true, - markSubscribed: true, - wantReady: false, - }, { - name: "virtual service sad", - markBrokerExists: true, - markKubernetesServiceExists: true, - markVirtualServiceExists: false, - markSubscribed: true, - wantReady: false, }, { name: "subscribed sad", markBrokerExists: true, @@ -274,12 +230,6 @@ func TestTriggerIsReady(t *testing.T) { if test.markBrokerExists { ts.MarkBrokerExists() } - if test.markKubernetesServiceExists { - ts.MarkKubernetesServiceExists() - } - if test.markVirtualServiceExists { - ts.MarkVirtualServiceExists() - } if test.markSubscribed { ts.MarkSubscribed() } diff --git a/pkg/broker/receiver.go b/pkg/broker/receiver.go index 632cc024b56..b9797a78277 100644 --- a/pkg/broker/receiver.go +++ b/pkg/broker/receiver.go @@ -27,7 +27,7 @@ import ( ceclient "github.com/cloudevents/sdk-go/pkg/cloudevents/client" cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/provisioners" + "github.com/knative/eventing/pkg/reconciler/v1alpha1/trigger/path" "go.uber.org/zap" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" @@ -120,15 +120,10 @@ func (r *Receiver) serveHTTP(ctx context.Context, event cloudevents.Event, resp } // tctx.URI is actually the path... - if tctx.URI != "/" { - resp.Status = http.StatusNotFound - return nil - } - - triggerRef, err := provisioners.ParseChannel(tctx.Host) + triggerRef, err := path.Parse(tctx.URI) if err != nil { - r.logger.Error("Unable to parse host as a trigger", zap.Error(err), zap.String("host", tctx.Host)) - return errors.New("unable to parse host as a Trigger") + r.logger.Info("Unable to parse path as a trigger", zap.Error(err), zap.String("path", tctx.URI)) + return errors.New("unable to parse path as a Trigger") } // Remove the TTL attribute that is used by the Broker. @@ -170,7 +165,7 @@ func (r *Receiver) serveHTTP(ctx context.Context, event cloudevents.Event, resp } // sendEvent sends an event to a subscriber if the trigger filter passes. -func (r *Receiver) sendEvent(ctx context.Context, tctx cehttp.TransportContext, trigger provisioners.ChannelReference, event *cloudevents.Event) (*cloudevents.Event, error) { +func (r *Receiver) sendEvent(ctx context.Context, tctx cehttp.TransportContext, trigger types.NamespacedName, event *cloudevents.Event) (*cloudevents.Event, error) { t, err := r.getTrigger(ctx, trigger) if err != nil { r.logger.Info("Unable to get the Trigger", zap.Error(err), zap.Any("triggerRef", trigger)) @@ -199,14 +194,9 @@ func (r *Receiver) sendEvent(ctx context.Context, tctx cehttp.TransportContext, return r.ceClient.Send(sendingCTX, *event) } -func (r *Receiver) getTrigger(ctx context.Context, ref provisioners.ChannelReference) (*eventingv1alpha1.Trigger, error) { +func (r *Receiver) getTrigger(ctx context.Context, ref types.NamespacedName) (*eventingv1alpha1.Trigger, error) { t := &eventingv1alpha1.Trigger{} - err := r.client.Get(ctx, - types.NamespacedName{ - Namespace: ref.Namespace, - Name: ref.Name, - }, - t) + err := r.client.Get(ctx, ref, t) return t, err } diff --git a/pkg/broker/receiver_test.go b/pkg/broker/receiver_test.go index 12db0e7529a..3163db64701 100644 --- a/pkg/broker/receiver_test.go +++ b/pkg/broker/receiver_test.go @@ -51,7 +51,8 @@ const ( ) var ( - host = fmt.Sprintf("%s.%s.triggers.%s", triggerName, testNS, utils.GetClusterDomainName()) + host = fmt.Sprintf("%s.%s.triggers.%s", triggerName, testNS, utils.GetClusterDomainName()) + validPath = fmt.Sprintf("/triggers/%s/%s", testNS, triggerName) ) func init() { @@ -87,23 +88,39 @@ func TestReceiver(t *testing.T) { tctx: &cehttp.TransportContext{ Method: "GET", Host: host, - URI: "/", + URI: validPath, }, expectedStatus: http.StatusMethodNotAllowed, }, - "Other path": { + "Path too short": { tctx: &cehttp.TransportContext{ Method: "POST", Host: host, - URI: "/someotherEndpoint", + URI: "/test-namespace/test-trigger", }, - expectedStatus: http.StatusNotFound, + expectedErr: true, + }, + "Path too long": { + tctx: &cehttp.TransportContext{ + Method: "POST", + Host: host, + URI: "/triggers/test-namespace/test-trigger/extra", + }, + expectedErr: true, + }, + "Path without prefix": { + tctx: &cehttp.TransportContext{ + Method: "POST", + Host: host, + URI: "/something/test-namespace/test-trigger", + }, + expectedErr: true, }, "Bad host": { tctx: &cehttp.TransportContext{ Method: "POST", Host: "badhost-cant-be-parsed-as-a-trigger-name-plus-namespace", - URI: "/", + URI: validPath, }, expectedErr: true, }, @@ -178,7 +195,7 @@ func TestReceiver(t *testing.T) { tctx: &cehttp.TransportContext{ Method: "POST", Host: host, - URI: "/", + URI: validPath, Header: http.Header{ // foo won't pass filtering. "foo": []string{"bar"}, @@ -248,7 +265,7 @@ func TestReceiver(t *testing.T) { tctx = &cehttp.TransportContext{ Method: http.MethodPost, Host: host, - URI: "/", + URI: validPath, } } ctx := cehttp.WithTransportContext(context.Background(), *tctx) diff --git a/pkg/reconciler/v1alpha1/broker/resources/filter.go b/pkg/reconciler/v1alpha1/broker/resources/filter.go index 62177330b5d..73903b986eb 100644 --- a/pkg/reconciler/v1alpha1/broker/resources/filter.go +++ b/pkg/reconciler/v1alpha1/broker/resources/filter.go @@ -48,15 +48,15 @@ func MakeFilterDeployment(args *FilterArgs) *appsv1.Deployment { Kind: "Broker", }), }, - Labels: filterLabels(args.Broker), + Labels: FilterLabels(args.Broker), }, Spec: appsv1.DeploymentSpec{ Selector: &metav1.LabelSelector{ - MatchLabels: filterLabels(args.Broker), + MatchLabels: FilterLabels(args.Broker), }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Labels: filterLabels(args.Broker), + Labels: FilterLabels(args.Broker), Annotations: map[string]string{ "sidecar.istio.io/inject": "true", }, @@ -91,7 +91,7 @@ func MakeFilterService(b *eventingv1alpha1.Broker) *corev1.Service { ObjectMeta: metav1.ObjectMeta{ Namespace: b.Namespace, Name: fmt.Sprintf("%s-broker-filter", b.Name), - Labels: filterLabels(b), + Labels: FilterLabels(b), OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(b, schema.GroupVersionKind{ Group: eventingv1alpha1.SchemeGroupVersion.Group, @@ -101,7 +101,7 @@ func MakeFilterService(b *eventingv1alpha1.Broker) *corev1.Service { }, }, Spec: corev1.ServiceSpec{ - Selector: filterLabels(b), + Selector: FilterLabels(b), Ports: []corev1.ServicePort{ { Name: "http", @@ -113,7 +113,9 @@ func MakeFilterService(b *eventingv1alpha1.Broker) *corev1.Service { } } -func filterLabels(b *eventingv1alpha1.Broker) map[string]string { +// FilterLabels generates the labels present on all resources representing the filter of the given +// Broker. +func FilterLabels(b *eventingv1alpha1.Broker) map[string]string { return map[string]string{ "eventing.knative.dev/broker": b.Name, "eventing.knative.dev/brokerRole": "filter", diff --git a/pkg/reconciler/v1alpha1/subscription/subscription.go b/pkg/reconciler/v1alpha1/subscription/subscription.go index 5b67d66ccfb..12e1cb1a821 100644 --- a/pkg/reconciler/v1alpha1/subscription/subscription.go +++ b/pkg/reconciler/v1alpha1/subscription/subscription.go @@ -102,7 +102,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err err := r.client.Get(ctx, request.NamespacedName, subscription) if errors.IsNotFound(err) { - logging.FromContext(ctx).Error("Could not find Subscription") + logging.FromContext(ctx).Info("Could not find Subscription") return reconcile.Result{}, nil } diff --git a/pkg/reconciler/v1alpha1/trigger/path/path.go b/pkg/reconciler/v1alpha1/trigger/path/path.go new file mode 100644 index 00000000000..83a1dce4e4d --- /dev/null +++ b/pkg/reconciler/v1alpha1/trigger/path/path.go @@ -0,0 +1,53 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package path + +import ( + "fmt" + "strings" + + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "k8s.io/apimachinery/pkg/types" +) + +const ( + prefix = "triggers" +) + +// Generate generates the Path portion of a URI to send events to the given Trigger. +func Generate(t *v1alpha1.Trigger) string { + return fmt.Sprintf("/%s/%s/%s", prefix, t.Namespace, t.Name) +} + +// Parse parses the Path portion of a URI to determine which Trigger the request corresponds to. It +// is expected to be in the form "/triggers/namespace/name". +func Parse(path string) (types.NamespacedName, error) { + parts := strings.Split(path, "/") + if len(parts) != 4 { + return types.NamespacedName{}, fmt.Errorf("incorrect number of parts in the path, expected 4, actual %d, '%s'", len(parts), path) + } + if parts[0] != "" { + return types.NamespacedName{}, fmt.Errorf("text before the first slash, actual '%s'", path) + } + if parts[1] != prefix { + return types.NamespacedName{}, fmt.Errorf("incorrect prefix, expected '%s', actual '%s'", prefix, path) + } + return types.NamespacedName{ + Namespace: parts[2], + Name: parts[3], + }, nil +} diff --git a/pkg/reconciler/v1alpha1/trigger/resources/labels.go b/pkg/reconciler/v1alpha1/trigger/resources/labels.go deleted file mode 100644 index 1e60cdb91ea..00000000000 --- a/pkg/reconciler/v1alpha1/trigger/resources/labels.go +++ /dev/null @@ -1,38 +0,0 @@ -/* -Copyright 2019 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resources - -import eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - -func ServiceLabels(t *eventingv1alpha1.Trigger) map[string]string { - return map[string]string{ - "eventing.knative.dev/trigger": t.Name, - } -} - -func SubscriptionLabels(t *eventingv1alpha1.Trigger) map[string]string { - return map[string]string{ - "eventing.knative.dev/broker": t.Spec.Broker, - "eventing.knative.dev/trigger": t.Name, - } -} - -func VirtualServiceLabels(t *eventingv1alpha1.Trigger) map[string]string { - return map[string]string{ - "eventing.knative.dev/trigger": t.Name, - } -} diff --git a/pkg/reconciler/v1alpha1/trigger/resources/service.go b/pkg/reconciler/v1alpha1/trigger/resources/service.go deleted file mode 100644 index 3cd686926cc..00000000000 --- a/pkg/reconciler/v1alpha1/trigger/resources/service.go +++ /dev/null @@ -1,52 +0,0 @@ -/* -Copyright 2019 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resources - -import ( - "fmt" - - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" -) - -// NewService returns a K8s placeholder service for trigger 't'. -func NewService(t *eventingv1alpha1.Trigger) *corev1.Service { - return &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: t.Namespace, - GenerateName: fmt.Sprintf("trigger-%s-", t.Name), - Labels: ServiceLabels(t), - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(t, schema.GroupVersionKind{ - Group: eventingv1alpha1.SchemeGroupVersion.Group, - Version: eventingv1alpha1.SchemeGroupVersion.Version, - Kind: "Trigger", - }), - }, - }, - Spec: corev1.ServiceSpec{ - Ports: []corev1.ServicePort{ - { - Name: "http", - Port: 80, - }, - }, - }, - } -} diff --git a/pkg/reconciler/v1alpha1/trigger/resources/subscription.go b/pkg/reconciler/v1alpha1/trigger/resources/subscription.go index 1cfb3dd7f27..dc53d8b42a3 100644 --- a/pkg/reconciler/v1alpha1/trigger/resources/subscription.go +++ b/pkg/reconciler/v1alpha1/trigger/resources/subscription.go @@ -18,6 +18,7 @@ package resources import ( "fmt" + "net/url" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" corev1 "k8s.io/api/core/v1" @@ -25,9 +26,10 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" ) -// NewSubscription returns a placeholder subscription for trigger 't', from brokerTrigger to 'svc' +// NewSubscription returns a placeholder subscription for trigger 't', from brokerTrigger to 'uri' // replying to brokerIngress. -func NewSubscription(t *eventingv1alpha1.Trigger, brokerTrigger, brokerIngress *eventingv1alpha1.Channel, svc *corev1.Service) *eventingv1alpha1.Subscription { +func NewSubscription(t *eventingv1alpha1.Trigger, brokerTrigger, brokerIngress *eventingv1alpha1.Channel, uri *url.URL) *eventingv1alpha1.Subscription { + uriString := uri.String() return &eventingv1alpha1.Subscription{ ObjectMeta: metav1.ObjectMeta{ Namespace: t.Namespace, @@ -48,11 +50,7 @@ func NewSubscription(t *eventingv1alpha1.Trigger, brokerTrigger, brokerIngress * Name: brokerTrigger.Name, }, Subscriber: &eventingv1alpha1.SubscriberSpec{ - Ref: &corev1.ObjectReference{ - APIVersion: "v1", - Kind: "Service", - Name: svc.Name, - }, + URI: &uriString, }, Reply: &eventingv1alpha1.ReplyStrategy{ Channel: &corev1.ObjectReference{ @@ -64,3 +62,12 @@ func NewSubscription(t *eventingv1alpha1.Trigger, brokerTrigger, brokerIngress * }, } } + +// SubscriptionLabels generates the labels present on the Subscription linking this Trigger to the +// Broker's Channels. +func SubscriptionLabels(t *eventingv1alpha1.Trigger) map[string]string { + return map[string]string{ + "eventing.knative.dev/broker": t.Spec.Broker, + "eventing.knative.dev/trigger": t.Name, + } +} diff --git a/pkg/reconciler/v1alpha1/trigger/resources/virtual_service.go b/pkg/reconciler/v1alpha1/trigger/resources/virtual_service.go deleted file mode 100644 index 45e4b80272a..00000000000 --- a/pkg/reconciler/v1alpha1/trigger/resources/virtual_service.go +++ /dev/null @@ -1,67 +0,0 @@ -/* -Copyright 2019 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resources - -import ( - "fmt" - - "github.com/knative/eventing/pkg/reconciler/names" - "github.com/knative/eventing/pkg/utils" - - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" -) - -// NewVirtualService returns a placeholder virtual service object for trigger 't' and service 'svc'. -func NewVirtualService(t *eventingv1alpha1.Trigger, svc *corev1.Service) *istiov1alpha3.VirtualService { - destinationHost := fmt.Sprintf("%s-broker-filter.%s.svc.%s", t.Spec.Broker, t.Namespace, utils.GetClusterDomainName()) - return &istiov1alpha3.VirtualService{ - ObjectMeta: metav1.ObjectMeta{ - GenerateName: fmt.Sprintf("%s-", t.Name), - Namespace: t.Namespace, - Labels: VirtualServiceLabels(t), - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(t, schema.GroupVersionKind{ - Group: eventingv1alpha1.SchemeGroupVersion.Group, - Version: eventingv1alpha1.SchemeGroupVersion.Version, - Kind: "Trigger", - }), - }, - }, - Spec: istiov1alpha3.VirtualServiceSpec{ - Hosts: []string{ - names.ServiceHostName(svc.Name, svc.Namespace), - }, - HTTP: []istiov1alpha3.HTTPRoute{{ - Rewrite: &istiov1alpha3.HTTPRewrite{ - Authority: fmt.Sprintf("%s.%s.triggers.%s", t.Name, t.Namespace, utils.GetClusterDomainName()), - }, - Route: []istiov1alpha3.DestinationWeight{{ - Destination: istiov1alpha3.Destination{ - Host: destinationHost, - Port: istiov1alpha3.PortSelector{ - Number: 80, - }, - }}, - }}, - }, - }, - } -} diff --git a/pkg/reconciler/v1alpha1/trigger/trigger.go b/pkg/reconciler/v1alpha1/trigger/trigger.go index b1dc6c459e6..ff60d792c16 100644 --- a/pkg/reconciler/v1alpha1/trigger/trigger.go +++ b/pkg/reconciler/v1alpha1/trigger/trigger.go @@ -18,12 +18,15 @@ package trigger import ( "context" - - "github.com/knative/eventing/pkg/reconciler/v1alpha1/trigger/resources" + "net/url" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/logging" + "github.com/knative/eventing/pkg/reconciler/names" "github.com/knative/eventing/pkg/reconciler/v1alpha1/broker" + brokerresources "github.com/knative/eventing/pkg/reconciler/v1alpha1/broker/resources" + "github.com/knative/eventing/pkg/reconciler/v1alpha1/trigger/path" + "github.com/knative/eventing/pkg/reconciler/v1alpha1/trigger/resources" "github.com/knative/eventing/pkg/utils/resolve" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "go.uber.org/zap" @@ -210,10 +213,13 @@ func (r *reconciler) reconcile(ctx context.Context, t *v1alpha1.Trigger) error { t.Status.InitializeConditions() // 1. Verify the Broker exists. - // 2. Find the Subscriber's URI. - // 2. Creates a K8s Service uniquely named for this Trigger. - // 3. Creates a VirtualService that routes the K8s Service to the Broker's filter service on an identifiable host name. - // 4. Creates a Subscription from the Broker's single Channel to this Trigger's K8s Service, with reply set to the Broker. + // 2. Get the Broker's: + // - Filter Channel + // - Ingress Channel + // - Filter Service + // 3. Find the Subscriber's URI. + // 4. Creates a Subscription from the Broker's Filter Channel to this Trigger via the Broker's + // Filter Service with a specific path, and reply set to the Broker's Ingress Channel. if t.DeletionTimestamp != nil { // Everything is cleaned up by the garbage collector. @@ -238,29 +244,21 @@ func (r *reconciler) reconcile(ctx context.Context, t *v1alpha1.Trigger) error { logging.FromContext(ctx).Error("Unable to get the Broker's Ingress Channel", zap.Error(err)) return err } - - subscriberURI, err := resolve.SubscriberSpec(ctx, r.dynamicClient, t.Namespace, t.Spec.Subscriber) + // Get Broker filter service. + filterSvc, err := r.getBrokerFilterService(ctx, b) if err != nil { - logging.FromContext(ctx).Error("Unable to get the Subscriber's URI", zap.Error(err)) + logging.FromContext(ctx).Error("Unable to get the Broker's filter Service", zap.Error(err)) return err } - t.Status.SubscriberURI = subscriberURI - svc, err := r.reconcileK8sService(ctx, t) - if err != nil { - logging.FromContext(ctx).Error("Unable to reconcile the K8s Service", zap.Error(err)) - return err - } - t.Status.MarkKubernetesServiceExists() - - _, err = r.reconcileVirtualService(ctx, t, svc) + subscriberURI, err := resolve.SubscriberSpec(ctx, r.dynamicClient, t.Namespace, t.Spec.Subscriber) if err != nil { - logging.FromContext(ctx).Error("Unable to reconcile the VirtualService", zap.Error(err)) + logging.FromContext(ctx).Error("Unable to get the Subscriber's URI", zap.Error(err)) return err } - t.Status.MarkVirtualServiceExists() + t.Status.SubscriberURI = subscriberURI - _, err = r.subscribeToBrokerChannel(ctx, t, brokerTrigger, brokerIngress, svc) + _, err = r.subscribeToBrokerChannel(ctx, t, brokerTrigger, brokerIngress, filterSvc) if err != nil { logging.FromContext(ctx).Error("Unable to Subscribe", zap.Error(err)) t.Status.MarkNotSubscribed("notSubscribed", "%v", err) @@ -360,11 +358,11 @@ func (r *reconciler) getChannel(ctx context.Context, b *v1alpha1.Broker, ls labe // getService returns the K8s service for trigger 't' if exists, // otherwise it returns an error. -func (r *reconciler) getService(ctx context.Context, t *v1alpha1.Trigger) (*corev1.Service, error) { +func (r *reconciler) getBrokerFilterService(ctx context.Context, b *v1alpha1.Broker) (*corev1.Service, error) { list := &corev1.ServiceList{} opts := &runtimeclient.ListOptions{ - Namespace: t.Namespace, - LabelSelector: labels.SelectorFromSet(resources.ServiceLabels(t)), + Namespace: b.Namespace, + LabelSelector: labels.SelectorFromSet(brokerresources.FilterLabels(b)), // Set Raw because if we need to get more than one page, then we will put the continue token // into opts.Raw.Continue. Raw: &metav1.ListOptions{}, @@ -375,7 +373,7 @@ func (r *reconciler) getService(ctx context.Context, t *v1alpha1.Trigger) (*core return nil, err } for _, svc := range list.Items { - if metav1.IsControlledBy(&svc, t) { + if metav1.IsControlledBy(&svc, b) { return &svc, nil } } @@ -383,91 +381,14 @@ func (r *reconciler) getService(ctx context.Context, t *v1alpha1.Trigger) (*core return nil, k8serrors.NewNotFound(schema.GroupResource{}, "") } -// reconcileK8sService reconciles the K8s service for trigger 't'. -func (r *reconciler) reconcileK8sService(ctx context.Context, t *v1alpha1.Trigger) (*corev1.Service, error) { - current, err := r.getService(ctx, t) - - // If the resource doesn't exist, we'll create it - if k8serrors.IsNotFound(err) { - svc := resources.NewService(t) - err = r.client.Create(ctx, svc) - if err != nil { - return nil, err - } - return svc, nil - } else if err != nil { - return nil, err - } - - expected := resources.NewService(t) - // spec.clusterIP is immutable and is set on existing services. If we don't set this to the same value, we will - // encounter an error while updating. - expected.Spec.ClusterIP = current.Spec.ClusterIP - if !equality.Semantic.DeepDerivative(expected.Spec, current.Spec) { - current.Spec = expected.Spec - err = r.client.Update(ctx, current) - if err != nil { - return nil, err - } - } - return current, nil -} - -// getVirtualService returns the virtual service for trigger 't' if exists, -// otherwise it returns an error. -func (r *reconciler) getVirtualService(ctx context.Context, t *v1alpha1.Trigger) (*istiov1alpha3.VirtualService, error) { - list := &istiov1alpha3.VirtualServiceList{} - opts := &runtimeclient.ListOptions{ - Namespace: t.Namespace, - LabelSelector: labels.SelectorFromSet(resources.VirtualServiceLabels(t)), - // Set Raw because if we need to get more than one page, then we will put the continue token - // into opts.Raw.Continue. - Raw: &metav1.ListOptions{}, - } - - err := r.client.List(ctx, opts, list) - if err != nil { - return nil, err - } - for _, vs := range list.Items { - if metav1.IsControlledBy(&vs, t) { - return &vs, nil - } - } - - return nil, k8serrors.NewNotFound(schema.GroupResource{}, "") -} - -// reconcileVirtualService reconciles the virtual service for trigger 't' and service 'svc'. -func (r *reconciler) reconcileVirtualService(ctx context.Context, t *v1alpha1.Trigger, svc *corev1.Service) (*istiov1alpha3.VirtualService, error) { - virtualService, err := r.getVirtualService(ctx, t) - - // If the resource doesn't exist, we'll create it - if k8serrors.IsNotFound(err) { - virtualService = resources.NewVirtualService(t, svc) - err = r.client.Create(ctx, virtualService) - if err != nil { - return nil, err - } - return virtualService, nil - } else if err != nil { - return nil, err - } - - expected := resources.NewVirtualService(t, svc) - if !equality.Semantic.DeepDerivative(expected.Spec, virtualService.Spec) { - virtualService.Spec = expected.Spec - err = r.client.Update(ctx, virtualService) - if err != nil { - return nil, err - } - } - return virtualService, nil -} - // subscribeToBrokerChannel subscribes service 'svc' to the Broker's channels. func (r *reconciler) subscribeToBrokerChannel(ctx context.Context, t *v1alpha1.Trigger, brokerTrigger, brokerIngress *v1alpha1.Channel, svc *corev1.Service) (*v1alpha1.Subscription, error) { - expected := resources.NewSubscription(t, brokerTrigger, brokerIngress, svc) + uri := &url.URL{ + Scheme: "http", + Host: names.ServiceHostName(svc.Name, svc.Namespace), + Path: path.Generate(t), + } + expected := resources.NewSubscription(t, brokerTrigger, brokerIngress, uri) sub, err := r.getSubscription(ctx, t) // If the resource doesn't exist, we'll create it diff --git a/pkg/reconciler/v1alpha1/trigger/trigger_test.go b/pkg/reconciler/v1alpha1/trigger/trigger_test.go index 00a3ea3f784..e0e28920b68 100644 --- a/pkg/reconciler/v1alpha1/trigger/trigger_test.go +++ b/pkg/reconciler/v1alpha1/trigger/trigger_test.go @@ -20,14 +20,15 @@ import ( "context" "errors" "fmt" + "net/url" "testing" - "github.com/knative/eventing/pkg/reconciler/v1alpha1/trigger/resources" - + "github.com/google/go-cmp/cmp" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/reconciler/names" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" "github.com/knative/eventing/pkg/reconciler/v1alpha1/broker" + brokerresources "github.com/knative/eventing/pkg/reconciler/v1alpha1/broker/resources" + "github.com/knative/eventing/pkg/reconciler/v1alpha1/trigger/resources" "github.com/knative/eventing/pkg/utils" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" @@ -37,11 +38,14 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) const ( @@ -52,6 +56,8 @@ const ( subscriberAPIVersion = "v1" subscriberKind = "Service" subscriberName = "subscriberName" + + continueToken = "continueToken" ) var ( @@ -143,8 +149,7 @@ func TestReconcile(t *testing.T) { Name: "Trigger not found", }, { - Name: "Get Trigger error", - Scheme: scheme.Scheme, + Name: "Get Trigger error", Mocks: controllertesting.Mocks{ MockGets: []controllertesting.MockGet{ func(_ client.Client, _ context.Context, _ client.ObjectKey, obj runtime.Object) (controllertesting.MockHandled, error) { @@ -158,16 +163,14 @@ func TestReconcile(t *testing.T) { WantErrMsg: "test error getting the Trigger", }, { - Name: "Trigger being deleted", - Scheme: scheme.Scheme, + Name: "Trigger being deleted", InitialState: []runtime.Object{ makeDeletingTrigger(), }, WantEvent: []corev1.Event{events[triggerReconciled]}, }, { - Name: "Get Broker error", - Scheme: scheme.Scheme, + Name: "Get Broker error", InitialState: []runtime.Object{ makeTrigger(), }, @@ -185,8 +188,7 @@ func TestReconcile(t *testing.T) { WantEvent: []corev1.Event{events[triggerReconcileFailed]}, }, { - Name: "Get Broker Trigger channel error", - Scheme: scheme.Scheme, + Name: "Get Broker Trigger channel error", InitialState: []runtime.Object{ makeTrigger(), makeBroker(), @@ -209,8 +211,16 @@ func TestReconcile(t *testing.T) { WantEvent: []corev1.Event{events[triggerReconcileFailed]}, }, { - Name: "Get Broker Ingress channel error", - Scheme: scheme.Scheme, + Name: "Broker Trigger channel not found", + InitialState: []runtime.Object{ + makeTrigger(), + makeBroker(), + }, + WantErrMsg: ` "" not found`, + WantEvent: []corev1.Event{events[triggerReconcileFailed]}, + }, + { + Name: "Get Broker Ingress channel error", InitialState: []runtime.Object{ makeTrigger(), makeBroker(), @@ -234,136 +244,98 @@ func TestReconcile(t *testing.T) { WantEvent: []corev1.Event{events[triggerReconcileFailed]}, }, { - Name: "Resolve subscriberURI error", - Scheme: scheme.Scheme, + Name: "Broker Ingress channel not found", InitialState: []runtime.Object{ makeTrigger(), makeBroker(), makeTriggerChannel(), }, - DynamicMocks: controllertesting.DynamicMocks{ - MockGets: []controllertesting.MockDynamicGet{ - func(ctx *controllertesting.MockDynamicContext, name string, options metav1.GetOptions, subresources ...string) (handled controllertesting.MockHandled, i *unstructured.Unstructured, e error) { - if ctx.Resource.Group == "" && ctx.Resource.Version == "v1" && ctx.Resource.Resource == "services" { - - return controllertesting.Handled, nil, errors.New("test error resolving subscriber URI") - } - return controllertesting.Unhandled, nil, nil - }, - }, - }, - WantErrMsg: "test error resolving subscriber URI", + WantErrMsg: ` "" not found`, WantEvent: []corev1.Event{events[triggerReconcileFailed]}, }, { - Name: "Create K8s Service error", - Scheme: scheme.Scheme, + Name: "Broker Filter Service not found", InitialState: []runtime.Object{ makeTrigger(), makeBroker(), makeTriggerChannel(), }, - Objects: []runtime.Object{ - makeSubscriberServiceAsUnstructured(), - }, - Mocks: controllertesting.Mocks{ - MockCreates: []controllertesting.MockCreate{ - func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { - if _, ok := obj.(*corev1.Service); ok { - return controllertesting.Handled, errors.New("test error creating k8s service") - } - return controllertesting.Unhandled, nil - }, - }, - }, - WantErrMsg: "test error creating k8s service", + WantErrMsg: ` "" not found`, WantEvent: []corev1.Event{events[triggerReconcileFailed]}, }, { - Name: "Update K8s Service error", - Scheme: scheme.Scheme, + Name: "Get Broker Filter Service error", InitialState: []runtime.Object{ makeTrigger(), makeBroker(), makeTriggerChannel(), - makeDifferentK8sService(), - }, - Objects: []runtime.Object{ - makeSubscriberServiceAsUnstructured(), }, Mocks: controllertesting.Mocks{ - MockUpdates: []controllertesting.MockUpdate{ - func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { - if _, ok := obj.(*corev1.Service); ok { - return controllertesting.Handled, errors.New("test error updating k8s service") + MockLists: []controllertesting.MockList{ + func(_ client.Client, _ context.Context, opts *client.ListOptions, list runtime.Object) (handled controllertesting.MockHandled, e error) { + if _, ok := list.(*corev1.ServiceList); ok { + return controllertesting.Handled, errors.New("test error getting Broker's filter Service") } return controllertesting.Unhandled, nil }, }, }, - WantErrMsg: "test error updating k8s service", + WantErrMsg: "test error getting Broker's filter Service", WantEvent: []corev1.Event{events[triggerReconcileFailed]}, }, { - Name: "Create Virtual Service error", - Scheme: scheme.Scheme, + Name: "Resolve subscriberURI error", InitialState: []runtime.Object{ makeTrigger(), makeBroker(), makeTriggerChannel(), - makeService(), - }, - Objects: []runtime.Object{ - makeSubscriberServiceAsUnstructured(), + makeBrokerFilterService(), }, - Mocks: controllertesting.Mocks{ - MockCreates: []controllertesting.MockCreate{ - func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { - if _, ok := obj.(*istiov1alpha3.VirtualService); ok { - return controllertesting.Handled, errors.New("test error creating virtual service") + DynamicMocks: controllertesting.DynamicMocks{ + MockGets: []controllertesting.MockDynamicGet{ + func(ctx *controllertesting.MockDynamicContext, name string, options metav1.GetOptions, subresources ...string) (handled controllertesting.MockHandled, i *unstructured.Unstructured, e error) { + if ctx.Resource.Group == "" && ctx.Resource.Version == "v1" && ctx.Resource.Resource == "services" { + + return controllertesting.Handled, nil, errors.New("test error resolving subscriber URI") } - return controllertesting.Unhandled, nil + return controllertesting.Unhandled, nil, nil }, }, }, - WantErrMsg: "test error creating virtual service", + WantErrMsg: "test error resolving subscriber URI", WantEvent: []corev1.Event{events[triggerReconcileFailed]}, }, { - Name: "Update Virtual Service error", - Scheme: scheme.Scheme, + Name: "Get Subscription error", InitialState: []runtime.Object{ makeTrigger(), makeBroker(), makeTriggerChannel(), - makeService(), - makeDifferentVirtualService(), + makeBrokerFilterService(), }, Objects: []runtime.Object{ makeSubscriberServiceAsUnstructured(), }, Mocks: controllertesting.Mocks{ - MockUpdates: []controllertesting.MockUpdate{ - func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { - if _, ok := obj.(*istiov1alpha3.VirtualService); ok { - return controllertesting.Handled, errors.New("test error updating virtual service") + MockLists: []controllertesting.MockList{ + func(_ client.Client, _ context.Context, _ *client.ListOptions, list runtime.Object) (controllertesting.MockHandled, error) { + if _, ok := list.(*v1alpha1.SubscriptionList); ok { + return controllertesting.Handled, errors.New("test error listing subscription") } return controllertesting.Unhandled, nil }, }, }, - WantErrMsg: "test error updating virtual service", + WantErrMsg: "test error listing subscription", WantEvent: []corev1.Event{events[triggerReconcileFailed]}, }, { - Name: "Create Subscription error", - Scheme: scheme.Scheme, + Name: "Create Subscription error", InitialState: []runtime.Object{ makeTrigger(), makeBroker(), makeTriggerChannel(), - makeService(), - makeVirtualService(), + makeBrokerFilterService(), }, Objects: []runtime.Object{ makeSubscriberServiceAsUnstructured(), @@ -382,14 +354,12 @@ func TestReconcile(t *testing.T) { WantEvent: []corev1.Event{events[triggerReconcileFailed]}, }, { - Name: "Delete Subscription error", - Scheme: scheme.Scheme, + Name: "Delete Subscription error", InitialState: []runtime.Object{ makeTrigger(), makeBroker(), makeTriggerChannel(), - makeService(), - makeVirtualService(), + makeBrokerFilterService(), makeDifferentSubscription(), }, Objects: []runtime.Object{ @@ -409,14 +379,12 @@ func TestReconcile(t *testing.T) { WantEvent: []corev1.Event{events[subscriptionDeleteFailed], events[triggerReconcileFailed]}, }, { - Name: "Re-create Subscription error", - Scheme: scheme.Scheme, + Name: "Re-create Subscription error", InitialState: []runtime.Object{ makeTrigger(), makeBroker(), makeTriggerChannel(), - makeService(), - makeVirtualService(), + makeBrokerFilterService(), makeDifferentSubscription(), }, Objects: []runtime.Object{ @@ -436,14 +404,12 @@ func TestReconcile(t *testing.T) { WantEvent: []corev1.Event{events[subscriptionCreateFailed], events[triggerReconcileFailed]}, }, { - Name: "Update status error", - Scheme: scheme.Scheme, + Name: "Update status error", InitialState: []runtime.Object{ makeTrigger(), makeBroker(), makeTriggerChannel(), - makeService(), - makeVirtualService(), + makeBrokerFilterService(), makeSameSubscription(), }, Objects: []runtime.Object{ @@ -463,14 +429,12 @@ func TestReconcile(t *testing.T) { WantEvent: []corev1.Event{events[triggerReconciled], events[triggerUpdateStatusFailed]}, }, { - Name: "Trigger reconciliation success", - Scheme: scheme.Scheme, + Name: "Trigger reconciliation success", InitialState: []runtime.Object{ makeTrigger(), makeBroker(), makeTriggerChannel(), - makeService(), - makeVirtualService(), + makeBrokerFilterService(), makeSameSubscription(), }, Objects: []runtime.Object{ @@ -495,10 +459,122 @@ func TestReconcile(t *testing.T) { } tc.ReconcileKey = fmt.Sprintf("%s/%s", testNS, triggerName) tc.IgnoreTimes = true + tc.Scheme = scheme.Scheme t.Run(tc.Name, tc.Runner(t, r, c, recorder)) } } +func TestMapBrokerToTriggers(t *testing.T) { + testCases := map[string]struct { + initialState []runtime.Object + mocks controllertesting.Mocks + expected []reconcile.Request + }{ + "List error": { + mocks: controllertesting.Mocks{ + MockLists: []controllertesting.MockList{ + func(_ client.Client, _ context.Context, _ *client.ListOptions, list runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, errors.New("test induced error") + }, + }, + }, + expected: []reconcile.Request{}, + }, + "One Trigger": { + initialState: []runtime.Object{ + makeTrigger(), + }, + expected: []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: testNS, + Name: triggerName, + }, + }, + }, + }, + "Only from this namespace": { + initialState: []runtime.Object{ + makeTriggerWithNamespaceAndName(testNS, "one"), + makeTriggerWithNamespaceAndName("some-other-namespace", "will-be-ignored"), + makeTriggerWithNamespaceAndName(testNS, "two"), + }, + expected: []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: testNS, + Name: "one", + }, + }, + { + NamespacedName: types.NamespacedName{ + Namespace: testNS, + Name: "two", + }, + }, + }, + }, + "Follows pagination": { + initialState: []runtime.Object{ + makeTrigger(), + }, + mocks: controllertesting.Mocks{ + MockLists: []controllertesting.MockList{ + func(innerClient client.Client, ctx context.Context, opts *client.ListOptions, list runtime.Object) (handled controllertesting.MockHandled, e error) { + // The first request won't have a continue token. Add it and immediately + // return. The subsequent request will have the token, remove it and send + // the request to the inner client. + tl := list.(*v1alpha1.TriggerList) + if opts.Raw.Continue != continueToken { + tl.Continue = continueToken + return controllertesting.Handled, nil + } else { + tl.Continue = "" + return controllertesting.Handled, innerClient.List(ctx, opts, list) + } + }, + }, + }, + expected: []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: testNS, + Name: triggerName, + }, + }, + }, + }, + } + + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + c := (&controllertesting.TestCase{ + Scheme: scheme.Scheme, + InitialState: tc.initialState, + Mocks: tc.mocks, + }).GetClient() + + b := &mapBrokerToTriggers{ + // client and logger are the only fields that are used by the Map function. + r: &reconciler{ + client: c, + logger: zap.NewNop(), + }, + } + o := handler.MapObject{ + Meta: &metav1.ObjectMeta{ + Namespace: testNS, + Name: brokerName, + }, + } + actual := b.Map(o) + if diff := cmp.Diff(tc.expected, actual); diff != "" { + t.Errorf("Unexpected results (-want +got): %s", diff) + } + }) + } +} + func makeTrigger() *v1alpha1.Trigger { return &v1alpha1.Trigger{ TypeMeta: metav1.TypeMeta{ @@ -533,8 +609,6 @@ func makeReadyTrigger() *v1alpha1.Trigger { t.Status.InitializeConditions() t.Status.MarkBrokerExists() t.Status.SubscriberURI = fmt.Sprintf("http://%s.%s.svc.%s/", subscriberName, testNS, utils.GetClusterDomainName()) - t.Status.MarkKubernetesServiceExists() - t.Status.MarkVirtualServiceExists() t.Status.MarkSubscribed() return t } @@ -545,6 +619,13 @@ func makeDeletingTrigger() *v1alpha1.Trigger { return b } +func makeTriggerWithNamespaceAndName(namespace, name string) *v1alpha1.Trigger { + t := makeTrigger() + t.Namespace = namespace + t.Name = name + return t +} + func makeBroker() *v1alpha1.Broker { return &v1alpha1.Broker{ TypeMeta: metav1.TypeMeta{ @@ -616,37 +697,24 @@ func makeSubscriberServiceAsUnstructured() *unstructured.Unstructured { } } -func makeService() *corev1.Service { - return resources.NewService(makeTrigger()) -} - -func makeDifferentK8sService() *corev1.Service { - svc := makeService() - svc.Spec.Ports = []corev1.ServicePort{{ - Name: "http", - Port: 9999, - }} - return svc -} - -func makeVirtualService() *istiov1alpha3.VirtualService { - return resources.NewVirtualService(makeTrigger(), makeService()) +func makeBrokerFilterService() *corev1.Service { + return brokerresources.MakeFilterService(makeBroker()) } -func makeDifferentVirtualService() *istiov1alpha3.VirtualService { - vsvc := makeVirtualService() - vsvc.Spec.Hosts = []string{ - names.ServiceHostName("other_svc_name", "other_svc_namespace"), +func makeServiceURI() *url.URL { + return &url.URL{ + Scheme: "http", + Host: "service-uri", + Path: "/path", } - return vsvc } func makeSameSubscription() *v1alpha1.Subscription { - return resources.NewSubscription(makeTrigger(), makeTriggerChannel(), makeTriggerChannel(), makeService()) + return resources.NewSubscription(makeTrigger(), makeTriggerChannel(), makeTriggerChannel(), makeServiceURI()) } func makeDifferentSubscription() *v1alpha1.Subscription { - return resources.NewSubscription(makeTrigger(), makeTriggerChannel(), makeDifferentChannel(), makeService()) + return resources.NewSubscription(makeTrigger(), makeTriggerChannel(), makeDifferentChannel(), makeServiceURI()) } func getOwnerReference() metav1.OwnerReference {