From c19b851473ba2d39e65b0858b9c99947f122f6e7 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 10 Nov 2020 16:27:07 +0100 Subject: [PATCH 1/6] Reorg of filtering code Signed-off-by: Francesco Guardiani --- pkg/eventfilter/attributes/filter.go | 57 +++++++++++ pkg/eventfilter/attributes/filter_test.go | 111 ++++++++++++++++++++++ pkg/eventfilter/filter.go | 45 +++++++++ pkg/eventfilter/filter_test.go | 62 ++++++++++++ pkg/mtbroker/filter/filter_handler.go | 71 ++++---------- 5 files changed, 291 insertions(+), 55 deletions(-) create mode 100644 pkg/eventfilter/attributes/filter.go create mode 100644 pkg/eventfilter/attributes/filter_test.go create mode 100644 pkg/eventfilter/filter.go create mode 100644 pkg/eventfilter/filter_test.go diff --git a/pkg/eventfilter/attributes/filter.go b/pkg/eventfilter/attributes/filter.go new file mode 100644 index 00000000000..c80e528d621 --- /dev/null +++ b/pkg/eventfilter/attributes/filter.go @@ -0,0 +1,57 @@ +package attributes + +import ( + "context" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "go.uber.org/zap" + "knative.dev/pkg/logging" + + eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" + "knative.dev/eventing/pkg/eventfilter" +) + +type AttributesFilter map[string]string + +func NewAttributesFilter(attrs map[string]string) eventfilter.Filter { + return AttributesFilter(attrs) +} + +func (attrs AttributesFilter) Filter(ctx context.Context, event cloudevents.Event) eventfilter.FilterResult { + // Set standard context attributes. The attributes available may not be + // exactly the same as the attributes defined in the current version of the + // CloudEvents spec. + ce := map[string]interface{}{ + "specversion": event.SpecVersion(), + "type": event.Type(), + "source": event.Source(), + "subject": event.Subject(), + "id": event.ID(), + "time": event.Time().String(), + "schemaurl": event.DataSchema(), + "datacontenttype": event.DataContentType(), + "datamediatype": event.DataMediaType(), + // TODO: use data_base64 when SDK supports it. + "datacontentencoding": event.DeprecatedDataContentEncoding(), + } + ext := event.Extensions() + for k, v := range ext { + ce[k] = v + } + + for k, v := range attrs { + var value interface{} + value, ok := ce[k] + // If the attribute does not exist in the event, return false. + if !ok { + logging.FromContext(ctx).Debug("Attribute not found", zap.String("attribute", k)) + return eventfilter.FailFilter + } + // If the attribute is not set to any and is different than the one from the event, return false. + if v != eventingv1beta1.TriggerAnyFilter && v != value { + logging.FromContext(ctx).Debug("Attribute had non-matching value", zap.String("attribute", k), zap.String("filter", v), zap.Any("received", value)) + return eventfilter.FailFilter + } + } + return eventfilter.PassFilter +} diff --git a/pkg/eventfilter/attributes/filter_test.go b/pkg/eventfilter/attributes/filter_test.go new file mode 100644 index 00000000000..cbbcaa19170 --- /dev/null +++ b/pkg/eventfilter/attributes/filter_test.go @@ -0,0 +1,111 @@ +package attributes + +import ( + "context" + "testing" + + cloudevents "github.com/cloudevents/sdk-go/v2" + + "knative.dev/eventing/pkg/eventfilter" + broker "knative.dev/eventing/pkg/mtbroker" +) + +const ( + eventType = `com.example.someevent` + eventSource = `/mycontext` + extensionName = `myextension` + extensionValue = `my-extension-value` +) + +func TestAttributesFilter_Filter(t *testing.T) { + tests := map[string]struct { + filter map[string]string + event *cloudevents.Event + want eventfilter.FilterResult + }{ + "Wrong type": { + filter: attributesFilter("some-other-type", ""), + want: eventfilter.FailFilter, + }, + "Wrong type with attribs": { + filter: attributesFilter("some-other-type", ""), + want: eventfilter.FailFilter, + }, + "Wrong source": { + filter: attributesFilter("", "some-other-source"), + want: eventfilter.FailFilter, + }, + "Wrong source with attribs": { + filter: attributesFilter("", "some-other-source"), + want: eventfilter.FailFilter, + }, + "Wrong extension": { + filter: attributesFilter("", "some-other-source"), + want: eventfilter.FailFilter, + }, + "Any": { + filter: attributesFilter("", ""), + want: eventfilter.PassFilter, + }, + "Specific": { + filter: attributesFilter(eventType, eventSource), + want: eventfilter.PassFilter, + }, + "Extension with attribs": { + filter: attributesWithExtensionFilter(eventType, eventSource, extensionValue), + event: makeEventWithExtension(extensionName, extensionValue), + want: eventfilter.PassFilter, + }, + "Any with attribs - Arrival extension": { + filter: attributesFilter("", ""), + event: makeEventWithExtension(broker.EventArrivalTime, "2019-08-26T23:38:17.834384404Z"), + want: eventfilter.PassFilter, + }, + "Wrong Extension with attribs": { + filter: attributesWithExtensionFilter(eventType, eventSource, "some-other-extension-value"), + event: makeEventWithExtension(extensionName, extensionValue), + want: eventfilter.FailFilter, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + e := tt.event + if e == nil { + e = makeEvent() + } + + if got := NewAttributesFilter(tt.filter).Filter(context.TODO(), *e); got != tt.want { + t.Errorf("Filter() = %v, want %v", got, tt.want) + } + }) + } +} + +func makeEvent() *cloudevents.Event { + e := cloudevents.NewEvent() + e.SetType(eventType) + e.SetSource(eventSource) + e.SetID("1234") + return &e +} + +func makeEventWithExtension(extName, extValue string) *cloudevents.Event { + e := makeEvent() + e.SetExtension(extName, extValue) + return e +} + +func attributesFilter(t, s string) map[string]string { + return map[string]string{ + "type": t, + "source": s, + } +} + +func attributesWithExtensionFilter(t, s, e string) map[string]string { + return map[string]string{ + "type": t, + "source": s, + extensionName: e, + } +} diff --git a/pkg/eventfilter/filter.go b/pkg/eventfilter/filter.go new file mode 100644 index 00000000000..af9c868475d --- /dev/null +++ b/pkg/eventfilter/filter.go @@ -0,0 +1,45 @@ +package eventfilter + +import ( + "context" + + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +const ( + PassFilter FilterResult = "pass" + FailFilter FilterResult = "fail" + NoFilter FilterResult = "no_filter" +) + +// FilterResult has the result of the filtering operation. +type FilterResult string + +func (x FilterResult) And(y FilterResult) FilterResult { + if x == NoFilter { + return y + } + if y == NoFilter { + return x + } + if x == PassFilter && y == PassFilter { + return PassFilter + } + return FailFilter +} + +// Filter is an interface representing an event filter of the trigger filter +type Filter interface { + Filter(ctx context.Context, event cloudevents.Event) FilterResult +} + +// Filters is a wrapper that runs each filter and performs the and +type Filters []Filter + +func (filters Filters) Filter(ctx context.Context, event cloudevents.Event) FilterResult { + res := NoFilter + for _, f := range filters { + res = res.And(f.Filter(ctx, event)) + } + return res +} diff --git a/pkg/eventfilter/filter_test.go b/pkg/eventfilter/filter_test.go new file mode 100644 index 00000000000..cbecd8a3ab7 --- /dev/null +++ b/pkg/eventfilter/filter_test.go @@ -0,0 +1,62 @@ +package eventfilter + +import ( + "context" + "fmt" + "strings" + "testing" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/stretchr/testify/require" +) + +type mockFilter FilterResult + +func (p mockFilter) Filter(ctx context.Context, event cloudevents.Event) FilterResult { + return FilterResult(p) +} + +func TestFilters(t *testing.T) { + tests := []struct { + have []FilterResult + want FilterResult + }{{ + have: []FilterResult{}, + want: NoFilter, + }, { + have: []FilterResult{PassFilter}, + want: PassFilter, + }, { + have: []FilterResult{FailFilter}, + want: FailFilter, + }, { + have: []FilterResult{PassFilter, PassFilter}, + want: PassFilter, + }, { + have: []FilterResult{PassFilter, FailFilter}, + want: FailFilter, + }, { + have: []FilterResult{FailFilter, FailFilter}, + want: FailFilter, + }} + for _, tt := range tests { + t.Run(testName(tt.have, tt.want), func(t *testing.T) { + var filters Filters + for _, fr := range tt.have { + filters = append(filters, mockFilter(fr)) + } + require.Equal(t, tt.want, filters.Filter(context.TODO(), cloudevents.Event{})) + }) + } +} + +func testName(res []FilterResult, want FilterResult) string { + if len(res) != 0 { + var operands []string + for _, r := range res { + operands = append(operands, fmt.Sprintf("'%s'", string(r))) + } + return strings.Join(operands, " and ") + " = '" + string(want) + "'" + } + return string(NoFilter) + " = '" + string(want) + "'" +} diff --git a/pkg/mtbroker/filter/filter_handler.go b/pkg/mtbroker/filter/filter_handler.go index 3b87c9ed5d9..474831db94c 100644 --- a/pkg/mtbroker/filter/filter_handler.go +++ b/pkg/mtbroker/filter/filter_handler.go @@ -34,6 +34,8 @@ import ( eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1beta1" + "knative.dev/eventing/pkg/eventfilter" + "knative.dev/eventing/pkg/eventfilter/attributes" "knative.dev/eventing/pkg/kncloudevents" broker "knative.dev/eventing/pkg/mtbroker" "knative.dev/eventing/pkg/reconciler/sugar/trigger/path" @@ -42,10 +44,6 @@ import ( ) const ( - passFilter FilterResult = "pass" - failFilter FilterResult = "fail" - noFilter FilterResult = "no_filter" - // TODO make these constants configurable (either as env variables, config map, or part of broker spec). // Issue: https://github.com/knative/eventing/issues/1777 // Constants for the underlying HTTP Client transport. These would enable better connection reuse. @@ -69,9 +67,6 @@ type Handler struct { logger *zap.Logger } -// FilterResult has the result of the filtering operation. -type FilterResult string - // NewHandler creates a new Handler and its associated MessageReceiver. The caller is responsible for // Start()ing the returned Handler. func NewHandler(logger *zap.Logger, triggerLister eventinglisters.TriggerLister, reporter StatsReporter, port int) (*Handler, error) { @@ -189,9 +184,14 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { // Check if the event should be sent. ctx = logging.WithLogger(ctx, h.logger.Sugar()) - filterResult := h.shouldSendEvent(ctx, &t.Spec, event) + filterResult, err := filterEvent(ctx, t.Spec.Filter, *event) + if err != nil { + h.logger.Error("Error while filtering", zap.Error(err)) + writer.WriteHeader(http.StatusInternalServerError) + return + } - if filterResult == failFilter { + if filterResult == eventfilter.FailFilter { // We do not count the event. The event will be counted in the broker ingress. // If the filter didn't pass, it means that the event wasn't meant for this Trigger. return @@ -329,54 +329,15 @@ func (h *Handler) getTrigger(ref path.NamespacedNameUID) (*eventingv1beta1.Trigg return t, nil } -// shouldSendEvent determines whether event 'event' should be sent based on the triggerSpec 'ts'. -// Currently it supports exact matching on event context attributes and extension attributes. -// If no filter is present, shouldSendEvent returns passFilter. -func (h *Handler) shouldSendEvent(ctx context.Context, ts *eventingv1beta1.TriggerSpec, event *cloudevents.Event) FilterResult { - // No filter specified, default to passing everything. - if ts.Filter == nil || len(ts.Filter.Attributes) == 0 { - return noFilter +func filterEvent(ctx context.Context, filter *eventingv1beta1.TriggerFilter, event cloudevents.Event) (eventfilter.FilterResult, error) { + if filter == nil { + return eventfilter.NoFilter, nil } - return filterEventByAttributes(ctx, map[string]string(ts.Filter.Attributes), event) -} - -func filterEventByAttributes(ctx context.Context, attrs map[string]string, event *cloudevents.Event) FilterResult { - // Set standard context attributes. The attributes available may not be - // exactly the same as the attributes defined in the current version of the - // CloudEvents spec. - ce := map[string]interface{}{ - "specversion": event.SpecVersion(), - "type": event.Type(), - "source": event.Source(), - "subject": event.Subject(), - "id": event.ID(), - "time": event.Time().String(), - "schemaurl": event.DataSchema(), - "datacontenttype": event.DataContentType(), - "datamediatype": event.DataMediaType(), - // TODO: use data_base64 when SDK supports it. - "datacontentencoding": event.DeprecatedDataContentEncoding(), - } - ext := event.Extensions() - for k, v := range ext { - ce[k] = v - } - - for k, v := range attrs { - var value interface{} - value, ok := ce[k] - // If the attribute does not exist in the event, return false. - if !ok { - logging.FromContext(ctx).Debug("Attribute not found", zap.String("attribute", k)) - return failFilter - } - // If the attribute is not set to any and is different than the one from the event, return false. - if v != eventingv1beta1.TriggerAnyFilter && v != value { - logging.FromContext(ctx).Debug("Attribute had non-matching value", zap.String("attribute", k), zap.String("filter", v), zap.Any("received", value)) - return failFilter - } + var filters eventfilter.Filters + if filter.Attributes != nil && len(filter.Attributes) != 0 { + filters = append(filters, attributes.NewAttributesFilter(filter.Attributes)) } - return passFilter + return filters.Filter(ctx, event), nil } // triggerFilterAttribute returns the filter attribute value for a given `attributeName`. If it doesn't not exist, From de71816672efb2cbf0565ca3240120203f2d327a Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 10 Nov 2020 16:35:33 +0100 Subject: [PATCH 2/6] Copyright Signed-off-by: Francesco Guardiani --- pkg/eventfilter/attributes/filter.go | 16 ++++++++++++++++ pkg/eventfilter/attributes/filter_test.go | 16 ++++++++++++++++ pkg/eventfilter/filter.go | 16 ++++++++++++++++ pkg/eventfilter/filter_test.go | 16 ++++++++++++++++ 4 files changed, 64 insertions(+) diff --git a/pkg/eventfilter/attributes/filter.go b/pkg/eventfilter/attributes/filter.go index c80e528d621..db8cb08e8c4 100644 --- a/pkg/eventfilter/attributes/filter.go +++ b/pkg/eventfilter/attributes/filter.go @@ -1,3 +1,19 @@ +/* + Copyright 2020 The Knative Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + package attributes import ( diff --git a/pkg/eventfilter/attributes/filter_test.go b/pkg/eventfilter/attributes/filter_test.go index cbbcaa19170..ed559069694 100644 --- a/pkg/eventfilter/attributes/filter_test.go +++ b/pkg/eventfilter/attributes/filter_test.go @@ -1,3 +1,19 @@ +/* + Copyright 2020 The Knative Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + package attributes import ( diff --git a/pkg/eventfilter/filter.go b/pkg/eventfilter/filter.go index af9c868475d..ec5bda2d09a 100644 --- a/pkg/eventfilter/filter.go +++ b/pkg/eventfilter/filter.go @@ -1,3 +1,19 @@ +/* + Copyright 2020 The Knative Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + package eventfilter import ( diff --git a/pkg/eventfilter/filter_test.go b/pkg/eventfilter/filter_test.go index cbecd8a3ab7..80d9cafcb43 100644 --- a/pkg/eventfilter/filter_test.go +++ b/pkg/eventfilter/filter_test.go @@ -1,3 +1,19 @@ +/* + Copyright 2020 The Knative Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + package eventfilter import ( From ad3c75ec1e88ad594a211d70c8851dd074baeb6f Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 10 Nov 2020 16:44:23 +0100 Subject: [PATCH 3/6] Copyright Signed-off-by: Francesco Guardiani --- pkg/eventfilter/attributes/filter.go | 20 ++++++++++---------- pkg/eventfilter/attributes/filter_test.go | 20 ++++++++++---------- pkg/eventfilter/filter.go | 20 ++++++++++---------- pkg/eventfilter/filter_test.go | 20 ++++++++++---------- 4 files changed, 40 insertions(+), 40 deletions(-) diff --git a/pkg/eventfilter/attributes/filter.go b/pkg/eventfilter/attributes/filter.go index db8cb08e8c4..ae18586504d 100644 --- a/pkg/eventfilter/attributes/filter.go +++ b/pkg/eventfilter/attributes/filter.go @@ -1,17 +1,17 @@ /* - Copyright 2020 The Knative Authors +Copyright 2020 The Knative Authors - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ package attributes diff --git a/pkg/eventfilter/attributes/filter_test.go b/pkg/eventfilter/attributes/filter_test.go index ed559069694..351518a78d3 100644 --- a/pkg/eventfilter/attributes/filter_test.go +++ b/pkg/eventfilter/attributes/filter_test.go @@ -1,17 +1,17 @@ /* - Copyright 2020 The Knative Authors +Copyright 2020 The Knative Authors - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ package attributes diff --git a/pkg/eventfilter/filter.go b/pkg/eventfilter/filter.go index ec5bda2d09a..4e979052f52 100644 --- a/pkg/eventfilter/filter.go +++ b/pkg/eventfilter/filter.go @@ -1,17 +1,17 @@ /* - Copyright 2020 The Knative Authors +Copyright 2020 The Knative Authors - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ package eventfilter diff --git a/pkg/eventfilter/filter_test.go b/pkg/eventfilter/filter_test.go index 80d9cafcb43..40c6ebf0e8d 100644 --- a/pkg/eventfilter/filter_test.go +++ b/pkg/eventfilter/filter_test.go @@ -1,17 +1,17 @@ /* - Copyright 2020 The Knative Authors +Copyright 2020 The Knative Authors - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ package eventfilter From c93c8f2c4192fcea55434a3fe50fb52f4ba7c86e Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 10 Nov 2020 16:54:17 +0100 Subject: [PATCH 4/6] Remove err Signed-off-by: Francesco Guardiani --- pkg/mtbroker/filter/filter_handler.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/pkg/mtbroker/filter/filter_handler.go b/pkg/mtbroker/filter/filter_handler.go index 474831db94c..be8b7018d80 100644 --- a/pkg/mtbroker/filter/filter_handler.go +++ b/pkg/mtbroker/filter/filter_handler.go @@ -184,12 +184,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { // Check if the event should be sent. ctx = logging.WithLogger(ctx, h.logger.Sugar()) - filterResult, err := filterEvent(ctx, t.Spec.Filter, *event) - if err != nil { - h.logger.Error("Error while filtering", zap.Error(err)) - writer.WriteHeader(http.StatusInternalServerError) - return - } + filterResult := filterEvent(ctx, t.Spec.Filter, *event) if filterResult == eventfilter.FailFilter { // We do not count the event. The event will be counted in the broker ingress. @@ -329,15 +324,15 @@ func (h *Handler) getTrigger(ref path.NamespacedNameUID) (*eventingv1beta1.Trigg return t, nil } -func filterEvent(ctx context.Context, filter *eventingv1beta1.TriggerFilter, event cloudevents.Event) (eventfilter.FilterResult, error) { +func filterEvent(ctx context.Context, filter *eventingv1beta1.TriggerFilter, event cloudevents.Event) eventfilter.FilterResult { if filter == nil { - return eventfilter.NoFilter, nil + return eventfilter.NoFilter } var filters eventfilter.Filters if filter.Attributes != nil && len(filter.Attributes) != 0 { filters = append(filters, attributes.NewAttributesFilter(filter.Attributes)) } - return filters.Filter(ctx, event), nil + return filters.Filter(ctx, event) } // triggerFilterAttribute returns the filter attribute value for a given `attributeName`. If it doesn't not exist, From bd7ca25056131def56d0b3e3cc1f7dc75297509b Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 11 Nov 2020 18:46:58 +0100 Subject: [PATCH 5/6] Nit Signed-off-by: Francesco Guardiani --- pkg/eventfilter/attributes/filter.go | 9 ++++++--- pkg/eventfilter/attributes/filter_test.go | 24 +++++++++++------------ pkg/eventfilter/filter.go | 5 +++++ 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/pkg/eventfilter/attributes/filter.go b/pkg/eventfilter/attributes/filter.go index ae18586504d..3079bd2988e 100644 --- a/pkg/eventfilter/attributes/filter.go +++ b/pkg/eventfilter/attributes/filter.go @@ -27,13 +27,14 @@ import ( "knative.dev/eventing/pkg/eventfilter" ) -type AttributesFilter map[string]string +type attributesFilter map[string]string +// NewAttributesFilter returns an event filter which performs the exact match on the attributes func NewAttributesFilter(attrs map[string]string) eventfilter.Filter { - return AttributesFilter(attrs) + return attributesFilter(attrs) } -func (attrs AttributesFilter) Filter(ctx context.Context, event cloudevents.Event) eventfilter.FilterResult { +func (attrs attributesFilter) Filter(ctx context.Context, event cloudevents.Event) eventfilter.FilterResult { // Set standard context attributes. The attributes available may not be // exactly the same as the attributes defined in the current version of the // CloudEvents spec. @@ -71,3 +72,5 @@ func (attrs AttributesFilter) Filter(ctx context.Context, event cloudevents.Even } return eventfilter.PassFilter } + +var _ eventfilter.Filter = attributesFilter{} diff --git a/pkg/eventfilter/attributes/filter_test.go b/pkg/eventfilter/attributes/filter_test.go index 351518a78d3..9db9590ec30 100644 --- a/pkg/eventfilter/attributes/filter_test.go +++ b/pkg/eventfilter/attributes/filter_test.go @@ -40,45 +40,45 @@ func TestAttributesFilter_Filter(t *testing.T) { want eventfilter.FilterResult }{ "Wrong type": { - filter: attributesFilter("some-other-type", ""), + filter: attributes("some-other-type", ""), want: eventfilter.FailFilter, }, "Wrong type with attribs": { - filter: attributesFilter("some-other-type", ""), + filter: attributes("some-other-type", ""), want: eventfilter.FailFilter, }, "Wrong source": { - filter: attributesFilter("", "some-other-source"), + filter: attributes("", "some-other-source"), want: eventfilter.FailFilter, }, "Wrong source with attribs": { - filter: attributesFilter("", "some-other-source"), + filter: attributes("", "some-other-source"), want: eventfilter.FailFilter, }, "Wrong extension": { - filter: attributesFilter("", "some-other-source"), + filter: attributes("", "some-other-source"), want: eventfilter.FailFilter, }, "Any": { - filter: attributesFilter("", ""), + filter: attributes("", ""), want: eventfilter.PassFilter, }, "Specific": { - filter: attributesFilter(eventType, eventSource), + filter: attributes(eventType, eventSource), want: eventfilter.PassFilter, }, "Extension with attribs": { - filter: attributesWithExtensionFilter(eventType, eventSource, extensionValue), + filter: attributesWithExtension(eventType, eventSource, extensionValue), event: makeEventWithExtension(extensionName, extensionValue), want: eventfilter.PassFilter, }, "Any with attribs - Arrival extension": { - filter: attributesFilter("", ""), + filter: attributes("", ""), event: makeEventWithExtension(broker.EventArrivalTime, "2019-08-26T23:38:17.834384404Z"), want: eventfilter.PassFilter, }, "Wrong Extension with attribs": { - filter: attributesWithExtensionFilter(eventType, eventSource, "some-other-extension-value"), + filter: attributesWithExtension(eventType, eventSource, "some-other-extension-value"), event: makeEventWithExtension(extensionName, extensionValue), want: eventfilter.FailFilter, }, @@ -111,14 +111,14 @@ func makeEventWithExtension(extName, extValue string) *cloudevents.Event { return e } -func attributesFilter(t, s string) map[string]string { +func attributes(t, s string) map[string]string { return map[string]string{ "type": t, "source": s, } } -func attributesWithExtensionFilter(t, s, e string) map[string]string { +func attributesWithExtension(t, s, e string) map[string]string { return map[string]string{ "type": t, "source": s, diff --git a/pkg/eventfilter/filter.go b/pkg/eventfilter/filter.go index 4e979052f52..6fc5a509d1f 100644 --- a/pkg/eventfilter/filter.go +++ b/pkg/eventfilter/filter.go @@ -46,6 +46,7 @@ func (x FilterResult) And(y FilterResult) FilterResult { // Filter is an interface representing an event filter of the trigger filter type Filter interface { + // Filter compute the predicate on the provided event and returns the result of the matching Filter(ctx context.Context, event cloudevents.Event) FilterResult } @@ -56,6 +57,10 @@ func (filters Filters) Filter(ctx context.Context, event cloudevents.Event) Filt res := NoFilter for _, f := range filters { res = res.And(f.Filter(ctx, event)) + // Short circuit to optimize it + if res == FailFilter { + return FailFilter + } } return res } From e17917335851575b43ff6550964e00f0e0e2f313 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 11 Nov 2020 18:47:25 +0100 Subject: [PATCH 6/6] Type assertion Signed-off-by: Francesco Guardiani --- pkg/eventfilter/filter.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/eventfilter/filter.go b/pkg/eventfilter/filter.go index 6fc5a509d1f..e1d1f062371 100644 --- a/pkg/eventfilter/filter.go +++ b/pkg/eventfilter/filter.go @@ -64,3 +64,5 @@ func (filters Filters) Filter(ctx context.Context, event cloudevents.Event) Filt } return res } + +var _ Filter = Filters{}