Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pkg/errors v0.9.1
github.com/prometheus/procfs v0.0.11 // indirect
github.com/qri-io/jsonschema v0.2.0
github.com/rickb777/date v1.13.0
github.com/robfig/cron/v3 v3.0.1
github.com/rogpeppe/fastuuid v1.2.0
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,10 @@ github.com/prometheus/procfs v0.0.11 h1:DhHlBtkHWPYi8O2y31JkK0TF+DGM+51OopZjH/Ia
github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/statsd_exporter v0.15.0 h1:UiwC1L5HkxEPeapXdm2Ye0u1vUJfTj7uwT5yydYpa1E=
github.com/prometheus/statsd_exporter v0.15.0/go.mod h1:Dv8HnkoLQkeEjkIE4/2ndAA7WL1zHKK7WMqFQqu72rw=
github.com/qri-io/jsonpointer v0.1.1 h1:prVZBZLL6TW5vsSB9fFHFAMBLI4b0ri5vribQlTJiBA=
github.com/qri-io/jsonpointer v0.1.1/go.mod h1:DnJPaYgiKu56EuDp8TU5wFLdZIcAnb/uH9v37ZaMV64=
github.com/qri-io/jsonschema v0.2.0 h1:is8lirh3HYwTkC0e+4jL/vWEHwzPLojnl4FWkUoeEPU=
github.com/qri-io/jsonschema v0.2.0/go.mod h1:g7DPkiOsK1xv6T/Ao5scXRkd+yTFygcANPBaaqW+VrI=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rickb777/date v1.13.0 h1:+8AmwLuY1d/rldzdqvqTEg7107bZ8clW37x4nsdG3Hs=
github.com/rickb777/date v1.13.0/go.mod h1:GZf3LoGnxPWjX+/1TXOuzHefZFDovTyNLHDMd3qH70k=
Expand All @@ -579,6 +583,7 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
Expand Down
141 changes: 141 additions & 0 deletions pkg/eventfilter/benchmarks/filter_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package benchmarks

import (
"testing"

cetest "github.com/cloudevents/sdk-go/v2/test"

"knative.dev/eventing/pkg/eventfilter"
"knative.dev/eventing/pkg/eventfilter/json_schema"
)

func BenchmarkJsonSchemaFilter(b *testing.B) {
event := cetest.FullEvent()

RunFilterBenchmarks(b,
func(i interface{}) eventfilter.Filter {
f, _ := json_schema.NewJsonSchemaFilter(i.(map[string]interface{}))
return f
},
FilterBenchmark{
name: "Pass with exact match of id",
arg: map[string]interface{}{
"properties": map[string]interface{}{
"id": map[string]interface{}{
"const": event.ID(),
},
},
},
event: event,
},
FilterBenchmark{
name: "Pass with exact match of all context attributes (except time)",
arg: map[string]interface{}{
"properties": map[string]interface{}{
"id": map[string]interface{}{
"const": event.ID(),
},
"source": map[string]interface{}{
"const": event.Source(),
},
"type": map[string]interface{}{
"const": event.Type(),
},
"dataschema": map[string]interface{}{
"const": event.DataSchema(),
},
"datacontenttype": map[string]interface{}{
"const": event.DataContentType(),
},
"subject": map[string]interface{}{
"const": event.Subject(),
},
},
},
event: event,
},
FilterBenchmark{
name: "No pass with exact match of id and source",
arg: map[string]interface{}{
"properties": map[string]interface{}{
"id": map[string]interface{}{
"const": "qwertyuiopasdfghjklzxcvbnm",
},
"source": map[string]interface{}{
"const": "qwertyuiopasdfghjklzxcvbnm",
},
},
},
event: event,
},
FilterBenchmark{
name: "No pass with if then",
arg: map[string]interface{}{
"if": map[string]interface{}{
"properties": map[string]interface{}{
"id": map[string]interface{}{
"const": event.ID(),
},
},
},
"then": map[string]interface{}{
"properties": map[string]interface{}{
"type": map[string]interface{}{
"const": "---" + event.Type(),
},
},
},
},
event: event,
},
FilterBenchmark{
name: "No pass with nested logic",
arg: map[string]interface{}{
"anyOf": []interface{}{
map[string]interface{}{
"properties": map[string]interface{}{
"type": map[string]interface{}{
"const": "---" + event.Type(),
},
},
},
map[string]interface{}{
"oneOf": []interface{}{
map[string]interface{}{
"properties": map[string]interface{}{
"type": map[string]interface{}{
"const": event.Type(),
},
},
},
map[string]interface{}{
"properties": map[string]interface{}{
"id": map[string]interface{}{
"const": event.ID(),
},
},
},
},
},
},
},
event: event,
},
)
}
129 changes: 129 additions & 0 deletions pkg/eventfilter/json_schema/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package json_schema

