From b0d9d2db5f5a41da2aa99a766ab42c06200a5ede Mon Sep 17 00:00:00 2001 From: nachocano Date: Mon, 5 Aug 2019 14:39:44 -0700 Subject: [PATCH 1/5] filter by attributes in triggers --- .../eventing/v1alpha1/trigger_defaults.go | 12 -- .../v1alpha1/trigger_defaults_test.go | 24 ++-- .../v1alpha1/trigger_lifecycle_test.go | 8 +- pkg/apis/eventing/v1alpha1/trigger_types.go | 20 ++- .../eventing/v1alpha1/trigger_validation.go | 28 +++- .../v1alpha1/trigger_validation_test.go | 89 ++++++++++-- .../v1alpha1/zz_generated.deepcopy.go | 37 ++++- pkg/broker/receiver.go | 83 ++++++++--- pkg/broker/receiver_test.go | 130 ++++++++++++++---- pkg/reconciler/trigger/trigger_test.go | 2 +- test/base/resources/eventing.go | 2 +- 11 files changed, 343 insertions(+), 92 deletions(-) diff --git a/pkg/apis/eventing/v1alpha1/trigger_defaults.go b/pkg/apis/eventing/v1alpha1/trigger_defaults.go index 40f7c025473..e685f3c0295 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_defaults.go +++ b/pkg/apis/eventing/v1alpha1/trigger_defaults.go @@ -33,16 +33,4 @@ func (ts *TriggerSpec) SetDefaults(ctx context.Context) { if ts.Filter == nil { ts.Filter = &TriggerFilter{} } - - // Note that this logic will need to change once there are other filtering options, as it should - // only apply if no other filter is applied. - if ts.Filter.SourceAndType == nil { - ts.Filter.SourceAndType = &TriggerFilterSourceAndType{} - } - if ts.Filter.SourceAndType.Type == "" { - ts.Filter.SourceAndType.Type = TriggerAnyFilter - } - if ts.Filter.SourceAndType.Source == "" { - ts.Filter.SourceAndType.Source = TriggerAnyFilter - } } diff --git a/pkg/apis/eventing/v1alpha1/trigger_defaults_test.go b/pkg/apis/eventing/v1alpha1/trigger_defaults_test.go index c450a2c31bb..e6ab361ec85 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_defaults_test.go +++ b/pkg/apis/eventing/v1alpha1/trigger_defaults_test.go @@ -24,18 +24,19 @@ import ( ) var ( - defaultBroker = "default" - otherBroker = "other_broker" - - otherTriggerFilter = &TriggerFilter{ - SourceAndType: &TriggerFilterSourceAndType{ + defaultBroker = "default" + otherBroker = "other_broker" + defaultTriggerFilter = &TriggerFilter{} + otherTriggerFilter = &TriggerFilter{ + DeprecatedSourceAndType: &TriggerFilterSourceAndType{ Type: "other_type", Source: "other_source"}, } + defaultTrigger = Trigger{ Spec: TriggerSpec{ Broker: defaultBroker, - Filter: defaultTriggerFilter(), + Filter: defaultTriggerFilter, }, } ) @@ -51,7 +52,7 @@ func TestTriggerDefaults(t *testing.T) { }, "nil filter": { initial: Trigger{Spec: TriggerSpec{Broker: otherBroker}}, - expected: Trigger{Spec: TriggerSpec{Broker: otherBroker, Filter: defaultTriggerFilter()}}, + expected: Trigger{Spec: TriggerSpec{Broker: otherBroker, Filter: defaultTriggerFilter}}, }, "nil broker and nil filter": { initial: Trigger{}, @@ -67,12 +68,3 @@ func TestTriggerDefaults(t *testing.T) { }) } } - -func defaultTriggerFilter() *TriggerFilter { - // Can't just be a package level var because it gets mutated. - return &TriggerFilter{ - SourceAndType: &TriggerFilterSourceAndType{ - Type: TriggerAnyFilter, - Source: TriggerAnyFilter}, - } -} diff --git a/pkg/apis/eventing/v1alpha1/trigger_lifecycle_test.go b/pkg/apis/eventing/v1alpha1/trigger_lifecycle_test.go index b2678b6539c..58061f07761 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_lifecycle_test.go +++ b/pkg/apis/eventing/v1alpha1/trigger_lifecycle_test.go @@ -300,14 +300,14 @@ func TestTriggerAnnotateUserInfo(t *testing.T) { }, { name: "update trigger which has no annotations without diff", user: u1, - this: &Trigger{Spec: TriggerSpec{Broker: defaultBroker, Filter: defaultTriggerFilter()}}, - prev: &Trigger{Spec: TriggerSpec{Broker: defaultBroker, Filter: defaultTriggerFilter()}}, + this: &Trigger{Spec: TriggerSpec{Broker: defaultBroker, Filter: defaultTriggerFilter}}, + prev: &Trigger{Spec: TriggerSpec{Broker: defaultBroker, Filter: defaultTriggerFilter}}, wantedAnns: map[string]string{}, }, { name: "update trigger which has annotations without diff", user: u2, - this: withUserAnns(u1, u1, &Trigger{Spec: TriggerSpec{Broker: defaultBroker, Filter: defaultTriggerFilter()}}), - prev: withUserAnns(u1, u1, &Trigger{Spec: TriggerSpec{Broker: defaultBroker, Filter: defaultTriggerFilter()}}), + this: withUserAnns(u1, u1, &Trigger{Spec: TriggerSpec{Broker: defaultBroker, Filter: defaultTriggerFilter}}), + prev: withUserAnns(u1, u1, &Trigger{Spec: TriggerSpec{Broker: defaultBroker, Filter: defaultTriggerFilter}}), wantedAnns: map[string]string{ eventing.CreatorAnnotation: u1, eventing.UpdaterAnnotation: u1, diff --git a/pkg/apis/eventing/v1alpha1/trigger_types.go b/pkg/apis/eventing/v1alpha1/trigger_types.go index f6d65aef690..5856dd7f0cb 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_types.go +++ b/pkg/apis/eventing/v1alpha1/trigger_types.go @@ -74,7 +74,21 @@ type TriggerSpec struct { } type TriggerFilter struct { - SourceAndType *TriggerFilterSourceAndType `json:"sourceAndType,omitempty"` + // DeprecatedSourceAndType filters events based on exact matches on the + // CloudEvents type and source attributes. This field has been replaced by the + // Attributes field. + // + // +optional + DeprecatedSourceAndType *TriggerFilterSourceAndType `json:"sourceAndType,omitempty"` + + // Attributes filters events by exact match on event context attributes. + // Each key in the map is compared with the equivalent key in the event + // context. An event passes the filter if all values are equal to the + // specified values. + // + // Nested context attributes are not supported as keys. Numeric values are + // not supported. + Attributes *TriggerFilterAttributes `json:"attributes,omitempty"` } // TriggerFilterSourceAndType filters events based on exact matches on the cloud event's type and @@ -85,6 +99,10 @@ type TriggerFilterSourceAndType struct { Source string `json:"source,omitempty"` } +// TriggerFilterAttributes is a map of context attribute names to values for +// filtering by equality. +type TriggerFilterAttributes map[string]string + // TriggerStatus represents the current state of a Trigger. type TriggerStatus struct { // inherits duck/v1beta1 Status, which currently provides: diff --git a/pkg/apis/eventing/v1alpha1/trigger_validation.go b/pkg/apis/eventing/v1alpha1/trigger_validation.go index cdae0d296f3..939b8010ceb 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_validation.go +++ b/pkg/apis/eventing/v1alpha1/trigger_validation.go @@ -23,10 +23,12 @@ import ( "knative.dev/pkg/kmp" ) +// Validate the Trigger. func (t *Trigger) Validate(ctx context.Context) *apis.FieldError { return t.Spec.Validate(ctx).ViaField("spec") } +// Validate the TriggerSpec. func (ts *TriggerSpec) Validate(ctx context.Context) *apis.FieldError { var errs *apis.FieldError if ts.Broker == "" { @@ -39,9 +41,28 @@ func (ts *TriggerSpec) Validate(ctx context.Context) *apis.FieldError { errs = errs.Also(fe) } - if ts.Filter != nil && ts.Filter.SourceAndType == nil { - fe := apis.ErrMissingField("filter.sourceAndType") - errs = errs.Also(fe) + if ts.Filter != nil { + filtersSpecified := make([]string, 0) + + if ts.Filter.DeprecatedSourceAndType != nil { + filtersSpecified = append(filtersSpecified, "filter.sourceAndType") + } + + if ts.Filter.Attributes != nil { + filtersSpecified = append(filtersSpecified, "filter.attributes") + if len(*ts.Filter.Attributes) == 0 { + fe := &apis.FieldError{ + Message: "At least one filtered attribute must be specified", + Paths: []string{"filter.attributes"}, + } + errs = errs.Also(fe) + } + } + + if len(filtersSpecified) > 1 { + fe := apis.ErrMultipleOneOf(filtersSpecified...) + errs = errs.Also(fe) + } } if isSubscriberSpecNilOrEmpty(ts.Subscriber) { @@ -54,6 +75,7 @@ func (ts *TriggerSpec) Validate(ctx context.Context) *apis.FieldError { return errs } +// CheckImmutableFields checks that any immutable fields were not changed. func (t *Trigger) CheckImmutableFields(ctx context.Context, og apis.Immutable) *apis.FieldError { if og == nil { return nil diff --git a/pkg/apis/eventing/v1alpha1/trigger_validation_test.go b/pkg/apis/eventing/v1alpha1/trigger_validation_test.go index 7217820eb3d..b56ff898039 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_validation_test.go +++ b/pkg/apis/eventing/v1alpha1/trigger_validation_test.go @@ -27,10 +27,18 @@ import ( ) var ( - validTriggerFilter = &TriggerFilter{ - SourceAndType: &TriggerFilterSourceAndType{ + validEmptyFilter = &TriggerFilter{} + validSourceAndTypeFilter = &TriggerFilter{ + DeprecatedSourceAndType: &TriggerFilterSourceAndType{ Type: "other_type", - Source: "other_source"}, + Source: "other_source", + }, + } + validAttributesFilter = &TriggerFilter{ + Attributes: &TriggerFilterAttributes{ + "type": "other_type", + "source": "other_source", + }, } validSubscriber = &SubscriberSpec{ Ref: &corev1.ObjectReference{ @@ -80,7 +88,7 @@ func TestTriggerSpecValidation(t *testing.T) { name: "missing broker", ts: &TriggerSpec{ Broker: "", - Filter: validTriggerFilter, + Filter: validSourceAndTypeFilter, Subscriber: validSubscriber, }, want: func() *apis.FieldError { @@ -98,21 +106,42 @@ func TestTriggerSpecValidation(t *testing.T) { return fe }(), }, { - name: "missing filter.sourceAndType", + name: "missing attributes keys", ts: &TriggerSpec{ - Broker: "test_broker", - Filter: &TriggerFilter{}, + Broker: "test_broker", + Filter: &TriggerFilter{ + Attributes: &TriggerFilterAttributes{}, + }, + Subscriber: validSubscriber, + }, + want: &apis.FieldError{ + Message: "At least one filtered attribute must be specified", + Paths: []string{"filter.attributes"}, + }, + }, { + name: "multiple oneof sourceAndType and attributes", + ts: &TriggerSpec{ + Broker: "test_broker", + Filter: &TriggerFilter{ + DeprecatedSourceAndType: &TriggerFilterSourceAndType{ + Type: "other_type", + Source: "other_source", + }, + Attributes: &TriggerFilterAttributes{ + "type": "other_type", + }, + }, Subscriber: validSubscriber, }, want: func() *apis.FieldError { - fe := apis.ErrMissingField("filter.sourceAndType") + fe := apis.ErrMultipleOneOf("filter.sourceAndType", "filter.attributes") return fe }(), }, { name: "missing subscriber", ts: &TriggerSpec{ Broker: "test_broker", - Filter: validTriggerFilter, + Filter: validSourceAndTypeFilter, }, want: func() *apis.FieldError { fe := apis.ErrMissingField("subscriber") @@ -122,15 +151,49 @@ func TestTriggerSpecValidation(t *testing.T) { name: "missing subscriber.ref.name", ts: &TriggerSpec{ Broker: "test_broker", - Filter: validTriggerFilter, + Filter: validSourceAndTypeFilter, Subscriber: invalidSubscriber, }, want: func() *apis.FieldError { fe := apis.ErrMissingField("subscriber.ref.name") return fe }(), - }, - } + }, { + name: "missing broker", + ts: &TriggerSpec{ + Broker: "", + Filter: validSourceAndTypeFilter, + Subscriber: validSubscriber, + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("broker") + return fe + }(), + }, { + name: "valid empty filter", + ts: &TriggerSpec{ + Broker: "test_broker", + Filter: validEmptyFilter, + Subscriber: validSubscriber, + }, + want: &apis.FieldError{}, + }, { + name: "valid SourceAndType filter", + ts: &TriggerSpec{ + Broker: "test_broker", + Filter: validSourceAndTypeFilter, + Subscriber: validSubscriber, + }, + want: &apis.FieldError{}, + }, { + name: "valid Attributes filter", + ts: &TriggerSpec{ + Broker: "test_broker", + Filter: validAttributesFilter, + Subscriber: validSubscriber, + }, + want: &apis.FieldError{}, + }} for _, test := range tests { t.Run(test.name, func(t *testing.T) { @@ -191,7 +254,7 @@ func TestTriggerImmutableFields(t *testing.T) { original: &Trigger{ Spec: TriggerSpec{ Broker: "broker", - Filter: validTriggerFilter, + Filter: validSourceAndTypeFilter, }, }, want: nil, diff --git a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go index 205bda4d156..c3800706469 100644 --- a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go @@ -650,11 +650,22 @@ func (in *Trigger) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TriggerFilter) DeepCopyInto(out *TriggerFilter) { *out = *in - if in.SourceAndType != nil { - in, out := &in.SourceAndType, &out.SourceAndType + if in.DeprecatedSourceAndType != nil { + in, out := &in.DeprecatedSourceAndType, &out.DeprecatedSourceAndType *out = new(TriggerFilterSourceAndType) **out = **in } + if in.Attributes != nil { + in, out := &in.Attributes, &out.Attributes + *out = new(TriggerFilterAttributes) + if **in != nil { + in, out := *in, *out + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + } return } @@ -668,6 +679,28 @@ func (in *TriggerFilter) DeepCopy() *TriggerFilter { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in TriggerFilterAttributes) DeepCopyInto(out *TriggerFilterAttributes) { + { + in := &in + *out = make(TriggerFilterAttributes, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + return + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TriggerFilterAttributes. +func (in TriggerFilterAttributes) DeepCopy() TriggerFilterAttributes { + if in == nil { + return nil + } + out := new(TriggerFilterAttributes) + in.DeepCopyInto(out) + return *out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TriggerFilterSourceAndType) DeepCopyInto(out *TriggerFilterSourceAndType) { *out = *in diff --git a/pkg/broker/receiver.go b/pkg/broker/receiver.go index 6066dfb946f..fc0712777fe 100644 --- a/pkg/broker/receiver.go +++ b/pkg/broker/receiver.go @@ -24,7 +24,7 @@ import ( "net/url" "time" - cloudevents "github.com/cloudevents/sdk-go" + "github.com/cloudevents/sdk-go" cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/reconciler/trigger/path" @@ -237,37 +237,88 @@ func (r *Receiver) getTrigger(ctx context.Context, ref path.NamespacedNameUID) ( } // shouldSendMessage determines whether message 'm' should be sent based on the triggerSpec 'ts'. -// Currently it supports exact matching on type and/or source of events. +// Currently it supports exact matching on event context attributes. +// If no filter strategy is present, shouldSendMessage returns true. func (r *Receiver) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) bool { - if ts.Filter == nil || ts.Filter.SourceAndType == nil { + if ts.Filter == nil { r.logger.Error("No filter specified") ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "empty-fail")) return false } - // Record event count and filtering time + // Record event count and filtering time. startTS := time.Now() defer func() { filterTimeMS := int64(time.Now().Sub(startTS) / time.Millisecond) stats.Record(ctx, MeasureTriggerFilterTime.M(filterTimeMS)) }() - filterType := ts.Filter.SourceAndType.Type - if filterType != eventingv1alpha1.TriggerAnyFilter && filterType != event.Type() { - r.logger.Debug("Wrong type", zap.String("trigger.spec.filter.sourceAndType.type", filterType), zap.String("event.Type()", event.Type())) - ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail")) - return false + // No filter specified, default to passing everything. + if ts.Filter.DeprecatedSourceAndType == nil && ts.Filter.Attributes == nil { + ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "empty-pass")) + return true } - filterSource := ts.Filter.SourceAndType.Source - s := event.Context.AsV01().Source - actualSource := s.String() - if filterSource != eventingv1alpha1.TriggerAnyFilter && filterSource != actualSource { - r.logger.Debug("Wrong source", zap.String("trigger.spec.filter.sourceAndType.source", filterSource), zap.String("message.source", actualSource)) + + attrs := map[string]string{} + if ts.Filter.DeprecatedSourceAndType != nil { + // Since this filter cannot distinguish presence, filtering for an empty + // string is impossible. + if ts.Filter.DeprecatedSourceAndType.Type != "" { + attrs["type"] = ts.Filter.DeprecatedSourceAndType.Type + } + if ts.Filter.DeprecatedSourceAndType.Source != "" { + attrs["source"] = ts.Filter.DeprecatedSourceAndType.Source + } + } else if ts.Filter.Attributes != nil { + attrs = map[string]string(*ts.Filter.Attributes) + } + + result := r.filterEventByAttributes(attrs, event) + if result { + ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "pass")) + } else { ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail")) + } + return result +} - return false +// filterEventByAttributes +func (r *Receiver) filterEventByAttributes(attrs map[string]string, event *cloudevents.Event) bool { + // Set baseline context attributes. The attributes available may not be + // exactly the same as the attributes defined in the current version of the + // CloudEvents spec. + ce := map[string]interface{}{ + "specversion": event.SpecVersion(), + "type": event.Type(), + "source": event.Source(), + "subject": event.Subject(), + "id": event.ID(), + "time": event.Time().String(), + "schemaurl": event.SchemaURL(), + "datacontenttype": event.DataContentType(), + "datamediatype": event.DataMediaType(), + "datacontentencoding": event.DataContentEncoding(), + } + ext := event.Extensions() + if ext != nil { + for k, v := range ext { + ce[k] = v + } } - ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "pass")) + for k, v := range attrs { + var value interface{} + value, ok := ce[k] + // If the attribute does not exist in the event, return false. + if !ok { + r.logger.Debug("No attribute", zap.String("attribute", k)) + return false + } + // If the attribute is not set to any and is different than the one from the event, return false. + if v != eventingv1alpha1.TriggerAnyFilter && v != value { + r.logger.Debug("Wrong attribute", zap.String("attribute", k), zap.String("filter", v), zap.Any("received", value)) + return false + } + } return true } diff --git a/pkg/broker/receiver_test.go b/pkg/broker/receiver_test.go index 058f98dd268..2f4f685a649 100644 --- a/pkg/broker/receiver_test.go +++ b/pkg/broker/receiver_test.go @@ -41,11 +41,13 @@ import ( ) const ( - testNS = "test-namespace" - triggerName = "test-trigger" - triggerUID = "test-trigger-uid" - eventType = `com.example.someevent` - eventSource = `/mycontext` + testNS = "test-namespace" + triggerName = "test-trigger" + triggerUID = "test-trigger-uid" + eventType = `com.example.someevent` + eventSource = `/mycontext` + extensionName = `my-extension` + extensionValue = `my-extension-value` toBeReplaced = "toBeReplaced" ) @@ -147,23 +149,38 @@ func TestReceiver(t *testing.T) { }, "No TTL": { triggers: []*eventingv1alpha1.Trigger{ - makeTrigger("", ""), + makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("", "")), }, event: makeEventWithoutTTL(), }, "Wrong type": { triggers: []*eventingv1alpha1.Trigger{ - makeTrigger("some-other-type", ""), + makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("some-other-type", "")), + }, + }, + "Wrong type with attribs": { + triggers: []*eventingv1alpha1.Trigger{ + makeTrigger(makeTriggerFilterWithAttributes("some-other-type", "")), }, }, "Wrong source": { triggers: []*eventingv1alpha1.Trigger{ - makeTrigger("", "some-other-source"), + makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("", "some-other-source")), + }, + }, + "Wrong source with attribs": { + triggers: []*eventingv1alpha1.Trigger{ + makeTrigger(makeTriggerFilterWithAttributes("", "some-other-source")), + }, + }, + "Wrong extension": { + triggers: []*eventingv1alpha1.Trigger{ + makeTrigger(makeTriggerFilterWithAttributes("", "some-other-source")), }, }, "Dispatch failed": { triggers: []*eventingv1alpha1.Trigger{ - makeTrigger("", ""), + makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("", "")), }, requestFails: true, expectedErr: true, @@ -171,26 +188,51 @@ func TestReceiver(t *testing.T) { }, "Dispatch succeeded - Any": { triggers: []*eventingv1alpha1.Trigger{ - makeTrigger("", ""), + makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("", "")), + }, + expectedDispatch: true, + }, + "Dispatch succeeded - Any with attribs": { + triggers: []*eventingv1alpha1.Trigger{ + makeTrigger(makeTriggerFilterWithAttributes("", "")), }, expectedDispatch: true, }, "Dispatch succeeded - Specific": { triggers: []*eventingv1alpha1.Trigger{ - makeTrigger(eventType, eventSource), + makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType(eventType, eventSource)), }, expectedDispatch: true, }, + "Dispatch succeeded - Specific with attribs": { + triggers: []*eventingv1alpha1.Trigger{ + makeTrigger(makeTriggerFilterWithAttributes(eventType, eventSource)), + }, + expectedDispatch: true, + }, + "Dispatch succeeded - Extension with attribs": { + triggers: []*eventingv1alpha1.Trigger{ + makeTrigger(makeTriggerFilterWithAttributesAndExtension(eventType, eventSource, extensionValue)), + }, + event: makeEventWithExtension(), + expectedDispatch: true, + }, + "Dispatch failed - Extension with attribs": { + triggers: []*eventingv1alpha1.Trigger{ + makeTrigger(makeTriggerFilterWithAttributesAndExtension(eventType, eventSource, "some-other-extension-value")), + }, + event: makeEventWithExtension(), + }, "Returned Cloud Event": { triggers: []*eventingv1alpha1.Trigger{ - makeTrigger("", ""), + makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("", "")), }, expectedDispatch: true, returnedEvent: makeDifferentEvent(), }, "Returned Cloud Event with custom headers": { triggers: []*eventingv1alpha1.Trigger{ - makeTrigger("", ""), + makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("", "")), }, tctx: &cloudevents.HTTPTransportContext{ Method: "POST", @@ -363,7 +405,35 @@ func getClient(initial []runtime.Object, mocks controllertesting.Mocks) *control return controllertesting.NewMockClient(innerClient, mocks) } -func makeTrigger(t, s string) *eventingv1alpha1.Trigger { +func makeTriggerFilterWithDeprecatedSourceAndType(t, s string) *eventingv1alpha1.TriggerFilter { + return &eventingv1alpha1.TriggerFilter{ + DeprecatedSourceAndType: &eventingv1alpha1.TriggerFilterSourceAndType{ + Type: t, + Source: s, + }, + } +} + +func makeTriggerFilterWithAttributes(t, s string) *eventingv1alpha1.TriggerFilter { + return &eventingv1alpha1.TriggerFilter{ + Attributes: &eventingv1alpha1.TriggerFilterAttributes{ + "type": t, + "source": s, + }, + } +} + +func makeTriggerFilterWithAttributesAndExtension(t, s, e string) *eventingv1alpha1.TriggerFilter { + return &eventingv1alpha1.TriggerFilter{ + Attributes: &eventingv1alpha1.TriggerFilterAttributes{ + "type": t, + "source": s, + extensionName: e, + }, + } +} + +func makeTrigger(filter *eventingv1alpha1.TriggerFilter) *eventingv1alpha1.Trigger { return &eventingv1alpha1.Trigger{ TypeMeta: v1.TypeMeta{ APIVersion: "eventing.knative.dev/v1alpha1", @@ -375,12 +445,7 @@ func makeTrigger(t, s string) *eventingv1alpha1.Trigger { UID: triggerUID, }, Spec: eventingv1alpha1.TriggerSpec{ - Filter: &eventingv1alpha1.TriggerFilter{ - SourceAndType: &eventingv1alpha1.TriggerFilterSourceAndType{ - Type: t, - Source: s, - }, - }, + Filter: filter, }, Status: eventingv1alpha1.TriggerStatus{ SubscriberURI: "toBeReplaced", @@ -389,19 +454,19 @@ func makeTrigger(t, s string) *eventingv1alpha1.Trigger { } func makeTriggerWithoutFilter() *eventingv1alpha1.Trigger { - t := makeTrigger("", "") + t := makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("", "")) t.Spec.Filter = nil return t } func makeTriggerWithoutSubscriberURI() *eventingv1alpha1.Trigger { - t := makeTrigger("", "") + t := makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("", "")) t.Status = eventingv1alpha1.TriggerStatus{} return t } func makeTriggerWithBadSubscriberURI() *eventingv1alpha1.Trigger { - t := makeTrigger("", "") + t := makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("", "")) // This should fail url.Parse(). It was taken from the unit tests for url.Parse(), it violates // rfc3986 3.2.3, namely that the port must be digits. t.Status.SubscriberURI = "http://[::1]:namedport" @@ -446,3 +511,22 @@ func makeDifferentEvent() *cloudevents.Event { }.AsV03(), } } + +func makeEventWithExtension() *cloudevents.Event { + noTTL := &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: eventType, + Source: cloudevents.URLRef{ + URL: url.URL{ + Path: eventSource, + }, + }, + ContentType: cloudevents.StringOfApplicationJSON(), + Extensions: map[string]interface{}{ + extensionName: extensionValue, + }, + }.AsV03(), + } + e := addTTLToEvent(*noTTL) + return &e +} diff --git a/pkg/reconciler/trigger/trigger_test.go b/pkg/reconciler/trigger/trigger_test.go index 923ef1a5aec..baa0559416f 100644 --- a/pkg/reconciler/trigger/trigger_test.go +++ b/pkg/reconciler/trigger/trigger_test.go @@ -531,7 +531,7 @@ func makeTrigger() *v1alpha1.Trigger { Spec: v1alpha1.TriggerSpec{ Broker: brokerName, Filter: &v1alpha1.TriggerFilter{ - SourceAndType: &v1alpha1.TriggerFilterSourceAndType{ + DeprecatedSourceAndType: &v1alpha1.TriggerFilterSourceAndType{ Source: "Any", Type: "Any", }, diff --git a/test/base/resources/eventing.go b/test/base/resources/eventing.go index 5df6b03d63b..6e0b3d46953 100644 --- a/test/base/resources/eventing.go +++ b/test/base/resources/eventing.go @@ -109,7 +109,7 @@ func Broker(name string, options ...BrokerOption) *eventingv1alpha1.Broker { func WithTriggerFilter(eventSource, eventType string) TriggerOption { return func(t *eventingv1alpha1.Trigger) { triggerFilter := &eventingv1alpha1.TriggerFilter{ - SourceAndType: &eventingv1alpha1.TriggerFilterSourceAndType{ + DeprecatedSourceAndType: &eventingv1alpha1.TriggerFilterSourceAndType{ Type: eventType, Source: eventSource, }, From b94c0d382ff563c46e5f97ba5503794ecedfef41 Mon Sep 17 00:00:00 2001 From: nachocano Date: Mon, 5 Aug 2019 14:54:56 -0700 Subject: [PATCH 2/5] cosmetics --- pkg/apis/eventing/v1alpha1/trigger_types.go | 2 ++ pkg/broker/receiver.go | 14 +++++--------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/pkg/apis/eventing/v1alpha1/trigger_types.go b/pkg/apis/eventing/v1alpha1/trigger_types.go index 5856dd7f0cb..7f5f21c8a54 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_types.go +++ b/pkg/apis/eventing/v1alpha1/trigger_types.go @@ -88,6 +88,8 @@ type TriggerFilter struct { // // Nested context attributes are not supported as keys. Numeric values are // not supported. + // + // +optional Attributes *TriggerFilterAttributes `json:"attributes,omitempty"` } diff --git a/pkg/broker/receiver.go b/pkg/broker/receiver.go index fc0712777fe..99cf9630aa1 100644 --- a/pkg/broker/receiver.go +++ b/pkg/broker/receiver.go @@ -260,15 +260,11 @@ func (r *Receiver) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.T } attrs := map[string]string{} + // Since the filters cannot distinguish presence, filtering for an empty + // string is impossible. if ts.Filter.DeprecatedSourceAndType != nil { - // Since this filter cannot distinguish presence, filtering for an empty - // string is impossible. - if ts.Filter.DeprecatedSourceAndType.Type != "" { - attrs["type"] = ts.Filter.DeprecatedSourceAndType.Type - } - if ts.Filter.DeprecatedSourceAndType.Source != "" { - attrs["source"] = ts.Filter.DeprecatedSourceAndType.Source - } + attrs["type"] = ts.Filter.DeprecatedSourceAndType.Type + attrs["source"] = ts.Filter.DeprecatedSourceAndType.Source } else if ts.Filter.Attributes != nil { attrs = map[string]string(*ts.Filter.Attributes) } @@ -284,7 +280,7 @@ func (r *Receiver) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.T // filterEventByAttributes func (r *Receiver) filterEventByAttributes(attrs map[string]string, event *cloudevents.Event) bool { - // Set baseline context attributes. The attributes available may not be + // Set standard context attributes. The attributes available may not be // exactly the same as the attributes defined in the current version of the // CloudEvents spec. ce := map[string]interface{}{ From 1d212bf7d4faa8f79cb5bc0da55933558f1f291d Mon Sep 17 00:00:00 2001 From: nachocano Date: Mon, 5 Aug 2019 14:56:45 -0700 Subject: [PATCH 3/5] update comment --- pkg/apis/eventing/v1alpha1/trigger_types.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/apis/eventing/v1alpha1/trigger_types.go b/pkg/apis/eventing/v1alpha1/trigger_types.go index 7f5f21c8a54..a4331bcd5be 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_types.go +++ b/pkg/apis/eventing/v1alpha1/trigger_types.go @@ -95,14 +95,15 @@ type TriggerFilter struct { // TriggerFilterSourceAndType filters events based on exact matches on the cloud event's type and // source attributes. Only exact matches will pass the filter. Either or both type and source can -// use the value 'Any' to indicate all strings match. +// use the value '' to indicate all strings match. type TriggerFilterSourceAndType struct { Type string `json:"type,omitempty"` Source string `json:"source,omitempty"` } // TriggerFilterAttributes is a map of context attribute names to values for -// filtering by equality. +// filtering by equality. Only exact matches will pass the filter. You can use the value '' +// to indicate all strings match. type TriggerFilterAttributes map[string]string // TriggerStatus represents the current state of a Trigger. From e47e4e44719916f0da0c89fac77c78a682523bf9 Mon Sep 17 00:00:00 2001 From: nachocano Date: Mon, 5 Aug 2019 16:16:26 -0700 Subject: [PATCH 4/5] updates after review --- pkg/apis/eventing/v1alpha1/trigger_types.go | 3 +-- .../eventing/v1alpha1/trigger_validation.go | 18 ++++++++++++++++++ .../v1alpha1/trigger_validation_test.go | 15 +++++++++++++++ pkg/broker/receiver.go | 11 ++++++----- 4 files changed, 40 insertions(+), 7 deletions(-) diff --git a/pkg/apis/eventing/v1alpha1/trigger_types.go b/pkg/apis/eventing/v1alpha1/trigger_types.go index a4331bcd5be..92950946889 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_types.go +++ b/pkg/apis/eventing/v1alpha1/trigger_types.go @@ -86,8 +86,7 @@ type TriggerFilter struct { // context. An event passes the filter if all values are equal to the // specified values. // - // Nested context attributes are not supported as keys. Numeric values are - // not supported. + // Nested context attributes are not supported as keys. Only string values are supported. // // +optional Attributes *TriggerFilterAttributes `json:"attributes,omitempty"` diff --git a/pkg/apis/eventing/v1alpha1/trigger_validation.go b/pkg/apis/eventing/v1alpha1/trigger_validation.go index 939b8010ceb..40a8ea6d240 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_validation.go +++ b/pkg/apis/eventing/v1alpha1/trigger_validation.go @@ -18,11 +18,18 @@ package v1alpha1 import ( "context" + "fmt" + "regexp" "knative.dev/pkg/apis" "knative.dev/pkg/kmp" ) +var ( + // Only allow alphanumeric, starting with letters. + validAttributeName = regexp.MustCompile(`^[a-z][a-z0-9]*$`) +) + // Validate the Trigger. func (t *Trigger) Validate(ctx context.Context) *apis.FieldError { return t.Spec.Validate(ctx).ViaField("spec") @@ -56,6 +63,17 @@ func (ts *TriggerSpec) Validate(ctx context.Context) *apis.FieldError { Paths: []string{"filter.attributes"}, } errs = errs.Also(fe) + } else { + attrs := map[string]string(*ts.Filter.Attributes) + for attr := range attrs { + if !validAttributeName.MatchString(attr) { + fe := &apis.FieldError{ + Message: fmt.Sprintf("Invalid attribute name: %s", attr), + Paths: []string{"filter.attributes"}, + } + errs = errs.Also(fe) + } + } } } diff --git a/pkg/apis/eventing/v1alpha1/trigger_validation_test.go b/pkg/apis/eventing/v1alpha1/trigger_validation_test.go index b56ff898039..870e6c1eb04 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_validation_test.go +++ b/pkg/apis/eventing/v1alpha1/trigger_validation_test.go @@ -118,6 +118,21 @@ func TestTriggerSpecValidation(t *testing.T) { Message: "At least one filtered attribute must be specified", Paths: []string{"filter.attributes"}, }, + }, { + name: "invalid attribute name", + ts: &TriggerSpec{ + Broker: "test_broker", + Filter: &TriggerFilter{ + Attributes: &TriggerFilterAttributes{ + "0invalid": "my-value", + }, + }, + Subscriber: validSubscriber, + }, + want: &apis.FieldError{ + Message: "Invalid attribute name: 0invalid", + Paths: []string{"filter.attributes"}, + }, }, { name: "multiple oneof sourceAndType and attributes", ts: &TriggerSpec{ diff --git a/pkg/broker/receiver.go b/pkg/broker/receiver.go index 99cf9630aa1..f425c959b23 100644 --- a/pkg/broker/receiver.go +++ b/pkg/broker/receiver.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "github.com/knative/eventing/pkg/logging" "net/http" "net/url" "time" @@ -238,6 +239,7 @@ func (r *Receiver) getTrigger(ctx context.Context, ref path.NamespacedNameUID) ( // shouldSendMessage determines whether message 'm' should be sent based on the triggerSpec 'ts'. // Currently it supports exact matching on event context attributes. +// If no filter is present, shouldSendMessage returns false. // If no filter strategy is present, shouldSendMessage returns true. func (r *Receiver) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) bool { if ts.Filter == nil { @@ -269,7 +271,7 @@ func (r *Receiver) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.T attrs = map[string]string(*ts.Filter.Attributes) } - result := r.filterEventByAttributes(attrs, event) + result := r.filterEventByAttributes(ctx, attrs, event) if result { ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "pass")) } else { @@ -278,8 +280,7 @@ func (r *Receiver) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.T return result } -// filterEventByAttributes -func (r *Receiver) filterEventByAttributes(attrs map[string]string, event *cloudevents.Event) bool { +func (r *Receiver) filterEventByAttributes(ctx context.Context, attrs map[string]string, event *cloudevents.Event) bool { // Set standard context attributes. The attributes available may not be // exactly the same as the attributes defined in the current version of the // CloudEvents spec. @@ -307,12 +308,12 @@ func (r *Receiver) filterEventByAttributes(attrs map[string]string, event *cloud value, ok := ce[k] // If the attribute does not exist in the event, return false. if !ok { - r.logger.Debug("No attribute", zap.String("attribute", k)) + logging.FromContext(ctx).Debug("Attribute not found", zap.String("attribute", k)) return false } // If the attribute is not set to any and is different than the one from the event, return false. if v != eventingv1alpha1.TriggerAnyFilter && v != value { - r.logger.Debug("Wrong attribute", zap.String("attribute", k), zap.String("filter", v), zap.Any("received", value)) + logging.FromContext(ctx).Debug("Attribute had non-matching value", zap.String("attribute", k), zap.String("filter", v), zap.Any("received", value)) return false } } From 46501e03e84bc055c2a1c940114817f24a7bfe1d Mon Sep 17 00:00:00 2001 From: nachocano Date: Mon, 5 Aug 2019 16:25:07 -0700 Subject: [PATCH 5/5] new UT --- .../eventing/v1alpha1/trigger_validation.go | 2 +- .../v1alpha1/trigger_validation_test.go | 17 ++++++++++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/pkg/apis/eventing/v1alpha1/trigger_validation.go b/pkg/apis/eventing/v1alpha1/trigger_validation.go index 40a8ea6d240..befda828329 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_validation.go +++ b/pkg/apis/eventing/v1alpha1/trigger_validation.go @@ -26,7 +26,7 @@ import ( ) var ( - // Only allow alphanumeric, starting with letters. + // Only allow lowercase alphanumeric, starting with letters. validAttributeName = regexp.MustCompile(`^[a-z][a-z0-9]*$`) ) diff --git a/pkg/apis/eventing/v1alpha1/trigger_validation_test.go b/pkg/apis/eventing/v1alpha1/trigger_validation_test.go index 870e6c1eb04..f5a476e38d5 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_validation_test.go +++ b/pkg/apis/eventing/v1alpha1/trigger_validation_test.go @@ -119,7 +119,7 @@ func TestTriggerSpecValidation(t *testing.T) { Paths: []string{"filter.attributes"}, }, }, { - name: "invalid attribute name", + name: "invalid attribute name, start with number", ts: &TriggerSpec{ Broker: "test_broker", Filter: &TriggerFilter{ @@ -133,6 +133,21 @@ func TestTriggerSpecValidation(t *testing.T) { Message: "Invalid attribute name: 0invalid", Paths: []string{"filter.attributes"}, }, + }, { + name: "invalid attribute name, capital letters", + ts: &TriggerSpec{ + Broker: "test_broker", + Filter: &TriggerFilter{ + Attributes: &TriggerFilterAttributes{ + "invALID": "my-value", + }, + }, + Subscriber: validSubscriber, + }, + want: &apis.FieldError{ + Message: "Invalid attribute name: invALID", + Paths: []string{"filter.attributes"}, + }, }, { name: "multiple oneof sourceAndType and attributes", ts: &TriggerSpec{