Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 35 additions & 24 deletions pkg/eventfilter/attributes/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,8 @@ func NewAttributesFilter(attrs map[string]string) eventfilter.Filter {
}

func (attrs attributesFilter) Filter(ctx context.Context, event cloudevents.Event) eventfilter.FilterResult {
// Set standard context attributes. The attributes available may not be
// exactly the same as the attributes defined in the current version of the
// CloudEvents spec.
ce := map[string]interface{}{
"specversion": event.SpecVersion(),
"type": event.Type(),
"source": event.Source(),
"subject": event.Subject(),
"id": event.ID(),
"time": event.Time().String(),
"dataschema": event.DataSchema(),
"schemaurl": event.DataSchema(),
"datacontenttype": event.DataContentType(),
"datamediatype": event.DataMediaType(),
// TODO: use data_base64 when SDK supports it.
"datacontentencoding": event.DeprecatedDataContentEncoding(),
}
ext := event.Extensions()
for k, v := range ext {
ce[k] = v
}

for k, v := range attrs {
var value interface{}
value, ok := ce[k]
value, ok := lookup(event, k)
// If the attribute does not exist in the event (extension context attributes) or if the event attribute
// has an empty string value (optional attributes) - which means it was never set in the incoming event,
// return false.
Expand All @@ -76,4 +53,38 @@ func (attrs attributesFilter) Filter(ctx context.Context, event cloudevents.Even
return eventfilter.PassFilter
}

func lookup(event cloudevents.Event, attr string) (interface{}, bool) {
// Set standard context attributes. The attributes available may not be
// exactly the same as the attributes defined in the current version of the
// CloudEvents spec.
switch attr {
case "specversion":
return event.SpecVersion(), true
case "type":
return event.Type(), true
case "source":
return event.Source(), true
case "subject":
return event.Subject(), true
case "id":
return event.ID(), true
case "time":
return event.Time().String(), true
case "dataschema":
return event.DataSchema(), true
case "schemaurl":
return event.DataSchema(), true
case "datacontenttype":
return event.DataContentType(), true
case "datamediatype":
return event.DataMediaType(), true
case "datacontentencoding":
// TODO: use data_base64 when SDK supports it.
return event.DeprecatedDataContentEncoding(), true
default:
val, ok := event.Extensions()[attr]
return val, ok
}
}

var _ eventfilter.Filter = attributesFilter{}
38 changes: 38 additions & 0 deletions pkg/eventfilter/attributes/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,41 @@ func attributesWithSubject(t, s, sub string) map[string]string {
"subject": sub,
}
}

func TestAllSupportedAttributeFieldsV1(t *testing.T) {
e := cloudevents.NewEvent(cloudevents.VersionV1)
e.SetType(eventType)
e.SetSource(eventSource)
e.SetID("1234")
e.SetDataSchema("wow")
e.SetSubject("cool")
e.SetDataContentType("cheers;mate")

attributes := map[string]string{
"specversion": e.SpecVersion(),
"type": e.Type(),
"source": e.Source(),
"subject": e.Subject(),
"id": e.ID(),
"time": e.Time().String(),
"dataschema": e.DataSchema(),
"schemaurl": e.DataSchema(),
"datacontenttype": e.DataContentType(),
"datamediatype": e.DataMediaType(),
}
if result := NewAttributesFilter(attributes).Filter(context.TODO(), e); result != eventfilter.PassFilter {
t.Errorf("Expected pass, got %v", result)
}
}

func TestV03Event(t *testing.T) {
e := cloudevents.NewEvent(cloudevents.VersionV03)
e.SetDataContentEncoding("perfect")

attributes := map[string]string{
"datacontentencoding": e.DeprecatedDataContentEncoding(),
}
if result := NewAttributesFilter(attributes).Filter(context.TODO(), e); result != eventfilter.PassFilter {
t.Errorf("Expected pass, got %v", result)
}
}