diff --git a/pkg/eventfilter/attributes/filter.go b/pkg/eventfilter/attributes/filter.go new file mode 100644 index 00000000000..3079bd2988e --- /dev/null +++ b/pkg/eventfilter/attributes/filter.go @@ -0,0 +1,76 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package attributes + +import ( + "context" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "go.uber.org/zap" + "knative.dev/pkg/logging" + + eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" + "knative.dev/eventing/pkg/eventfilter" +) + +type attributesFilter map[string]string + +// NewAttributesFilter returns an event filter which performs the exact match on the attributes +func NewAttributesFilter(attrs map[string]string) eventfilter.Filter { + return attributesFilter(attrs) +} + +func (attrs attributesFilter) Filter(ctx context.Context, event cloudevents.Event) eventfilter.FilterResult { + // Set standard context attributes. The attributes available may not be + // exactly the same as the attributes defined in the current version of the + // CloudEvents spec. + ce := map[string]interface{}{ + "specversion": event.SpecVersion(), + "type": event.Type(), + "source": event.Source(), + "subject": event.Subject(), + "id": event.ID(), + "time": event.Time().String(), + "schemaurl": event.DataSchema(), + "datacontenttype": event.DataContentType(), + "datamediatype": event.DataMediaType(), + // TODO: use data_base64 when SDK supports it. + "datacontentencoding": event.DeprecatedDataContentEncoding(), + } + ext := event.Extensions() + for k, v := range ext { + ce[k] = v + } + + for k, v := range attrs { + var value interface{} + value, ok := ce[k] + // If the attribute does not exist in the event, return false. + if !ok { + logging.FromContext(ctx).Debug("Attribute not found", zap.String("attribute", k)) + return eventfilter.FailFilter + } + // If the attribute is not set to any and is different than the one from the event, return false. + if v != eventingv1beta1.TriggerAnyFilter && v != value { + logging.FromContext(ctx).Debug("Attribute had non-matching value", zap.String("attribute", k), zap.String("filter", v), zap.Any("received", value)) + return eventfilter.FailFilter + } + } + return eventfilter.PassFilter +} + +var _ eventfilter.Filter = attributesFilter{} diff --git a/pkg/eventfilter/attributes/filter_test.go b/pkg/eventfilter/attributes/filter_test.go new file mode 100644 index 00000000000..9db9590ec30 --- /dev/null +++ b/pkg/eventfilter/attributes/filter_test.go @@ -0,0 +1,127 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package attributes + +import ( + "context" + "testing" + + cloudevents "github.com/cloudevents/sdk-go/v2" + + "knative.dev/eventing/pkg/eventfilter" + broker "knative.dev/eventing/pkg/mtbroker" +) + +const ( + eventType = `com.example.someevent` + eventSource = `/mycontext` + extensionName = `myextension` + extensionValue = `my-extension-value` +) + +func TestAttributesFilter_Filter(t *testing.T) { + tests := map[string]struct { + filter map[string]string + event *cloudevents.Event + want eventfilter.FilterResult + }{ + "Wrong type": { + filter: attributes("some-other-type", ""), + want: eventfilter.FailFilter, + }, + "Wrong type with attribs": { + filter: attributes("some-other-type", ""), + want: eventfilter.FailFilter, + }, + "Wrong source": { + filter: attributes("", "some-other-source"), + want: eventfilter.FailFilter, + }, + "Wrong source with attribs": { + filter: attributes("", "some-other-source"), + want: eventfilter.FailFilter, + }, + "Wrong extension": { + filter: attributes("", "some-other-source"), + want: eventfilter.FailFilter, + }, + "Any": { + filter: attributes("", ""), + want: eventfilter.PassFilter, + }, + "Specific": { + filter: attributes(eventType, eventSource), + want: eventfilter.PassFilter, + }, + "Extension with attribs": { + filter: attributesWithExtension(eventType, eventSource, extensionValue), + event: makeEventWithExtension(extensionName, extensionValue), + want: eventfilter.PassFilter, + }, + "Any with attribs - Arrival extension": { + filter: attributes("", ""), + event: makeEventWithExtension(broker.EventArrivalTime, "2019-08-26T23:38:17.834384404Z"), + want: eventfilter.PassFilter, + }, + "Wrong Extension with attribs": { + filter: attributesWithExtension(eventType, eventSource, "some-other-extension-value"), + event: makeEventWithExtension(extensionName, extensionValue), + want: eventfilter.FailFilter, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + e := tt.event + if e == nil { + e = makeEvent() + } + + if got := NewAttributesFilter(tt.filter).Filter(context.TODO(), *e); got != tt.want { + t.Errorf("Filter() = %v, want %v", got, tt.want) + } + }) + } +} + +func makeEvent() *cloudevents.Event { + e := cloudevents.NewEvent() + e.SetType(eventType) + e.SetSource(eventSource) + e.SetID("1234") + return &e +} + +func makeEventWithExtension(extName, extValue string) *cloudevents.Event { + e := makeEvent() + e.SetExtension(extName, extValue) + return e +} + +func attributes(t, s string) map[string]string { + return map[string]string{ + "type": t, + "source": s, + } +} + +func attributesWithExtension(t, s, e string) map[string]string { + return map[string]string{ + "type": t, + "source": s, + extensionName: e, + } +} diff --git a/pkg/eventfilter/filter.go b/pkg/eventfilter/filter.go new file mode 100644 index 00000000000..e1d1f062371 --- /dev/null +++ b/pkg/eventfilter/filter.go @@ -0,0 +1,68 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventfilter + +import ( + "context" + + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +const ( + PassFilter FilterResult = "pass" + FailFilter FilterResult = "fail" + NoFilter FilterResult = "no_filter" +) + +// FilterResult has the result of the filtering operation. +type FilterResult string + +func (x FilterResult) And(y FilterResult) FilterResult { + if x == NoFilter { + return y + } + if y == NoFilter { + return x + } + if x == PassFilter && y == PassFilter { + return PassFilter + } + return FailFilter +} + +// Filter is an interface representing an event filter of the trigger filter +type Filter interface { + // Filter compute the predicate on the provided event and returns the result of the matching + Filter(ctx context.Context, event cloudevents.Event) FilterResult +} + +// Filters is a wrapper that runs each filter and performs the and +type Filters []Filter + +func (filters Filters) Filter(ctx context.Context, event cloudevents.Event) FilterResult { + res := NoFilter + for _, f := range filters { + res = res.And(f.Filter(ctx, event)) + // Short circuit to optimize it + if res == FailFilter { + return FailFilter + } + } + return res +} + +var _ Filter = Filters{} diff --git a/pkg/eventfilter/filter_test.go b/pkg/eventfilter/filter_test.go new file mode 100644 index 00000000000..40c6ebf0e8d --- /dev/null +++ b/pkg/eventfilter/filter_test.go @@ -0,0 +1,78 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventfilter + +import ( + "context" + "fmt" + "strings" + "testing" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/stretchr/testify/require" +) + +type mockFilter FilterResult + +func (p mockFilter) Filter(ctx context.Context, event cloudevents.Event) FilterResult { + return FilterResult(p) +} + +func TestFilters(t *testing.T) { + tests := []struct { + have []FilterResult + want FilterResult + }{{ + have: []FilterResult{}, + want: NoFilter, + }, { + have: []FilterResult{PassFilter}, + want: PassFilter, + }, { + have: []FilterResult{FailFilter}, + want: FailFilter, + }, { + have: []FilterResult{PassFilter, PassFilter}, + want: PassFilter, + }, { + have: []FilterResult{PassFilter, FailFilter}, + want: FailFilter, + }, { + have: []FilterResult{FailFilter, FailFilter}, + want: FailFilter, + }} + for _, tt := range tests { + t.Run(testName(tt.have, tt.want), func(t *testing.T) { + var filters Filters + for _, fr := range tt.have { + filters = append(filters, mockFilter(fr)) + } + require.Equal(t, tt.want, filters.Filter(context.TODO(), cloudevents.Event{})) + }) + } +} + +func testName(res []FilterResult, want FilterResult) string { + if len(res) != 0 { + var operands []string + for _, r := range res { + operands = append(operands, fmt.Sprintf("'%s'", string(r))) + } + return strings.Join(operands, " and ") + " = '" + string(want) + "'" + } + return string(NoFilter) + " = '" + string(want) + "'" +} diff --git a/pkg/mtbroker/filter/filter_handler.go b/pkg/mtbroker/filter/filter_handler.go index 3b87c9ed5d9..be8b7018d80 100644 --- a/pkg/mtbroker/filter/filter_handler.go +++ b/pkg/mtbroker/filter/filter_handler.go @@ -34,6 +34,8 @@ import ( eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1beta1" + "knative.dev/eventing/pkg/eventfilter" + "knative.dev/eventing/pkg/eventfilter/attributes" "knative.dev/eventing/pkg/kncloudevents" broker "knative.dev/eventing/pkg/mtbroker" "knative.dev/eventing/pkg/reconciler/sugar/trigger/path" @@ -42,10 +44,6 @@ import ( ) const ( - passFilter FilterResult = "pass" - failFilter FilterResult = "fail" - noFilter FilterResult = "no_filter" - // TODO make these constants configurable (either as env variables, config map, or part of broker spec). // Issue: https://github.com/knative/eventing/issues/1777 // Constants for the underlying HTTP Client transport. These would enable better connection reuse. @@ -69,9 +67,6 @@ type Handler struct { logger *zap.Logger } -// FilterResult has the result of the filtering operation. -type FilterResult string - // NewHandler creates a new Handler and its associated MessageReceiver. The caller is responsible for // Start()ing the returned Handler. func NewHandler(logger *zap.Logger, triggerLister eventinglisters.TriggerLister, reporter StatsReporter, port int) (*Handler, error) { @@ -189,9 +184,9 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { // Check if the event should be sent. ctx = logging.WithLogger(ctx, h.logger.Sugar()) - filterResult := h.shouldSendEvent(ctx, &t.Spec, event) + filterResult := filterEvent(ctx, t.Spec.Filter, *event) - if filterResult == failFilter { + if filterResult == eventfilter.FailFilter { // We do not count the event. The event will be counted in the broker ingress. // If the filter didn't pass, it means that the event wasn't meant for this Trigger. return @@ -329,54 +324,15 @@ func (h *Handler) getTrigger(ref path.NamespacedNameUID) (*eventingv1beta1.Trigg return t, nil } -// shouldSendEvent determines whether event 'event' should be sent based on the triggerSpec 'ts'. -// Currently it supports exact matching on event context attributes and extension attributes. -// If no filter is present, shouldSendEvent returns passFilter. -func (h *Handler) shouldSendEvent(ctx context.Context, ts *eventingv1beta1.TriggerSpec, event *cloudevents.Event) FilterResult { - // No filter specified, default to passing everything. - if ts.Filter == nil || len(ts.Filter.Attributes) == 0 { - return noFilter - } - return filterEventByAttributes(ctx, map[string]string(ts.Filter.Attributes), event) -} - -func filterEventByAttributes(ctx context.Context, attrs map[string]string, event *cloudevents.Event) FilterResult { - // Set standard context attributes. The attributes available may not be - // exactly the same as the attributes defined in the current version of the - // CloudEvents spec. - ce := map[string]interface{}{ - "specversion": event.SpecVersion(), - "type": event.Type(), - "source": event.Source(), - "subject": event.Subject(), - "id": event.ID(), - "time": event.Time().String(), - "schemaurl": event.DataSchema(), - "datacontenttype": event.DataContentType(), - "datamediatype": event.DataMediaType(), - // TODO: use data_base64 when SDK supports it. - "datacontentencoding": event.DeprecatedDataContentEncoding(), +func filterEvent(ctx context.Context, filter *eventingv1beta1.TriggerFilter, event cloudevents.Event) eventfilter.FilterResult { + if filter == nil { + return eventfilter.NoFilter } - ext := event.Extensions() - for k, v := range ext { - ce[k] = v - } - - for k, v := range attrs { - var value interface{} - value, ok := ce[k] - // If the attribute does not exist in the event, return false. - if !ok { - logging.FromContext(ctx).Debug("Attribute not found", zap.String("attribute", k)) - return failFilter - } - // If the attribute is not set to any and is different than the one from the event, return false. - if v != eventingv1beta1.TriggerAnyFilter && v != value { - logging.FromContext(ctx).Debug("Attribute had non-matching value", zap.String("attribute", k), zap.String("filter", v), zap.Any("received", value)) - return failFilter - } + var filters eventfilter.Filters + if filter.Attributes != nil && len(filter.Attributes) != 0 { + filters = append(filters, attributes.NewAttributesFilter(filter.Attributes)) } - return passFilter + return filters.Filter(ctx, event) } // triggerFilterAttribute returns the filter attribute value for a given `attributeName`. If it doesn't not exist,