import (
"context"
"encoding/json"
"fmt"
"io"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/spec"
"github.com/cloudevents/sdk-go/v2/types"
"github.com/qri-io/jsonschema"
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/eventfilter"
)

type jsonSchemaFilter jsonschema.Schema

func NewJsonSchemaFilter(jsonSchema map[string]interface{}) (eventfilter.Filter, error) {
schemaBytes, err := json.Marshal(jsonSchema)
if err != nil {
return nil, fmt.Errorf("error while marshalling the schema: %w", err)
}

rs := &jsonschema.Schema{}
if err := json.Unmarshal(schemaBytes, rs); err != nil {
return nil, fmt.Errorf("error while parsing the schema: %w", err)
}

return (*jsonSchemaFilter)(rs), nil
}

func (j *jsonSchemaFilter) Filter(ctx context.Context, event cloudevents.Event) eventfilter.FilterResult {
// The nice thing here would be to have an abstraction that makes event looks like a json
// without triggering this copy.
// Unfortunately this library doesn't allow it, hence we need to copy stuff inside a map to
// give to the json schema validator
json := make(map[string]interface{}, 4) // A CloudEvent has at least 4 attributes

// We're leveraging the binding to efficiently copy-paste attributes and extensions into the json
// to give to the json schema library.
// It also guarantees no nil attributes (which might break the semantics of the schema, cloudevents spec never allows null values)
err := binding.ToMessage(&event).ReadBinary(ctx, attributesWriter(json))
if err != nil {
// This should never happen in theory
logging.FromContext(ctx).Warn("Error while trying to convert the input event attributes and extensions to json: ", err)
return eventfilter.FailFilter
}

result := (*jsonschema.Schema)(j).Validate(ctx, json)
if result.IsValid() {
return eventfilter.PassFilter
}
return eventfilter.FailFilter
}

var _ eventfilter.Filter = (*jsonSchemaFilter)(nil)

type attributesWriter map[string]interface{}

func (a attributesWriter) SetAttribute(attribute spec.Attribute, value interface{}) error {
v, err := coherceTypes(value)
if err != nil {
return err
}
a[attribute.Name()] = v
return nil
}

func (a attributesWriter) SetExtension(name string, value interface{}) error {
v, err := coherceTypes(value)
if err != nil {
return err
}
a[name] = v
return nil
}

func (a attributesWriter) Start(ctx context.Context) error {
return nil
}

func (a attributesWriter) SetData(data io.Reader) error {
return nil
}

func (a attributesWriter) End(ctx context.Context) error {
return nil
}

// Only some types has to be converted to strings
// The coherced types maps more or less to the official Cloudevents json schema
// https://github.com/cloudevents/spec/blob/master/spec.json
func coherceTypes(value interface{}) (interface{}, error) {
validatedValue, err := types.Validate(value)
if err != nil {
// This should never happen because an event cannot contain an invalid type!
return nil, err
}

switch t := validatedValue.(type) {
case types.URI: // Use string form of URLs.
return t.String(), nil
case types.URIRef: // Use string form of URLs.
return t.String(), nil
case types.Timestamp: // Use string form of URLs.
return t.String(), nil
default:
return validatedValue, nil
}
}
Loading