-
Notifications
You must be signed in to change notification settings - Fork 630
Reorg of trigger filtering code #4493
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| /* | ||
| Copyright 2020 The Knative Authors | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package attributes | ||
|
|
||
| import ( | ||
| "context" | ||
|
|
||
| cloudevents "github.com/cloudevents/sdk-go/v2" | ||
| "go.uber.org/zap" | ||
| "knative.dev/pkg/logging" | ||
|
|
||
| eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" | ||
| "knative.dev/eventing/pkg/eventfilter" | ||
| ) | ||
|
|
||
| type attributesFilter map[string]string | ||
|
|
||
| // NewAttributesFilter returns an event filter which performs the exact match on the attributes | ||
| func NewAttributesFilter(attrs map[string]string) eventfilter.Filter { | ||
| return attributesFilter(attrs) | ||
| } | ||
|
|
||
| func (attrs attributesFilter) Filter(ctx context.Context, event cloudevents.Event) eventfilter.FilterResult { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO comment is required for all exported functions. And in this case this function should have comment unless we don't have to export it, which is actually the preferred way.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So there are 2 reasons why IMO it doesn't make sense to comment here:
|
||
| // Set standard context attributes. The attributes available may not be | ||
| // exactly the same as the attributes defined in the current version of the | ||
| // CloudEvents spec. | ||
| ce := map[string]interface{}{ | ||
| "specversion": event.SpecVersion(), | ||
| "type": event.Type(), | ||
| "source": event.Source(), | ||
| "subject": event.Subject(), | ||
| "id": event.ID(), | ||
| "time": event.Time().String(), | ||
| "schemaurl": event.DataSchema(), | ||
| "datacontenttype": event.DataContentType(), | ||
| "datamediatype": event.DataMediaType(), | ||
| // TODO: use data_base64 when SDK supports it. | ||
| "datacontentencoding": event.DeprecatedDataContentEncoding(), | ||
| } | ||
| ext := event.Extensions() | ||
| for k, v := range ext { | ||
| ce[k] = v | ||
| } | ||
|
|
||
| for k, v := range attrs { | ||
| var value interface{} | ||
| value, ok := ce[k] | ||
| // If the attribute does not exist in the event, return false. | ||
| if !ok { | ||
| logging.FromContext(ctx).Debug("Attribute not found", zap.String("attribute", k)) | ||
| return eventfilter.FailFilter | ||
| } | ||
| // If the attribute is not set to any and is different than the one from the event, return false. | ||
| if v != eventingv1beta1.TriggerAnyFilter && v != value { | ||
| logging.FromContext(ctx).Debug("Attribute had non-matching value", zap.String("attribute", k), zap.String("filter", v), zap.Any("received", value)) | ||
| return eventfilter.FailFilter | ||
| } | ||
| } | ||
| return eventfilter.PassFilter | ||
| } | ||
|
|
||
| var _ eventfilter.Filter = attributesFilter{} | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,127 @@ | ||
| /* | ||
| Copyright 2020 The Knative Authors | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package attributes | ||
|
|
||
| import ( | ||
| "context" | ||
| "testing" | ||
|
|
||
| cloudevents "github.com/cloudevents/sdk-go/v2" | ||
|
|
||
| "knative.dev/eventing/pkg/eventfilter" | ||
| broker "knative.dev/eventing/pkg/mtbroker" | ||
| ) | ||
|
|
||
| const ( | ||
| eventType = `com.example.someevent` | ||
| eventSource = `/mycontext` | ||
| extensionName = `myextension` | ||
| extensionValue = `my-extension-value` | ||
| ) | ||
|
|
||
| func TestAttributesFilter_Filter(t *testing.T) { | ||
| tests := map[string]struct { | ||
| filter map[string]string | ||
| event *cloudevents.Event | ||
| want eventfilter.FilterResult | ||
| }{ | ||
| "Wrong type": { | ||
| filter: attributes("some-other-type", ""), | ||
| want: eventfilter.FailFilter, | ||
| }, | ||
| "Wrong type with attribs": { | ||
| filter: attributes("some-other-type", ""), | ||
| want: eventfilter.FailFilter, | ||
| }, | ||
| "Wrong source": { | ||
| filter: attributes("", "some-other-source"), | ||
| want: eventfilter.FailFilter, | ||
| }, | ||
| "Wrong source with attribs": { | ||
| filter: attributes("", "some-other-source"), | ||
| want: eventfilter.FailFilter, | ||
| }, | ||
| "Wrong extension": { | ||
| filter: attributes("", "some-other-source"), | ||
| want: eventfilter.FailFilter, | ||
| }, | ||
| "Any": { | ||
| filter: attributes("", ""), | ||
| want: eventfilter.PassFilter, | ||
| }, | ||
| "Specific": { | ||
| filter: attributes(eventType, eventSource), | ||
| want: eventfilter.PassFilter, | ||
| }, | ||
| "Extension with attribs": { | ||
| filter: attributesWithExtension(eventType, eventSource, extensionValue), | ||
| event: makeEventWithExtension(extensionName, extensionValue), | ||
| want: eventfilter.PassFilter, | ||
| }, | ||
| "Any with attribs - Arrival extension": { | ||
| filter: attributes("", ""), | ||
| event: makeEventWithExtension(broker.EventArrivalTime, "2019-08-26T23:38:17.834384404Z"), | ||
| want: eventfilter.PassFilter, | ||
| }, | ||
| "Wrong Extension with attribs": { | ||
| filter: attributesWithExtension(eventType, eventSource, "some-other-extension-value"), | ||
| event: makeEventWithExtension(extensionName, extensionValue), | ||
| want: eventfilter.FailFilter, | ||
| }, | ||
| } | ||
| for name, tt := range tests { | ||
| t.Run(name, func(t *testing.T) { | ||
| e := tt.event | ||
| if e == nil { | ||
| e = makeEvent() | ||
| } | ||
|
|
||
| if got := NewAttributesFilter(tt.filter).Filter(context.TODO(), *e); got != tt.want { | ||
| t.Errorf("Filter() = %v, want %v", got, tt.want) | ||
| } | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func makeEvent() *cloudevents.Event { | ||
| e := cloudevents.NewEvent() | ||
| e.SetType(eventType) | ||
| e.SetSource(eventSource) | ||
| e.SetID("1234") | ||
| return &e | ||
| } | ||
|
|
||
| func makeEventWithExtension(extName, extValue string) *cloudevents.Event { | ||
| e := makeEvent() | ||
| e.SetExtension(extName, extValue) | ||
| return e | ||
| } | ||
|
|
||
| func attributes(t, s string) map[string]string { | ||
| return map[string]string{ | ||
| "type": t, | ||
| "source": s, | ||
| } | ||
| } | ||
|
|
||
| func attributesWithExtension(t, s, e string) map[string]string { | ||
| return map[string]string{ | ||
| "type": t, | ||
| "source": s, | ||
| extensionName: e, | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
| /* | ||
| Copyright 2020 The Knative Authors | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package eventfilter | ||
|
|
||
| import ( | ||
| "context" | ||
|
|
||
| cloudevents "github.com/cloudevents/sdk-go/v2" | ||
| ) | ||
|
|
||
| const ( | ||
| PassFilter FilterResult = "pass" | ||
| FailFilter FilterResult = "fail" | ||
| NoFilter FilterResult = "no_filter" | ||
| ) | ||
|
|
||
| // FilterResult has the result of the filtering operation. | ||
| type FilterResult string | ||
|
|
||
| func (x FilterResult) And(y FilterResult) FilterResult { | ||
| if x == NoFilter { | ||
| return y | ||
| } | ||
| if y == NoFilter { | ||
| return x | ||
| } | ||
| if x == PassFilter && y == PassFilter { | ||
| return PassFilter | ||
| } | ||
| return FailFilter | ||
| } | ||
|
|
||
| // Filter is an interface representing an event filter of the trigger filter | ||
| type Filter interface { | ||
| // Filter compute the predicate on the provided event and returns the result of the matching | ||
| Filter(ctx context.Context, event cloudevents.Event) FilterResult | ||
| } | ||
|
|
||
| // Filters is a wrapper that runs each filter and performs the and | ||
| type Filters []Filter | ||
|
|
||
| func (filters Filters) Filter(ctx context.Context, event cloudevents.Event) FilterResult { | ||
| res := NoFilter | ||
| for _, f := range filters { | ||
| res = res.And(f.Filter(ctx, event)) | ||
|
slinkydeveloper marked this conversation as resolved.
|
||
| // Short circuit to optimize it | ||
| if res == FailFilter { | ||
| return FailFilter | ||
| } | ||
| } | ||
| return res | ||
| } | ||
|
|
||
| var _ Filter = Filters{} | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| /* | ||
| Copyright 2020 The Knative Authors | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package eventfilter | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "strings" | ||
| "testing" | ||
|
|
||
| cloudevents "github.com/cloudevents/sdk-go/v2" | ||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| type mockFilter FilterResult | ||
|
|
||
| func (p mockFilter) Filter(ctx context.Context, event cloudevents.Event) FilterResult { | ||
| return FilterResult(p) | ||
| } | ||
|
|
||
| func TestFilters(t *testing.T) { | ||
| tests := []struct { | ||
| have []FilterResult | ||
| want FilterResult | ||
|
slinkydeveloper marked this conversation as resolved.
|
||
| }{{ | ||
| have: []FilterResult{}, | ||
| want: NoFilter, | ||
| }, { | ||
| have: []FilterResult{PassFilter}, | ||
| want: PassFilter, | ||
| }, { | ||
| have: []FilterResult{FailFilter}, | ||
| want: FailFilter, | ||
| }, { | ||
| have: []FilterResult{PassFilter, PassFilter}, | ||
| want: PassFilter, | ||
| }, { | ||
| have: []FilterResult{PassFilter, FailFilter}, | ||
| want: FailFilter, | ||
| }, { | ||
| have: []FilterResult{FailFilter, FailFilter}, | ||
| want: FailFilter, | ||
| }} | ||
| for _, tt := range tests { | ||
| t.Run(testName(tt.have, tt.want), func(t *testing.T) { | ||
| var filters Filters | ||
| for _, fr := range tt.have { | ||
| filters = append(filters, mockFilter(fr)) | ||
| } | ||
| require.Equal(t, tt.want, filters.Filter(context.TODO(), cloudevents.Event{})) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func testName(res []FilterResult, want FilterResult) string { | ||
| if len(res) != 0 { | ||
| var operands []string | ||
| for _, r := range res { | ||
| operands = append(operands, fmt.Sprintf("'%s'", string(r))) | ||
| } | ||
| return strings.Join(operands, " and ") + " = '" + string(want) + "'" | ||
| } | ||
| return string(NoFilter) + " = '" + string(want) + "'" | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.