From b9a7d0f533b401b81d01c6768400f7a24716175b Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Thu, 14 Mar 2019 11:30:08 -0700 Subject: [PATCH 1/2] Accept v0.1 and v0.2 cloud events. Adding UTs. Updating initClient as well, removing unnecessary paging. --- pkg/broker/receiver.go | 46 ++++++++++++++-------------- pkg/broker/receiver_test.go | 60 ++++++++++++++++++++++--------------- 2 files changed, 60 insertions(+), 46 deletions(-) diff --git a/pkg/broker/receiver.go b/pkg/broker/receiver.go index b591a70ced2..c1d4b22578b 100644 --- a/pkg/broker/receiver.go +++ b/pkg/broker/receiver.go @@ -23,7 +23,6 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/provisioners" "go.uber.org/zap" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -86,25 +85,14 @@ func (r *Receiver) sendEvent(trigger provisioners.ChannelReference, message *pro return nil } -// Initialize the client. Mainly intended to load stuff in its cache. +// Initialize the client. Mainly intended to create the informer/indexer in order not to drop messages. func (r *Receiver) initClient() error { - // We list triggers so that we can load the client's cache. Otherwise, on receiving an event, it + // We list triggers so that we do not drop messages. Otherwise, on receiving an event, it // may not find the trigger and would return an error. - opts := &client.ListOptions{ - // Set Raw because if we need to get more than one page, then we will put the continue token - // into opts.Raw.Continue. - Raw: &metav1.ListOptions{}, - } - for { - tl := &eventingv1alpha1.TriggerList{} - if err := r.client.List(context.TODO(), opts, tl); err != nil { - return err - } - if tl.Continue != "" { - opts.Raw.Continue = tl.Continue - } else { - break - } + opts := &client.ListOptions{} + tl := &eventingv1alpha1.TriggerList{} + if err := r.client.List(context.TODO(), opts, tl); err != nil { + return err } return nil } @@ -128,13 +116,27 @@ func (r *Receiver) shouldSendMessage(ts *eventingv1alpha1.TriggerSpec, m *provis return false } filterType := ts.Filter.SourceAndType.Type - if filterType != eventingv1alpha1.TriggerAnyFilter && filterType != m.Headers["Ce-Eventtype"] { - r.logger.Debug("Wrong type", zap.String("trigger.spec.filter.sourceAndType.type", filterType), zap.String("message.type", m.Headers["Ce-Eventtype"])) + // TODO the inspection of Headers should be removed once we start using the cloud events SDK. + cloudEventType := "" + if et, ok := m.Headers["Ce-Eventtype"]; ok { + // cloud event spec v0.1. + cloudEventType = et + } else if et, ok := m.Headers["Ce-Type"]; ok { + // cloud event spec v0.2. + cloudEventType = et + } + if filterType != eventingv1alpha1.TriggerAnyFilter && filterType != cloudEventType { + r.logger.Debug("Wrong type", zap.String("trigger.spec.filter.sourceAndType.type", filterType), zap.String("message.type", cloudEventType)) return false } filterSource := ts.Filter.SourceAndType.Source - if filterSource != eventingv1alpha1.TriggerAnyFilter && filterSource != m.Headers["Ce-Source"] { - r.logger.Debug("Wrong source", zap.String("trigger.spec.filter.sourceAndType.source", filterSource), zap.String("message.source", m.Headers["Ce-Source"])) + cloudEventSource := "" + // cloud event spec v0.1 and v0.2. + if es, ok := m.Headers["Ce-Source"]; ok { + cloudEventSource = es + } + if filterSource != eventingv1alpha1.TriggerAnyFilter && filterSource != cloudEventSource { + r.logger.Debug("Wrong source", zap.String("trigger.spec.filter.sourceAndType.source", filterSource), zap.String("message.source", cloudEventSource)) return false } return true diff --git a/pkg/broker/receiver_test.go b/pkg/broker/receiver_test.go index 02a8518a351..81c4f9e9a8b 100644 --- a/pkg/broker/receiver_test.go +++ b/pkg/broker/receiver_test.go @@ -106,27 +106,31 @@ func TestReceiver(t *testing.T) { } for n, tc := range testCases { t.Run(n, func(t *testing.T) { - mr, _ := New( - zap.NewNop(), - fake.NewFakeClient(tc.initialState...)) - fd := &fakeDispatcher{ - err: tc.dispatchErr, - } - mr.dispatcher = fd - - resp := httptest.NewRecorder() - mr.newMessageReceiver().HandleRequest(resp, makeRequest()) - if tc.expectedErr { - if resp.Result().StatusCode >= 200 && resp.Result().StatusCode < 300 { - t.Errorf("Expected an error. Actual: %v", resp.Result()) + // Support cloud spec v0.1 and v0.2 requests. + requests := [2]*http.Request{makeV01Request(), makeV02Request()} + for _, request := range requests { + mr, _ := New( + zap.NewNop(), + fake.NewFakeClient(tc.initialState...)) + fd := &fakeDispatcher{ + err: tc.dispatchErr, } - } else { - if resp.Result().StatusCode < 200 || resp.Result().StatusCode >= 300 { - t.Errorf("Expected success. Actual: %v", resp.Result()) + mr.dispatcher = fd + + resp := httptest.NewRecorder() + mr.newMessageReceiver().HandleRequest(resp, request) + if tc.expectedErr { + if resp.Result().StatusCode >= 200 && resp.Result().StatusCode < 300 { + t.Errorf("Expected an error. Actual: %v", resp.Result()) + } + } else { + if resp.Result().StatusCode < 200 || resp.Result().StatusCode >= 300 { + t.Errorf("Expected success. Actual: %v", resp.Result()) + } + } + if tc.expectedDispatch != fd.requestReceived { + t.Errorf("Incorrect dispatch. Expected %v, Actual %v", tc.expectedDispatch, fd.requestReceived) } - } - if tc.expectedDispatch != fd.requestReceived { - t.Errorf("Incorrect dispatch. Expected %v, Actual %v", tc.expectedDispatch, fd.requestReceived) } }) } @@ -178,15 +182,15 @@ func makeTriggerWithoutSubscriberURI() *eventingv1alpha1.Trigger { return t } -func makeRequest() *http.Request { +func makeRequest(cloudEventVersionValue, eventTypeVersionValue, eventTypeKey, eventSourceKey string) *http.Request { req := httptest.NewRequest("POST", "/", strings.NewReader(``)) req.Host = fmt.Sprintf("%s.%s.triggers.%s", triggerName, testNS, utils.GetClusterDomainName()) eventAttributes := map[string]string{ - "CE-CloudEventsVersion": `"0.1"`, - "CE-EventType": eventType, - "CE-EventTypeVersion": `"1.0"`, - "CE-Source": eventSource, + "CE-CloudEventsVersion": cloudEventVersionValue, + eventTypeKey: eventType, + "CE-EventTypeVersion": eventTypeVersionValue, + eventSourceKey: eventSource, "CE-EventID": `"A234-1234-1234"`, "CE-EventTime": `"2018-04-05T17:31:00Z"`, "contentType": "text/xml", @@ -196,3 +200,11 @@ func makeRequest() *http.Request { } return req } + +func makeV01Request() *http.Request { + return makeRequest(`"0.1"`, `"1.0"`, "CE-EventType", "CE-Source") +} + +func makeV02Request() *http.Request { + return makeRequest(`"0.2"`, `"2.0"`, "CE-Type", "CE-Source") +} From c27bf220fc3244956c82ed07e1ff25c150e3ebad Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Thu, 14 Mar 2019 15:24:35 -0700 Subject: [PATCH 2/2] lowercase --- pkg/broker/receiver_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/broker/receiver_test.go b/pkg/broker/receiver_test.go index 81c4f9e9a8b..d98edb6be45 100644 --- a/pkg/broker/receiver_test.go +++ b/pkg/broker/receiver_test.go @@ -206,5 +206,5 @@ func makeV01Request() *http.Request { } func makeV02Request() *http.Request { - return makeRequest(`"0.2"`, `"2.0"`, "CE-Type", "CE-Source") + return makeRequest(`"0.2"`, `"2.0"`, "ce-type", "ce-source") }