diff --git a/pkg/apis/eventing/v1alpha1/trigger_defaults.go b/pkg/apis/eventing/v1alpha1/trigger_defaults.go index 40f7c025473..e685f3c0295 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_defaults.go +++ b/pkg/apis/eventing/v1alpha1/trigger_defaults.go @@ -33,16 +33,4 @@ func (ts *TriggerSpec) SetDefaults(ctx context.Context) { if ts.Filter == nil { ts.Filter = &TriggerFilter{} } - - // Note that this logic will need to change once there are other filtering options, as it should - // only apply if no other filter is applied. - if ts.Filter.SourceAndType == nil { - ts.Filter.SourceAndType = &TriggerFilterSourceAndType{} - } - if ts.Filter.SourceAndType.Type == "" { - ts.Filter.SourceAndType.Type = TriggerAnyFilter - } - if ts.Filter.SourceAndType.Source == "" { - ts.Filter.SourceAndType.Source = TriggerAnyFilter - } } diff --git a/pkg/apis/eventing/v1alpha1/trigger_defaults_test.go b/pkg/apis/eventing/v1alpha1/trigger_defaults_test.go index c450a2c31bb..e6ab361ec85 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_defaults_test.go +++ b/pkg/apis/eventing/v1alpha1/trigger_defaults_test.go @@ -24,18 +24,19 @@ import ( ) var ( - defaultBroker = "default" - otherBroker = "other_broker" - - otherTriggerFilter = &TriggerFilter{ - SourceAndType: &TriggerFilterSourceAndType{ + defaultBroker = "default" + otherBroker = "other_broker" + defaultTriggerFilter = &TriggerFilter{} + otherTriggerFilter = &TriggerFilter{ + DeprecatedSourceAndType: &TriggerFilterSourceAndType{ Type: "other_type", Source: "other_source"}, } + defaultTrigger = Trigger{ Spec: TriggerSpec{ Broker: defaultBroker, - Filter: defaultTriggerFilter(), + Filter: defaultTriggerFilter, }, } ) @@ -51,7 +52,7 @@ func TestTriggerDefaults(t *testing.T) { }, "nil filter": { initial: Trigger{Spec: TriggerSpec{Broker: otherBroker}}, - expected: Trigger{Spec: TriggerSpec{Broker: otherBroker, Filter: defaultTriggerFilter()}}, + expected: Trigger{Spec: TriggerSpec{Broker: otherBroker, Filter: defaultTriggerFilter}}, }, "nil broker and nil filter": { initial: Trigger{}, @@ -67,12 +68,3 @@ func TestTriggerDefaults(t *testing.T) { }) } } - -func defaultTriggerFilter() *TriggerFilter { - // Can't just be a package level var because it gets mutated. - return &TriggerFilter{ - SourceAndType: &TriggerFilterSourceAndType{ - Type: TriggerAnyFilter, - Source: TriggerAnyFilter}, - } -} diff --git a/pkg/apis/eventing/v1alpha1/trigger_lifecycle_test.go b/pkg/apis/eventing/v1alpha1/trigger_lifecycle_test.go index b2678b6539c..58061f07761 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_lifecycle_test.go +++ b/pkg/apis/eventing/v1alpha1/trigger_lifecycle_test.go @@ -300,14 +300,14 @@ func TestTriggerAnnotateUserInfo(t *testing.T) { }, { name: "update trigger which has no annotations without diff", user: u1, - this: &Trigger{Spec: TriggerSpec{Broker: defaultBroker, Filter: defaultTriggerFilter()}}, - prev: &Trigger{Spec: TriggerSpec{Broker: defaultBroker, Filter: defaultTriggerFilter()}}, + this: &Trigger{Spec: TriggerSpec{Broker: defaultBroker, Filter: defaultTriggerFilter}}, + prev: &Trigger{Spec: TriggerSpec{Broker: defaultBroker, Filter: defaultTriggerFilter}}, wantedAnns: map[string]string{}, }, { name: "update trigger which has annotations without diff", user: u2, - this: withUserAnns(u1, u1, &Trigger{Spec: TriggerSpec{Broker: defaultBroker, Filter: defaultTriggerFilter()}}), - prev: withUserAnns(u1, u1, &Trigger{Spec: TriggerSpec{Broker: defaultBroker, Filter: defaultTriggerFilter()}}), + this: withUserAnns(u1, u1, &Trigger{Spec: TriggerSpec{Broker: defaultBroker, Filter: defaultTriggerFilter}}), + prev: withUserAnns(u1, u1, &Trigger{Spec: TriggerSpec{Broker: defaultBroker, Filter: defaultTriggerFilter}}), wantedAnns: map[string]string{ eventing.CreatorAnnotation: u1, eventing.UpdaterAnnotation: u1, diff --git a/pkg/apis/eventing/v1alpha1/trigger_types.go b/pkg/apis/eventing/v1alpha1/trigger_types.go index f6d65aef690..92950946889 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_types.go +++ b/pkg/apis/eventing/v1alpha1/trigger_types.go @@ -74,17 +74,37 @@ type TriggerSpec struct { } type TriggerFilter struct { - SourceAndType *TriggerFilterSourceAndType `json:"sourceAndType,omitempty"` + // DeprecatedSourceAndType filters events based on exact matches on the + // CloudEvents type and source attributes. This field has been replaced by the + // Attributes field. + // + // +optional + DeprecatedSourceAndType *TriggerFilterSourceAndType `json:"sourceAndType,omitempty"` + + // Attributes filters events by exact match on event context attributes. + // Each key in the map is compared with the equivalent key in the event + // context. An event passes the filter if all values are equal to the + // specified values. + // + // Nested context attributes are not supported as keys. Only string values are supported. + // + // +optional + Attributes *TriggerFilterAttributes `json:"attributes,omitempty"` } // TriggerFilterSourceAndType filters events based on exact matches on the cloud event's type and // source attributes. Only exact matches will pass the filter. Either or both type and source can -// use the value 'Any' to indicate all strings match. +// use the value '' to indicate all strings match. type TriggerFilterSourceAndType struct { Type string `json:"type,omitempty"` Source string `json:"source,omitempty"` } +// TriggerFilterAttributes is a map of context attribute names to values for +// filtering by equality. Only exact matches will pass the filter. You can use the value '' +// to indicate all strings match. +type TriggerFilterAttributes map[string]string + // TriggerStatus represents the current state of a Trigger. type TriggerStatus struct { // inherits duck/v1beta1 Status, which currently provides: diff --git a/pkg/apis/eventing/v1alpha1/trigger_validation.go b/pkg/apis/eventing/v1alpha1/trigger_validation.go index cdae0d296f3..befda828329 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_validation.go +++ b/pkg/apis/eventing/v1alpha1/trigger_validation.go @@ -18,15 +18,24 @@ package v1alpha1 import ( "context" + "fmt" + "regexp" "knative.dev/pkg/apis" "knative.dev/pkg/kmp" ) +var ( + // Only allow lowercase alphanumeric, starting with letters. + validAttributeName = regexp.MustCompile(`^[a-z][a-z0-9]*$`) +) + +// Validate the Trigger. func (t *Trigger) Validate(ctx context.Context) *apis.FieldError { return t.Spec.Validate(ctx).ViaField("spec") } +// Validate the TriggerSpec. func (ts *TriggerSpec) Validate(ctx context.Context) *apis.FieldError { var errs *apis.FieldError if ts.Broker == "" { @@ -39,9 +48,39 @@ func (ts *TriggerSpec) Validate(ctx context.Context) *apis.FieldError { errs = errs.Also(fe) } - if ts.Filter != nil && ts.Filter.SourceAndType == nil { - fe := apis.ErrMissingField("filter.sourceAndType") - errs = errs.Also(fe) + if ts.Filter != nil { + filtersSpecified := make([]string, 0) + + if ts.Filter.DeprecatedSourceAndType != nil { + filtersSpecified = append(filtersSpecified, "filter.sourceAndType") + } + + if ts.Filter.Attributes != nil { + filtersSpecified = append(filtersSpecified, "filter.attributes") + if len(*ts.Filter.Attributes) == 0 { + fe := &apis.FieldError{ + Message: "At least one filtered attribute must be specified", + Paths: []string{"filter.attributes"}, + } + errs = errs.Also(fe) + } else { + attrs := map[string]string(*ts.Filter.Attributes) + for attr := range attrs { + if !validAttributeName.MatchString(attr) { + fe := &apis.FieldError{ + Message: fmt.Sprintf("Invalid attribute name: %s", attr), + Paths: []string{"filter.attributes"}, + } + errs = errs.Also(fe) + } + } + } + } + + if len(filtersSpecified) > 1 { + fe := apis.ErrMultipleOneOf(filtersSpecified...) + errs = errs.Also(fe) + } } if isSubscriberSpecNilOrEmpty(ts.Subscriber) { @@ -54,6 +93,7 @@ func (ts *TriggerSpec) Validate(ctx context.Context) *apis.FieldError { return errs } +// CheckImmutableFields checks that any immutable fields were not changed. func (t *Trigger) CheckImmutableFields(ctx context.Context, og apis.Immutable) *apis.FieldError { if og == nil { return nil diff --git a/pkg/apis/eventing/v1alpha1/trigger_validation_test.go b/pkg/apis/eventing/v1alpha1/trigger_validation_test.go index 7217820eb3d..f5a476e38d5 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_validation_test.go +++ b/pkg/apis/eventing/v1alpha1/trigger_validation_test.go @@ -27,10 +27,18 @@ import ( ) var ( - validTriggerFilter = &TriggerFilter{ - SourceAndType: &TriggerFilterSourceAndType{ + validEmptyFilter = &TriggerFilter{} + validSourceAndTypeFilter = &TriggerFilter{ + DeprecatedSourceAndType: &TriggerFilterSourceAndType{ Type: "other_type", - Source: "other_source"}, + Source: "other_source", + }, + } + validAttributesFilter = &TriggerFilter{ + Attributes: &TriggerFilterAttributes{ + "type": "other_type", + "source": "other_source", + }, } validSubscriber = &SubscriberSpec{ Ref: &corev1.ObjectReference{ @@ -80,7 +88,7 @@ func TestTriggerSpecValidation(t *testing.T) { name: "missing broker", ts: &TriggerSpec{ Broker: "", - Filter: validTriggerFilter, + Filter: validSourceAndTypeFilter, Subscriber: validSubscriber, }, want: func() *apis.FieldError { @@ -98,21 +106,72 @@ func TestTriggerSpecValidation(t *testing.T) { return fe }(), }, { - name: "missing filter.sourceAndType", + name: "missing attributes keys", ts: &TriggerSpec{ - Broker: "test_broker", - Filter: &TriggerFilter{}, + Broker: "test_broker", + Filter: &TriggerFilter{ + Attributes: &TriggerFilterAttributes{}, + }, + Subscriber: validSubscriber, + }, + want: &apis.FieldError{ + Message: "At least one filtered attribute must be specified", + Paths: []string{"filter.attributes"}, + }, + }, { + name: "invalid attribute name, start with number", + ts: &TriggerSpec{ + Broker: "test_broker", + Filter: &TriggerFilter{ + Attributes: &TriggerFilterAttributes{ + "0invalid": "my-value", + }, + }, + Subscriber: validSubscriber, + }, + want: &apis.FieldError{ + Message: "Invalid attribute name: 0invalid", + Paths: []string{"filter.attributes"}, + }, + }, { + name: "invalid attribute name, capital letters", + ts: &TriggerSpec{ + Broker: "test_broker", + Filter: &TriggerFilter{ + Attributes: &TriggerFilterAttributes{ + "invALID": "my-value", + }, + }, + Subscriber: validSubscriber, + }, + want: &apis.FieldError{ + Message: "Invalid attribute name: invALID", + Paths: []string{"filter.attributes"}, + }, + }, { + name: "multiple oneof sourceAndType and attributes", + ts: &TriggerSpec{ + Broker: "test_broker", + Filter: &TriggerFilter{ + DeprecatedSourceAndType: &TriggerFilterSourceAndType{ + Type: "other_type", + Source: "other_source", + }, + Attributes: &TriggerFilterAttributes{ + "type": "other_type", + }, + }, Subscriber: validSubscriber, }, want: func() *apis.FieldError { - fe := apis.ErrMissingField("filter.sourceAndType") + fe := apis.ErrMultipleOneOf("filter.sourceAndType", "filter.attributes") return fe }(), }, { name: "missing subscriber", ts: &TriggerSpec{ Broker: "test_broker", - Filter: validTriggerFilter, + Filter: validSourceAndTypeFilter, }, want: func() *apis.FieldError { fe := apis.ErrMissingField("subscriber") @@ -122,15 +181,49 @@ func TestTriggerSpecValidation(t *testing.T) { name: "missing subscriber.ref.name", ts: &TriggerSpec{ Broker: "test_broker", - Filter: validTriggerFilter, + Filter: validSourceAndTypeFilter, Subscriber: invalidSubscriber, }, want: func() *apis.FieldError { fe := apis.ErrMissingField("subscriber.ref.name") return fe }(), - }, - } + }, { + name: "missing broker", + ts: &TriggerSpec{ + Broker: "", + Filter: validSourceAndTypeFilter, + Subscriber: validSubscriber, + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("broker") + return fe + }(), + }, { + name: "valid empty filter", + ts: &TriggerSpec{ + Broker: "test_broker", + Filter: validEmptyFilter, + Subscriber: validSubscriber, + }, + want: &apis.FieldError{}, + }, { + name: "valid SourceAndType filter", + ts: &TriggerSpec{ + Broker: "test_broker", + Filter: validSourceAndTypeFilter, + Subscriber: validSubscriber, + }, + want: &apis.FieldError{}, + }, { + name: "valid Attributes filter", + ts: &TriggerSpec{ + Broker: "test_broker", + Filter: validAttributesFilter, + Subscriber: validSubscriber, + }, + want: &apis.FieldError{}, + }} for _, test := range tests { t.Run(test.name, func(t *testing.T) { @@ -191,7 +284,7 @@ func TestTriggerImmutableFields(t *testing.T) { original: &Trigger{ Spec: TriggerSpec{ Broker: "broker", - Filter: validTriggerFilter, + Filter: validSourceAndTypeFilter, }, }, want: nil, diff --git a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go index 205bda4d156..c3800706469 100644 --- a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go @@ -650,11 +650,22 @@ func (in *Trigger) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TriggerFilter) DeepCopyInto(out *TriggerFilter) { *out = *in - if in.SourceAndType != nil { - in, out := &in.SourceAndType, &out.SourceAndType + if in.DeprecatedSourceAndType != nil { + in, out := &in.DeprecatedSourceAndType, &out.DeprecatedSourceAndType *out = new(TriggerFilterSourceAndType) **out = **in } + if in.Attributes != nil { + in, out := &in.Attributes, &out.Attributes + *out = new(TriggerFilterAttributes) + if **in != nil { + in, out := *in, *out + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + } return } @@ -668,6 +679,28 @@ func (in *TriggerFilter) DeepCopy() *TriggerFilter { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in TriggerFilterAttributes) DeepCopyInto(out *TriggerFilterAttributes) { + { + in := &in + *out = make(TriggerFilterAttributes, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + return + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TriggerFilterAttributes. +func (in TriggerFilterAttributes) DeepCopy() TriggerFilterAttributes { + if in == nil { + return nil + } + out := new(TriggerFilterAttributes) + in.DeepCopyInto(out) + return *out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TriggerFilterSourceAndType) DeepCopyInto(out *TriggerFilterSourceAndType) { *out = *in diff --git a/pkg/broker/receiver.go b/pkg/broker/receiver.go index 6066dfb946f..f425c959b23 100644 --- a/pkg/broker/receiver.go +++ b/pkg/broker/receiver.go @@ -20,11 +20,12 @@ import ( "context" "errors" "fmt" + "github.com/knative/eventing/pkg/logging" "net/http" "net/url" "time" - cloudevents "github.com/cloudevents/sdk-go" + "github.com/cloudevents/sdk-go" cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/reconciler/trigger/path" @@ -237,37 +238,84 @@ func (r *Receiver) getTrigger(ctx context.Context, ref path.NamespacedNameUID) ( } // shouldSendMessage determines whether message 'm' should be sent based on the triggerSpec 'ts'. -// Currently it supports exact matching on type and/or source of events. +// Currently it supports exact matching on event context attributes. +// If no filter is present, shouldSendMessage returns false. +// If no filter strategy is present, shouldSendMessage returns true. func (r *Receiver) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) bool { - if ts.Filter == nil || ts.Filter.SourceAndType == nil { + if ts.Filter == nil { r.logger.Error("No filter specified") ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "empty-fail")) return false } - // Record event count and filtering time + // Record event count and filtering time. startTS := time.Now() defer func() { filterTimeMS := int64(time.Now().Sub(startTS) / time.Millisecond) stats.Record(ctx, MeasureTriggerFilterTime.M(filterTimeMS)) }() - filterType := ts.Filter.SourceAndType.Type - if filterType != eventingv1alpha1.TriggerAnyFilter && filterType != event.Type() { - r.logger.Debug("Wrong type", zap.String("trigger.spec.filter.sourceAndType.type", filterType), zap.String("event.Type()", event.Type())) - ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail")) - return false + // No filter specified, default to passing everything. + if ts.Filter.DeprecatedSourceAndType == nil && ts.Filter.Attributes == nil { + ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "empty-pass")) + return true } - filterSource := ts.Filter.SourceAndType.Source - s := event.Context.AsV01().Source - actualSource := s.String() - if filterSource != eventingv1alpha1.TriggerAnyFilter && filterSource != actualSource { - r.logger.Debug("Wrong source", zap.String("trigger.spec.filter.sourceAndType.source", filterSource), zap.String("message.source", actualSource)) + + attrs := map[string]string{} + // Since the filters cannot distinguish presence, filtering for an empty + // string is impossible. + if ts.Filter.DeprecatedSourceAndType != nil { + attrs["type"] = ts.Filter.DeprecatedSourceAndType.Type + attrs["source"] = ts.Filter.DeprecatedSourceAndType.Source + } else if ts.Filter.Attributes != nil { + attrs = map[string]string(*ts.Filter.Attributes) + } + + result := r.filterEventByAttributes(ctx, attrs, event) + if result { + ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "pass")) + } else { ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail")) + } + return result +} - return false +func (r *Receiver) filterEventByAttributes(ctx context.Context, attrs map[string]string, event *cloudevents.Event) bool { + // Set standard context attributes. The attributes available may not be + // exactly the same as the attributes defined in the current version of the + // CloudEvents spec. + ce := map[string]interface{}{ + "specversion": event.SpecVersion(), + "type": event.Type(), + "source": event.Source(), + "subject": event.Subject(), + "id": event.ID(), + "time": event.Time().String(), + "schemaurl": event.SchemaURL(), + "datacontenttype": event.DataContentType(), + "datamediatype": event.DataMediaType(), + "datacontentencoding": event.DataContentEncoding(), + } + ext := event.Extensions() + if ext != nil { + for k, v := range ext { + ce[k] = v + } } - ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "pass")) + for k, v := range attrs { + var value interface{} + value, ok := ce[k] + // If the attribute does not exist in the event, return false. + if !ok { + logging.FromContext(ctx).Debug("Attribute not found", zap.String("attribute", k)) + return false + } + // If the attribute is not set to any and is different than the one from the event, return false. + if v != eventingv1alpha1.TriggerAnyFilter && v != value { + logging.FromContext(ctx).Debug("Attribute had non-matching value", zap.String("attribute", k), zap.String("filter", v), zap.Any("received", value)) + return false + } + } return true } diff --git a/pkg/broker/receiver_test.go b/pkg/broker/receiver_test.go index 058f98dd268..2f4f685a649 100644 --- a/pkg/broker/receiver_test.go +++ b/pkg/broker/receiver_test.go @@ -41,11 +41,13 @@ import ( ) const ( - testNS = "test-namespace" - triggerName = "test-trigger" - triggerUID = "test-trigger-uid" - eventType = `com.example.someevent` - eventSource = `/mycontext` + testNS = "test-namespace" + triggerName = "test-trigger" + triggerUID = "test-trigger-uid" + eventType = `com.example.someevent` + eventSource = `/mycontext` + extensionName = `my-extension` + extensionValue = `my-extension-value` toBeReplaced = "toBeReplaced" ) @@ -147,23 +149,38 @@ func TestReceiver(t *testing.T) { }, "No TTL": { triggers: []*eventingv1alpha1.Trigger{ - makeTrigger("", ""), + makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("", "")), }, event: makeEventWithoutTTL(), }, "Wrong type": { triggers: []*eventingv1alpha1.Trigger{ - makeTrigger("some-other-type", ""), + makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("some-other-type", "")), + }, + }, + "Wrong type with attribs": { + triggers: []*eventingv1alpha1.Trigger{ + makeTrigger(makeTriggerFilterWithAttributes("some-other-type", "")), }, }, "Wrong source": { triggers: []*eventingv1alpha1.Trigger{ - makeTrigger("", "some-other-source"), + makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("", "some-other-source")), + }, + }, + "Wrong source with attribs": { + triggers: []*eventingv1alpha1.Trigger{ + makeTrigger(makeTriggerFilterWithAttributes("", "some-other-source")), + }, + }, + "Wrong extension": { + triggers: []*eventingv1alpha1.Trigger{ + makeTrigger(makeTriggerFilterWithAttributes("", "some-other-source")), }, }, "Dispatch failed": { triggers: []*eventingv1alpha1.Trigger{ - makeTrigger("", ""), + makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("", "")), }, requestFails: true, expectedErr: true, @@ -171,26 +188,51 @@ func TestReceiver(t *testing.T) { }, "Dispatch succeeded - Any": { triggers: []*eventingv1alpha1.Trigger{ - makeTrigger("", ""), + makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("", "")), + }, + expectedDispatch: true, + }, + "Dispatch succeeded - Any with attribs": { + triggers: []*eventingv1alpha1.Trigger{ + makeTrigger(makeTriggerFilterWithAttributes("", "")), }, expectedDispatch: true, }, "Dispatch succeeded - Specific": { triggers: []*eventingv1alpha1.Trigger{ - makeTrigger(eventType, eventSource), + makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType(eventType, eventSource)), }, expectedDispatch: true, }, + "Dispatch succeeded - Specific with attribs": { + triggers: []*eventingv1alpha1.Trigger{ + makeTrigger(makeTriggerFilterWithAttributes(eventType, eventSource)), + }, + expectedDispatch: true, + }, + "Dispatch succeeded - Extension with attribs": { + triggers: []*eventingv1alpha1.Trigger{ + makeTrigger(makeTriggerFilterWithAttributesAndExtension(eventType, eventSource, extensionValue)), + }, + event: makeEventWithExtension(), + expectedDispatch: true, + }, + "Dispatch failed - Extension with attribs": { + triggers: []*eventingv1alpha1.Trigger{ + makeTrigger(makeTriggerFilterWithAttributesAndExtension(eventType, eventSource, "some-other-extension-value")), + }, + event: makeEventWithExtension(), + }, "Returned Cloud Event": { triggers: []*eventingv1alpha1.Trigger{ - makeTrigger("", ""), + makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("", "")), }, expectedDispatch: true, returnedEvent: makeDifferentEvent(), }, "Returned Cloud Event with custom headers": { triggers: []*eventingv1alpha1.Trigger{ - makeTrigger("", ""), + makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("", "")), }, tctx: &cloudevents.HTTPTransportContext{ Method: "POST", @@ -363,7 +405,35 @@ func getClient(initial []runtime.Object, mocks controllertesting.Mocks) *control return controllertesting.NewMockClient(innerClient, mocks) } -func makeTrigger(t, s string) *eventingv1alpha1.Trigger { +func makeTriggerFilterWithDeprecatedSourceAndType(t, s string) *eventingv1alpha1.TriggerFilter { + return &eventingv1alpha1.TriggerFilter{ + DeprecatedSourceAndType: &eventingv1alpha1.TriggerFilterSourceAndType{ + Type: t, + Source: s, + }, + } +} + +func makeTriggerFilterWithAttributes(t, s string) *eventingv1alpha1.TriggerFilter { + return &eventingv1alpha1.TriggerFilter{ + Attributes: &eventingv1alpha1.TriggerFilterAttributes{ + "type": t, + "source": s, + }, + } +} + +func makeTriggerFilterWithAttributesAndExtension(t, s, e string) *eventingv1alpha1.TriggerFilter { + return &eventingv1alpha1.TriggerFilter{ + Attributes: &eventingv1alpha1.TriggerFilterAttributes{ + "type": t, + "source": s, + extensionName: e, + }, + } +} + +func makeTrigger(filter *eventingv1alpha1.TriggerFilter) *eventingv1alpha1.Trigger { return &eventingv1alpha1.Trigger{ TypeMeta: v1.TypeMeta{ APIVersion: "eventing.knative.dev/v1alpha1", @@ -375,12 +445,7 @@ func makeTrigger(t, s string) *eventingv1alpha1.Trigger { UID: triggerUID, }, Spec: eventingv1alpha1.TriggerSpec{ - Filter: &eventingv1alpha1.TriggerFilter{ - SourceAndType: &eventingv1alpha1.TriggerFilterSourceAndType{ - Type: t, - Source: s, - }, - }, + Filter: filter, }, Status: eventingv1alpha1.TriggerStatus{ SubscriberURI: "toBeReplaced", @@ -389,19 +454,19 @@ func makeTrigger(t, s string) *eventingv1alpha1.Trigger { } func makeTriggerWithoutFilter() *eventingv1alpha1.Trigger { - t := makeTrigger("", "") + t := makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("", "")) t.Spec.Filter = nil return t } func makeTriggerWithoutSubscriberURI() *eventingv1alpha1.Trigger { - t := makeTrigger("", "") + t := makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("", "")) t.Status = eventingv1alpha1.TriggerStatus{} return t } func makeTriggerWithBadSubscriberURI() *eventingv1alpha1.Trigger { - t := makeTrigger("", "") + t := makeTrigger(makeTriggerFilterWithDeprecatedSourceAndType("", "")) // This should fail url.Parse(). It was taken from the unit tests for url.Parse(), it violates // rfc3986 3.2.3, namely that the port must be digits. t.Status.SubscriberURI = "http://[::1]:namedport" @@ -446,3 +511,22 @@ func makeDifferentEvent() *cloudevents.Event { }.AsV03(), } } + +func makeEventWithExtension() *cloudevents.Event { + noTTL := &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: eventType, + Source: cloudevents.URLRef{ + URL: url.URL{ + Path: eventSource, + }, + }, + ContentType: cloudevents.StringOfApplicationJSON(), + Extensions: map[string]interface{}{ + extensionName: extensionValue, + }, + }.AsV03(), + } + e := addTTLToEvent(*noTTL) + return &e +} diff --git a/pkg/reconciler/trigger/trigger_test.go b/pkg/reconciler/trigger/trigger_test.go index 923ef1a5aec..baa0559416f 100644 --- a/pkg/reconciler/trigger/trigger_test.go +++ b/pkg/reconciler/trigger/trigger_test.go @@ -531,7 +531,7 @@ func makeTrigger() *v1alpha1.Trigger { Spec: v1alpha1.TriggerSpec{ Broker: brokerName, Filter: &v1alpha1.TriggerFilter{ - SourceAndType: &v1alpha1.TriggerFilterSourceAndType{ + DeprecatedSourceAndType: &v1alpha1.TriggerFilterSourceAndType{ Source: "Any", Type: "Any", }, diff --git a/test/base/resources/eventing.go b/test/base/resources/eventing.go index 5df6b03d63b..6e0b3d46953 100644 --- a/test/base/resources/eventing.go +++ b/test/base/resources/eventing.go @@ -109,7 +109,7 @@ func Broker(name string, options ...BrokerOption) *eventingv1alpha1.Broker { func WithTriggerFilter(eventSource, eventType string) TriggerOption { return func(t *eventingv1alpha1.Trigger) { triggerFilter := &eventingv1alpha1.TriggerFilter{ - SourceAndType: &eventingv1alpha1.TriggerFilterSourceAndType{ + DeprecatedSourceAndType: &eventingv1alpha1.TriggerFilterSourceAndType{ Type: eventType, Source: eventSource, },