Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 24 additions & 22 deletions pkg/broker/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -86,25 +85,14 @@ func (r *Receiver) sendEvent(trigger provisioners.ChannelReference, message *pro
return nil
}

// Initialize the client. Mainly intended to load stuff in its cache.
// Initialize the client. Mainly intended to create the informer/indexer in order not to drop messages.
func (r *Receiver) initClient() error {
// We list triggers so that we can load the client's cache. Otherwise, on receiving an event, it
// We list triggers so that we do not drop messages. Otherwise, on receiving an event, it
// may not find the trigger and would return an error.
opts := &client.ListOptions{
// Set Raw because if we need to get more than one page, then we will put the continue token
// into opts.Raw.Continue.
Raw: &metav1.ListOptions{},
}
for {
tl := &eventingv1alpha1.TriggerList{}
if err := r.client.List(context.TODO(), opts, tl); err != nil {
return err
}
if tl.Continue != "" {
opts.Raw.Continue = tl.Continue
} else {
break
}
opts := &client.ListOptions{}
tl := &eventingv1alpha1.TriggerList{}
if err := r.client.List(context.TODO(), opts, tl); err != nil {
return err
}
return nil
}
Expand All @@ -128,13 +116,27 @@ func (r *Receiver) shouldSendMessage(ts *eventingv1alpha1.TriggerSpec, m *provis
return false
}
filterType := ts.Filter.SourceAndType.Type
if filterType != eventingv1alpha1.TriggerAnyFilter && filterType != m.Headers["Ce-Eventtype"] {
r.logger.Debug("Wrong type", zap.String("trigger.spec.filter.sourceAndType.type", filterType), zap.String("message.type", m.Headers["Ce-Eventtype"]))
// TODO the inspection of Headers should be removed once we start using the cloud events SDK.
cloudEventType := ""
if et, ok := m.Headers["Ce-Eventtype"]; ok {
// cloud event spec v0.1.
cloudEventType = et
} else if et, ok := m.Headers["Ce-Type"]; ok {
// cloud event spec v0.2.
cloudEventType = et
}
if filterType != eventingv1alpha1.TriggerAnyFilter && filterType != cloudEventType {
r.logger.Debug("Wrong type", zap.String("trigger.spec.filter.sourceAndType.type", filterType), zap.String("message.type", cloudEventType))
return false
}
filterSource := ts.Filter.SourceAndType.Source
if filterSource != eventingv1alpha1.TriggerAnyFilter && filterSource != m.Headers["Ce-Source"] {
r.logger.Debug("Wrong source", zap.String("trigger.spec.filter.sourceAndType.source", filterSource), zap.String("message.source", m.Headers["Ce-Source"]))
cloudEventSource := ""
// cloud event spec v0.1 and v0.2.
if es, ok := m.Headers["Ce-Source"]; ok {
cloudEventSource = es
}
if filterSource != eventingv1alpha1.TriggerAnyFilter && filterSource != cloudEventSource {
r.logger.Debug("Wrong source", zap.String("trigger.spec.filter.sourceAndType.source", filterSource), zap.String("message.source", cloudEventSource))
return false
}
return true
Expand Down
60 changes: 36 additions & 24 deletions pkg/broker/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,27 +106,31 @@ func TestReceiver(t *testing.T) {
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
mr, _ := New(
zap.NewNop(),
fake.NewFakeClient(tc.initialState...))
fd := &fakeDispatcher{
err: tc.dispatchErr,
}
mr.dispatcher = fd

resp := httptest.NewRecorder()
mr.newMessageReceiver().HandleRequest(resp, makeRequest())
if tc.expectedErr {
if resp.Result().StatusCode >= 200 && resp.Result().StatusCode < 300 {
t.Errorf("Expected an error. Actual: %v", resp.Result())
// Support cloud spec v0.1 and v0.2 requests.
requests := [2]*http.Request{makeV01Request(), makeV02Request()}
for _, request := range requests {
mr, _ := New(
zap.NewNop(),
fake.NewFakeClient(tc.initialState...))
fd := &fakeDispatcher{
err: tc.dispatchErr,
}
} else {
if resp.Result().StatusCode < 200 || resp.Result().StatusCode >= 300 {
t.Errorf("Expected success. Actual: %v", resp.Result())
mr.dispatcher = fd

resp := httptest.NewRecorder()
mr.newMessageReceiver().HandleRequest(resp, request)
if tc.expectedErr {
if resp.Result().StatusCode >= 200 && resp.Result().StatusCode < 300 {
t.Errorf("Expected an error. Actual: %v", resp.Result())
}
} else {
if resp.Result().StatusCode < 200 || resp.Result().StatusCode >= 300 {
t.Errorf("Expected success. Actual: %v", resp.Result())
}
}
if tc.expectedDispatch != fd.requestReceived {
t.Errorf("Incorrect dispatch. Expected %v, Actual %v", tc.expectedDispatch, fd.requestReceived)
}
}
if tc.expectedDispatch != fd.requestReceived {
t.Errorf("Incorrect dispatch. Expected %v, Actual %v", tc.expectedDispatch, fd.requestReceived)
}
})
}
Expand Down Expand Up @@ -178,15 +182,15 @@ func makeTriggerWithoutSubscriberURI() *eventingv1alpha1.Trigger {
return t
}

func makeRequest() *http.Request {
func makeRequest(cloudEventVersionValue, eventTypeVersionValue, eventTypeKey, eventSourceKey string) *http.Request {
req := httptest.NewRequest("POST", "/", strings.NewReader(`<much wow="xml"/>`))
req.Host = fmt.Sprintf("%s.%s.triggers.%s", triggerName, testNS, utils.GetClusterDomainName())

eventAttributes := map[string]string{
"CE-CloudEventsVersion": `"0.1"`,
"CE-EventType": eventType,
"CE-EventTypeVersion": `"1.0"`,
"CE-Source": eventSource,
"CE-CloudEventsVersion": cloudEventVersionValue,
eventTypeKey: eventType,
"CE-EventTypeVersion": eventTypeVersionValue,
eventSourceKey: eventSource,
"CE-EventID": `"A234-1234-1234"`,
"CE-EventTime": `"2018-04-05T17:31:00Z"`,
"contentType": "text/xml",
Expand All @@ -196,3 +200,11 @@ func makeRequest() *http.Request {
}
return req
}

func makeV01Request() *http.Request {
return makeRequest(`"0.1"`, `"1.0"`, "CE-EventType", "CE-Source")
}

func makeV02Request() *http.Request {
return makeRequest(`"0.2"`, `"2.0"`, "ce-type", "ce-source")
}