CEL filters -- Review #2
Triggers with the `Filter.CELExpression` field populated use CEL to filter events. The CEL expression has access to a `ce` object with the following fields:

- `specversion`
- `type`
- `source`

This list of fields is defined by the `dev.knative.CloudEventFilterMeta` proto in `pkg/broker/dev_knative`. The following CEL expression selects events with type `com.example`: `ce.type == "com.example"`
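As a rough illustration of the matching semantics (not the actual CEL evaluation; the `Event` struct and `matchesType` helper here are hypothetical stand-ins), the expression above amounts to a string comparison on the event's context attributes:

```go
package main

import "fmt"

// Event is a hypothetical stand-in for the CloudEvent context attributes
// exposed to the filter as the `ce` object.
type Event struct {
	SpecVersion string
	Type        string
	Source      string
}

// matchesType mimics the expression `ce.type == "com.example"`.
func matchesType(e Event, want string) bool {
	return e.Type == want
}

func main() {
	e := Event{SpecVersion: "0.2", Type: "com.example", Source: "/my/source"}
	fmt.Println(matchesType(e, "com.example")) // true
	fmt.Println(matchesType(e, "com.other"))   // false
}
```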
Changes the CEL filter to be expressed this way:

```yaml
filter:
  cel:
    expression: foo.bar == "baz"
```
Using a protobuf Struct eliminates the need to generate pb structs for CloudEvent fields, and lays the groundwork for adding dynamic fields to the expression environment.
If the event has datacontenttype `application/json`, the CEL environment will have the parsed JSON data in the `data` field.
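A minimal sketch of that parsing step, assuming a helper like `parseData` (the function name and signature are hypothetical; the real implementation lives in the broker filter):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// parseData is a hypothetical helper: when the event's datacontenttype is
// "application/json", parse the payload into a map so the CEL environment
// can expose it as `data`. Other content types yield no parsed data.
func parseData(contentType string, payload []byte) (map[string]interface{}, error) {
	if contentType != "application/json" {
		return nil, nil // no parsed data available
	}
	var data map[string]interface{}
	if err := json.Unmarshal(payload, &data); err != nil {
		return nil, err
	}
	return data, nil
}

func main() {
	data, err := parseData("application/json", []byte(`{"foo": "bar"}`))
	fmt.Println(data["foo"], err) // bar <nil>
}
```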
These fields must be true to enable dynamic parsing of event extensions and data respectively in the CEL environment. If parsing fails, the standard CE fields will still be available for filtering.
Also refactor event context parsing to be more efficient and readable.
Triggers and Event fixtures are now generated by builders in an attempt to make the content of the fixture clearer and more flexible.
The CloudEvents SDK seems unable to return a single object with both the standard fields and the extension fields. Until that's possible, the parsed extensions will be prefixed with `ext.` in the CEL expression. The parsed JSON data will be prefixed with `data.` in the CEL expression. The standard fields are currently un-prefixed, but that will probably need to change because CEL reserves `type`. Currently the CloudEvents type is called `typ` as a temporary workaround. Tests now verify that basic expressions on type, source, parsed extensions, and parsed data filter correctly.
`type` is problematic in CEL because it's a reserved word, and in general it's probably a bad idea to risk collisions with top-level reserved words. This change nests all CE keys under `ce.` to avoid the `type` collision and all future collisions. A proto is defined for these keys to avoid issues with dynamic typing and make the definition more shareable. Eventually the ce and ext prefixes could be combined into a single prefix for future proofing of extensions and simpler user experience.
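A sketch of how the expression environment could be assembled under this scheme (the `buildVars` helper is hypothetical; the actual code builds protobuf Structs, but the nesting is the point):

```go
package main

import "fmt"

// buildVars sketches the expression environment: standard attributes nested
// under "ce" (avoiding CEL's reserved word `type` at the top level), parsed
// extensions under "ext", and parsed JSON data under "data".
func buildVars(ceAttrs, ext, data map[string]interface{}) map[string]interface{} {
	return map[string]interface{}{
		"ce":   ceAttrs,
		"ext":  ext,
		"data": data,
	}
}

func main() {
	vars := buildVars(
		map[string]interface{}{"specversion": "0.2", "type": "com.example", "source": "/src"},
		map[string]interface{}{"myext": "x"},
		map[string]interface{}{"foo": "bar"},
	)
	// The expression `ce.type == "com.example"` reads this nested key.
	fmt.Println(vars["ce"].(map[string]interface{})["type"]) // com.example
}
```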
Harwayne
left a comment
Webhook code will also have to change. My recommendation is to allow at most one filter type to be specified. If nothing is specified, then either default to sourceAndType or leave it empty and fail open.
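The at-most-one-filter-type validation could look something like this sketch (the `Filter` field names are hypothetical approximations of the API types; whether "none specified" defaults or fails open is a separate decision):

```go
package main

import (
	"errors"
	"fmt"
)

// Filter is a hypothetical spec with two mutually exclusive filter types.
type Filter struct {
	SourceAndType *struct{ Source, Type string }
	CEL           *struct{ Expression string }
}

// validate enforces that at most one filter type is specified.
func validate(f Filter) error {
	n := 0
	if f.SourceAndType != nil {
		n++
	}
	if f.CEL != nil {
		n++
	}
	if n > 1 {
		return errors.New("at most one filter type may be specified")
	}
	return nil
}

func main() {
	both := Filter{
		SourceAndType: &struct{ Source, Type string }{},
		CEL:           &struct{ Expression string }{Expression: `ce.type == "com.example"`},
	}
	fmt.Println(validate(both))     // at most one filter type may be specified
	fmt.Println(validate(Filter{})) // <nil>
}
```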
```go
// +optional
ParseExtensions bool `json:"parseExtensions"`
// ParseData enables parsing of the event data and makes the parsed data
// available in the CEL environment. Currently this is only available for
```
Do we think that this will work transparently for other data formats? For example, if the data is XML, will this boolean be sufficient?
Or should we make this an enum, with the current values NONE and JSON, defaulting to NONE.
The data content type and content encoding are transmitted with the event, so the SDK has everything needed at runtime to decide whether the data is parseable (cc @n3wscott to confirm). I believe a boolean is sufficient here to allow parsing of both JSON and XML events, but I'm open to alternate reasoning.
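A sketch of what that runtime decision might look like, using only the media type that travels with the event (the set of types handled here is illustrative, not what the SDK actually supports):

```go
package main

import (
	"fmt"
	"mime"
	"strings"
)

// isParseable decides from datacontenttype alone whether the payload could
// be parsed into the CEL environment, which is why a boolean opt-in could
// cover both JSON and XML without an enum.
func isParseable(contentType string) bool {
	mt, _, err := mime.ParseMediaType(contentType)
	if err != nil {
		return false
	}
	return mt == "application/json" ||
		mt == "application/xml" ||
		strings.HasSuffix(mt, "+json") ||
		strings.HasSuffix(mt, "+xml")
}

func main() {
	fmt.Println(isParseable("application/json; charset=utf-8")) // true
	fmt.Println(isParseable("application/octet-stream"))        // false
}
```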
```go
// TODO should this coerce to V02?
extStruct, err := ceParsedExtensionsStruct(event.Context.AsV02().Extensions)
if err != nil {
	r.logger.Error("Failed to parse event context for CEL filtering", zap.String("id", event.Context.AsV02().ID), zap.Error(err))
```
Shouldn't this return the err?
There are two options when a parse error occurs:

1. Ignore the error and evaluate the expression anyway with the data we parsed correctly.
2. Return the error without evaluating. The filter will reject any event that fails to parse.

The code currently implements option 1, assuming that if evaluation is possible, it should be attempted. Thus the error is not returned here, because that would skip the expression evaluation. It would probably be a good idea to save the error and return it along with the expression evaluation result.
Here's an example of a Trigger that probably wants to evaluate anyway even if extensions cannot be parsed:
```yaml
spec:
  filter:
    cel:
      expression: ce.type == "com.example.event"
      parseExtensions: true
```

A failure parsing extensions might not be what the user expects, since the expression could evaluate correctly regardless of whether extensions are successfully parsed.
If we implement option 2, I'd probably rename the field to `MustParseExtensions` or something to indicate that a parse failure causes rejection of the event.
@n3wscott suggested earlier that it may be possible to detect automatically whether the expression requires parsing of extensions or data by inspecting the CEL AST. This seems like the best case for users, but some users may still want to turn parsing behavior on or off independent of the expression.
One possibly terrible option is to make `parseExtensions` a string field, with initial values `yes` and `no` (the default). Future values could be added: `auto` (when AST inspection is possible, this becomes the default), `required` (to indicate behavior on error).
```go
if ts.Filter.CEL.ParseData {
	dataStruct, err := ceParsedDataStruct(event)
	if err != nil {
		r.logger.Error("Failed to parse event data for CEL filtering", zap.String("id", event.Context.AsV02().ID), zap.Error(err))
```
Shouldn't this return the err?
```go
	return r.filterEventBySourceAndType(ts, event)
}
// TODO is this supposed to be default true?
return true
```
I don't think it is supposed to get here. Maybe add a log error?
…#996)

* Add documentation for changing channel provisioners.
* lost -> lose
* Links go to knative.dev.
/cc @grantr
Produced via: `prettier --write --prose-wrap=always $(find -name '*.md' | grep -v vendor | grep -v .github)`
When we have documentation and features that assume events will be formatted according to the CloudEvents spec, say that clearly in the docs.
These fields are optional and default false so they can be safely omitted.
* Initial work on Triggers working via path, rather than host (which requires Istio).
* Move path manipulation functions to a single package.
* Fix the unit tests.
* Add a unit test for the Broker Filter service not being found.
* Update the comment.
* Add a test for mapBrokerToTriggers.
* More unit tests around mapBrokerToTriggers.Map.
* Switch provisioner.ChannelReference to types.NamespacedName.
GetDataMediaType now returns an error because it tries to parse the content type. If an error occurs, log it and use the empty string as the data media type.
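The log-and-fall-back behavior described above could be sketched like this (the `dataMediaType` wrapper is hypothetical; it stands in for the call into the SDK's `GetDataMediaType`):

```go
package main

import (
	"fmt"
	"log"
	"mime"
)

// dataMediaType tries to parse the content type; on error it logs and
// falls back to the empty string rather than failing the filter.
func dataMediaType(contentType string) string {
	mt, _, err := mime.ParseMediaType(contentType)
	if err != nil {
		log.Printf("failed to parse data media type %q: %v", contentType, err)
		return ""
	}
	return mt
}

func main() {
	fmt.Printf("%q\n", dataMediaType("application/json; charset=utf-8")) // "application/json"
	fmt.Printf("%q\n", dataMediaType(""))                                // ""
}
```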
* Add custom reporter for ImmutableField validation errors
* Upgrades go-cmp to latest commit of master
* Replaces cmp with kmp for producing diffs at runtime
* Adds use of custom reporter for kmp
* Fixes build warnings and errors
* Fix build
* Run codegen
* Grab latest kn/pkg
* WIP
* WIP - In-memory working with E2E tests
* WIP - remove istio dependency from in-memory channel
* UTs pass, E2E tests pass with in-memory as well as kafka
* Fixed UTs that failed due to last K8s service change
* Removed unnecessary space from a line
* Adding istio annotation to test pod. This will be needed when running E2E tests against channels other than in-memory
* Bug fix to set clusterIP of K8s service only when it is not of type ExternalName
* Updated code based on PR comments
* Updates based on PR comments
* Updates based on PR comments
* Fixed UTs
* Updated VENDOR_LICENSE
Baseline attributes now include ID and Subject, which is a new field being standardized for 0.3. The SDK now has common methods for retrieving ID and Time attributes among others. There's also a single top-level cloudevents package that aliases all the types in other packages.
No longer used for CEL type declarations.
* No need for a statefulset
* Feedback from @syedriko
* Move ContainerSource API.
* stage this.
* Move ContainerSource API.
* ported rec.
* stash.
* working cron job source.
* add rbac
* update deps.
* update codegen.
* add deepcopy.
* clean copyright.
* updating to have a source controller.
* fix yaml.
* use sources.
* 2019
* use the real word, not core.
* source -> sources.
* fix yaml.
* WIP
* WIP - In-memory working with E2E tests
* WIP - remove istio dependency from in-memory channel
* UTs pass, E2E tests pass with in-memory as well as kafka
* Fixed UTs that failed due to last K8s service change
* Removed unnecessary space from a line
* Adding istio annotation to test pod. This will be needed when running E2E tests against channels other than in-memory
* Bug fix to set clusterIP of K8s service only when it is not of type ExternalName
* WIP kafka channel
* WIP kafka - UTs and E2E pass. More UTs needed
* Updated code based on PR comments
* WIP
* Updates based on PR comments
* Updates based on PR comments
* Fixed UTs
* Updated VENDOR_LICENSE
* WIP. Update fanout sidecar
* Merge from upstream master
* UTs pass, ITs passed. Code ready for PR
* Update natss to not use Istio. UTs and E2E tests pass.
* Updates based on PR comments
* Removed permission to istio virtual service from controller
* Changes based on PR comments
* Added back permission that was removed by mistake
* Remove istio references
* WIP
* Removed one more reference of istio
* Revert kafka.yaml local change
* Revert kafka dispatcher change
* Removing Mutex. No need to use Mutex when using atomic value for hostToChannelMap
* Minor updates based on PR comments
* first pass to update broker controller
* starting informers properly, listening to own resources
* fix and commenting out UTs
* space
* adding listers
* updating tests
* more tests
* adding deployment functional
* compiling
* more UTs
* more UTs
* more UTs
* more UTs
* more UTs
* updating deps
* updating UTs
* updating UTs
* Updating code gen
* inlining method
* update after comment
* review comments
* serviceInformer and namespaceInformer
* attempt to fix
* solved
* Updates after code review
* Updating gopkg.lock
…eered during the WG call. (knative#1033)
…#1099)

* Move ContainerSource API.
* stage this.
* Move ContainerSource API.
* ported rec.
* stash.
* working cron job source.
* add rbac
* update deps.
* update codegen.
* add deepcopy.
* clean copyright.
* updating to have a source controller.
* fix yaml.
* use sources.
* 2019
* use the real word, not core.
* source -> sources.
* fix yaml.
* api for container source start.
* rec done first pass. tests next.
* staging for test writing.
* first test.
* more tests.
* working tests.
* rbac.
Fixes knative#1081 - Update subscription error when reply strategy does not contain address
* Adding some tests to trigger
* More UTs
* More UTs
* More UTs
* Namespace reconciler automatically creates the Broker Filter's ServiceAccount and RBAC.
Sadly this doesn't work well because we have such an old version of controller-runtime that the Filter ends up trying to watch _all_ Triggers, not just those in its namespace. And it only gets permission for the Triggers in its own namespace.
* Remove no longer needed label.
* Broker and trigger types UTs
* WIP early E2E test
* Changes after code review. Adding trigger defaults and validation tests
* Cleaner trigger validation
* Adding dummy tests for broker validation...
Should be implemented
* Compiling and moving things around
* Updating test
* More updates
* Waiting for potentially multiple contents.
Removing check for corev1.Service ready.
Removing Grant's great design. Just making it simpler for now.
* Compiling
* Fixing compilation
* Fixing compilation errors. Adding AnnotateNamespace function.
* Adding ns
* Adding logs. Changing to lowercase, otherwise the pod name is invalid
* Removing namespace when creating trigger subscriber spec.
Adding wait time constant for default broker creation.
* Checking if all triggers are ready
* Updated logs
* Working
* Adding logs... Still not receiving the events.
* More logs
* Adding build constraint
* Removing unnecessary stuff
* Removing ugly structs
* More logs
* Removing quotes
* More logs
* Adding delay
* Listing triggers in receiver when we create it, so that we don't miss
any message because the client couldn't find the existing trigger.
This is a problem in the in-memory-channel as it doesn't do retries.
Maybe the right solution is to add that functionality there.
* Adding delay to sender pod
* Removing withDelay method and just sleep for a while
* Improve log...
* Updates after code review.
* Adding some more logs and trailing dots.
* Switch import order.
* Updating comments.
* Updating comments.
* Replace the bad errgroup usage with the runnableServer.
* Namespace scoped the Broker Filter's client.
* Fix unit tests.
* Fix yaml
* Setting source to source not type.
Updating comment.
* Switch from annotating the namespace to labeling it, to match Istio.
* Adding EventType CRD
* Update to type
* Updates
* updating eventtypes to event-types
* Ingress policy
* Updating environment variables
* yamls to give event-type permissions
* General clean up.
* Adding namespace to ingress and reconciling service accounts in ns.
* More changes
* Service account in namespace
* Updating events registered
* Updating origin to source.
* to newer version
* moving the ingress policy to the broker itself...
should be an object instead of a string.
* making header a constant. Should use the SDK instead.
* Adding dummy change to trigger event
* Using cloudevents sdk but still needs to be updated.
We should change our APIs to receive Events instead of Messages.
* Adding cloudevents-sdk library
* Invalid eventtype
* Initial docs for the Broker.
* Fill some of the usage section.
* Add instructions for installing the Broker without using Namespace annotation.
* Create example_{brokers,triggers}.yaml to document how they can be used.
* Fix MD linter issues.
* Fix MD linter issues.
* Fix MD linter issues.
* Minor clean up.
* adding type as authoritative
* Should actually compare to lowercase
* listing all event types... we should use some sort of selectors.
label selectors won't work on spec.type as they may not conform to the
label format.
Field selectors don't seem to work for CRDs beyond metadata.name and
namespace.
* Might not need origin after all, but adding it here
* adding origin to yaml as well
* eventtypes instead of event-types to maintain consistency.
* Clean up some spots the merge didn't catch.
* updating example
* Removing source
* cluster-scoped registry to simplify things.
* updating example
* removing source
* updating broker policy to create event types...
updating cloud event SDK version.
* adding a TODO to update to a cluster-level role binding for the ingress.
If we are planning on using cluster-scoped EventTypes
* namespaced EventTypes, i.e., registry per namespace
* adding create permission to ingress
* registry per namespace. Ingress policies working!
* Fix the bad merge by replacing logger.BaseLogger with logger.FormatLogger.
* Add extra columns when using kubectl get.
* MarkBrokerDoesNotExist
* Rename extra columns.
* cosmetic change
* Replace the Trigger reconciler's in-memory map with a simple list, utilizing the fact that Controller Runtime already has the results of the List cached.
* Accept v0.1 and v0.2 cloud events. Adding UTs.
Updating initClient as well, removing unnecessary paging.
* Change to resolve.SubscriberSpec().
* Remove restClient as it wasn't actually used.
* Only reconcile the Namespace if the specific resource we care about changes.
* changing origin to from
* lowercase
* Merge remote-tracking branch 'upstream/master' into registry-namespace
# Conflicts:
# cmd/broker/ingress/main.go
# cmd/webhook/main.go
# config/200-broker-clusterrole.yaml
# config/500-controller.yaml
# pkg/apis/eventing/v1alpha1/broker_types.go
# pkg/apis/eventing/v1alpha1/register.go
# pkg/reconciler/v1alpha1/broker/broker.go
# pkg/reconciler/v1alpha1/broker/resources/ingress.go
# pkg/reconciler/v1alpha1/namespace/namespace.go
# pkg/reconciler/v1alpha1/trigger/trigger.go
* updating event type to include broker
* additional columns
* just TODOs
* adding some TODOs
* Adding TODOs, updating cloudevents after my change
* Adding observability
* validation
* broker default
* event type controller... checks for broker existence and readiness
* adding extra columns
* message in columns
* Passing broker. Need to refactor this module
* broker immutable to avoid problem with label selector
* listening for broker changes
* removing broker label. Making broker mutable again.
* fixing dns subdomain
* ingress policy as an object.
adding some UTs.
still not convinced.
* removing DeprecatedGeneration
* Adding README
* yaml
* updating readme
* updating trigger
* all fields immutable in eventType.
this makes our life easier when reconciling them from sources.
* adding UTs
* Updating README for source
* moving ToDNSSubdomain to utils
* removing ommitempty
* Using github.com/kelseyhightower/envconfig for env variables.
* changing Triggers to also look for an extension attribute as part of its
source. If there, then we use that as source for exact matching.
* Updating ingress to read From custom extension, if present, and use that
Otherwise it uses source
* Updating comments
* It seems to complain if I add an invalid URL in the markup.
Changing the schema.
* typo
* lowercase from
* using new cloud event API
* addressing Grant's comment.
Debug instead of Info
* Bumping cloud-sdk
* Migrate to correct sdk usage.
* update README with other branch
* updating e2e tests
* Updating deepcopy
* naming changes
* disabling registry test to see if this is causing problems
* re-enabling these guys again
* Missing files
* adding description
* removing autoAdd
* some updates, still broken
* some updates, still broken
* removing broker ingress policies related stuff
* adding eventType controller
* eventType controller new serving structure
* bug
* removing commented code
* adding Gopkg.lock
* rollback source getter
* permissions!!! damn...
* permissions.
* removing formatting of imports in goland
* tracking changes to broker as trigger does.
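Several items above mention moving `ToDNSSubdomain` to utils and lowercasing names so pod names stay valid. As a hedged sketch only (not the repository's implementation), RFC 1123 subdomain sanitization might look like:

```go
package main

import (
	"fmt"
	"strings"
)

// toDNSSubdomain is a hypothetical sketch of the kind of sanitization a
// ToDNSSubdomain helper performs: Kubernetes object names must be RFC 1123
// subdomains (lowercase alphanumerics, '-' and '.', starting and ending
// with an alphanumeric).
func toDNSSubdomain(s string) string {
	var b strings.Builder
	for _, r := range strings.ToLower(s) {
		switch {
		case r >= 'a' && r <= 'z', r >= '0' && r <= '9', r == '-', r == '.':
			b.WriteRune(r)
		default:
			b.WriteRune('-') // replace invalid characters
		}
	}
	// Trim any leading/trailing non-alphanumerics introduced above.
	return strings.Trim(b.String(), "-.")
}

func main() {
	fmt.Println(toDNSSubdomain("Com.Example_Event")) // com.example-event
}
```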
…e#1107)

* WIP
* WIP - In-memory working with E2E tests
* WIP - remove istio dependency from in-memory channel
* UTs pass, E2E tests pass with in-memory as well as kafka
* Fixed UTs that failed due to last K8s service change
* Removed unnecessary space from a line
* Adding istio annotation to test pod. This will be needed when running E2E tests against channels other than in-memory
* Bug fix to set clusterIP of K8s service only when it is not of type ExternalName
* WIP kafka channel
* WIP kafka - UTs and E2E pass. More UTs needed
* Updated code based on PR comments
* WIP
* Updates based on PR comments
* Updates based on PR comments
* Fixed UTs
* Updated VENDOR_LICENSE
* WIP. Update fanout sidecar
* Merge from upstream master
* UTs pass, ITs passed. Code ready for PR
* Update natss to not use Istio. UTs and E2E tests pass.
* Updates based on PR comments
* Removed permission to istio virtual service from controller
* WIP
* Changes based on PR comments
* Added back permission that was removed by mistake
* WIP
* Remove istio references
* WIP
* Removed one more reference of istio
* Revert kafka.yaml local change
* WIP
* Revert kafka dispatcher change
* Removing Mutex. No need to use Mutex when using atomic value for hostToChannelMap
* Removed named port from GCP dispatcher K8s service
* WIP
* Final changes before validating E2E tests
* Updates based on PR comments
[BUG] eventing controller and source-controller fall off main() on shutdown signal
Produced via: `gofmt -s -w $(find -path './vendor' -prune -o -type f -name '*.go' -print))` `goimports -w $(find -name '*.go' | grep -v vendor)`
Produced via: `github.com/client9/misspell`
* clean scripts
* update test script
* enable e2e for natss provisioner
* clean up
* fix CR issues
…ding (knative#1127)

* Reconciling broker, service account, and role binding in Namespace controller
* updating Gopkg.lock
Produced via: `gofmt -s -w $(find -path './vendor' -prune -o -type f -name '*.go' -print))` `goimports -w $(find -name '*.go' | grep -v vendor)`
…tive#1119)

* WIP
* WIP - In-memory working with E2E tests
* WIP - remove istio dependency from in-memory channel
* UTs pass, E2E tests pass with in-memory as well as kafka
* Fixed UTs that failed due to last K8s service change
* Removed unnecessary space from a line
* Adding istio annotation to test pod. This will be needed when running E2E tests against channels other than in-memory
* Bug fix to set clusterIP of K8s service only when it is not of type ExternalName
* WIP kafka channel
* WIP kafka - UTs and E2E pass. More UTs needed
* Updated code based on PR comments
* WIP
* Updates based on PR comments
* Updates based on PR comments
* Fixed UTs
* Updated VENDOR_LICENSE
* WIP. Update fanout sidecar
* Merge from upstream master
* UTs pass, ITs passed. Code ready for PR
* Update natss to not use Istio. UTs and E2E tests pass.
* Updates based on PR comments
* Removed permission to istio virtual service from controller
* WIP
* Changes based on PR comments
* Added back permission that was removed by mistake
* WIP
* Remove istio references
* WIP
* Removed one more reference of istio
* Revert kafka.yaml local change
* WIP
* Revert kafka dispatcher change
* Removing Mutex. No need to use Mutex when using atomic value for hostToChannelMap
* Removed named port from GCP dispatcher K8s service
* WIP
* Final changes before validating E2E tests
* Removed istio sidecars from all dispatchers and from broker.
* Updates based on PR comments
* gcppubsub istio/serviceentry removed
* cronjob source tested with and without sidecar
* Container source
* Removed sidecars from tests
* hack/update-dep.sh
* Fixed build error
* Updates based on PR comments
* Remove unneeded log keys.
knative.dev/controller is set by pkg/reconciler just before calling our reconciler, so adding a second one in main is redundant and produced entries with:
```json
{
  "knative.dev/controller": "controller",
  "knative.dev/controller": "trigger-controller",
}
```
The reconcile key is already saved as 'knative.dev/key' by pkg/reconciler, so we don't need to add 'key' as well.
* Remove explict adds of key.
* PR comments.
The latest knative/pkg upgrades its dependency on the stackdriver exporter to v0.9.2, which is now compatible with protobuf 1.3.
DO NOT SUBMIT
Just for review, not meant to merge.