From 7cb402f4fec26c313b4b431a7c80812aa24ec879 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Mon, 25 Jun 2018 11:56:26 -0400 Subject: [PATCH] Add ChannelName as a bind action --- pkg/apis/feeds/v1alpha1/bind_types.go | 5 + pkg/controller/bind/controller.go | 78 ++++++++---- sample/k8s_events_function/README.md | 4 +- sample/k8s_events_function/README_CHANNEL.md | 125 +++++++++++++++++++ sample/k8s_events_function/bind-channel.yaml | 31 +++++ 5 files changed, 218 insertions(+), 25 deletions(-) create mode 100644 sample/k8s_events_function/README_CHANNEL.md create mode 100644 sample/k8s_events_function/bind-channel.yaml diff --git a/pkg/apis/feeds/v1alpha1/bind_types.go b/pkg/apis/feeds/v1alpha1/bind_types.go index 3c8246ca3a9..0bc7bd8b803 100644 --- a/pkg/apis/feeds/v1alpha1/bind_types.go +++ b/pkg/apis/feeds/v1alpha1/bind_types.go @@ -36,8 +36,13 @@ type Bind struct { } type BindAction struct { + // You must specify one and only of these. + // RouteName specifies Knative route as a target. RouteName string `json:"routeName,omitempty"` + + // ChannelName specifies the channel name as a target + ChannelName string `json:"channelName,omitempty"` } type EventTrigger struct { diff --git a/pkg/controller/bind/controller.go b/pkg/controller/bind/controller.go index 89e943249a3..1691d7ebf61 100644 --- a/pkg/controller/bind/controller.go +++ b/pkg/controller/bind/controller.go @@ -51,6 +51,7 @@ import ( clientset "github.com/knative/eventing/pkg/client/clientset/versioned" bindscheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" informers "github.com/knative/eventing/pkg/client/informers/externalversions" + channelListers "github.com/knative/eventing/pkg/client/listers/channels/v1alpha1" listers "github.com/knative/eventing/pkg/client/listers/feeds/v1alpha1" ) @@ -90,6 +91,9 @@ type Controller struct { routesLister servinglisters.RouteLister routesSynced cache.InformerSynced + channelsLister channelListers.ChannelLister + channelsSynced cache.InformerSynced + // workqueue is a rate limited work queue. This is used to queue work to be // processed instead of performing it as soon as a change happens. This // means we can ensure we only process a fixed amount of resources at a @@ -115,6 +119,8 @@ func NewController( // obtain a reference to a shared index informer for the Route type. routeInformer := routeInformerFactory.Serving().V1alpha1().Routes() + channelInformer := feedsInformerFactory.Channels().V1alpha1().Channels() + // Create event broadcaster // Add bind-controller types to the default Kubernetes Scheme so Events can be // logged for bind-controller types. @@ -136,6 +142,8 @@ func NewController( eventSourcesSynced: bindInformer.EventSources().Informer().HasSynced, eventTypesLister: bindInformer.EventTypes().Lister(), eventTypesSynced: bindInformer.EventTypes().Informer().HasSynced, + channelsLister: channelInformer.Lister(), + channelsSynced: channelInformer.Informer().HasSynced, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Binds"), recorder: recorder, } @@ -184,6 +192,11 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { return fmt.Errorf("failed to wait for Route caches to sync") } + glog.Info("Waiting for channel informer caches to sync") + if ok := cache.WaitForCacheSync(stopCh, c.channelsSynced); !ok { + return fmt.Errorf("failed to wait for Channel caches to sync") + } + glog.Info("Starting workers") // Launch two workers to process Bind resources for i := 0; i < threadiness; i++ { @@ -300,42 +313,24 @@ func (c *Controller) syncHandler(key string) error { } deletionTimestamp := accessor.GetDeletionTimestamp() - // Find the Route that they want. - routeName := bind.Spec.Action.RouteName - route, err := c.routesLister.Routes(namespace).Get(routeName) - functionDNS := "" + functionDNS, err := c.resolveActionTarget(bind.Namespace, bind.Spec.Action) - // Only return an error if we're not deleting so that we can delete - // the binding even if the route has already been deleted. + // Only return an error on not found if we're not deleting so that we can delete + // the binding even if the route or channel has already been deleted. if err != nil && deletionTimestamp == nil { if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("Route %q in namespace %q does not exist", routeName, namespace)) + runtime.HandleError(fmt.Errorf("cannot resolve target for %v in namespace %q", bind.Spec.Action, namespace)) } return err } - if route != nil { - functionDNS = route.Status.Domain - glog.Infof("Found route DNS as %q", functionDNS) - } - if len(functionDNS) == 0 { - // Only return an error if we're not deleting so that we can delete - // the binding even if the route has already been deleted. - if deletionTimestamp == nil { - return fmt.Errorf("Route %q does not have domain", routeName) - } - log.Printf("Route %q does not have domain", routeName) - } - es, err := c.eventSourcesLister.EventSources(namespace).Get(bind.Spec.Trigger.Service) if err != nil { if errors.IsNotFound(err) { if deletionTimestamp != nil { // If the Event Source can not be found, we will remove our finalizer // because without it, we can't unbind and hence this binding will never - // be deleted. There needs to be a way to either - // prevent or cascade delete bindings before the EventSource goes away. Or - // we need to make a defensive copy so that we can unbind. + // be deleted. // https://github.com/knative/eventing/issues/94 newFinalizers, err := RemoveFinalizer(bind, controllerAgentName) if err != nil { @@ -603,3 +598,40 @@ func newEventTypeNonControllerRef(et *v1alpha1.EventType) *metav1.OwnerReference revRef.Controller = &isController return revRef } + +// syncHandler compares the actual state with the desired, and attempts to +// converge the two. It then updates the Status block of the Bind resource +// with the current status of the resource. +func (c *Controller) resolveActionTarget(namespace string, action v1alpha1.BindAction) (string, error) { + if len(action.RouteName) > 0 { + return c.resolveRouteDNS(namespace, action.RouteName) + } + if len(action.ChannelName) > 0 { + return c.resolveChannelDNS(namespace, action.ChannelName) + } + // This should never happen, but because we don't have webhook validation yet, check + // and complain. + return "", fmt.Errorf("action is missing both RouteName and ChannelName") +} + +func (c *Controller) resolveRouteDNS(namespace string, routeName string) (string, error) { + route, err := c.routesLister.Routes(namespace).Get(routeName) + if err != nil { + return "", err + } + if len(route.Status.Domain) == 0 { + return "", fmt.Errorf("route '%s/%s' is missing a domain", namespace, routeName) + } + return route.Status.Domain, nil +} + +func (c *Controller) resolveChannelDNS(namespace string, channelName string) (string, error) { + channel, err := c.channelsLister.Channels(namespace).Get(channelName) + if err != nil { + return "", err + } + // TODO: The actual dns name should come from something in the status, or ?? But right + // now it is hard coded to be -channel + // So we just check that the channel actually exists and tack on the -channel + return fmt.Sprintf("%s-channel", channel.Name), nil +} diff --git a/sample/k8s_events_function/README.md b/sample/k8s_events_function/README.md index 8f6155752fe..7b8c4676e9c 100644 --- a/sample/k8s_events_function/README.md +++ b/sample/k8s_events_function/README.md @@ -1,8 +1,8 @@ # k8s_events_function A simple function that receives Kubernetes events and prints them out the after decoding -from base64 encoding. Because we do **not** have an in-cluster event delivery mechanism yet, uses a -Knative route as an endpoint. This is also example of where we make use of a Receive Adapter +from base64 encoding. This example targets a Knative Route, there's another example [using +channels](./README_CHANNEL.md). This is also example of where we make use of a Receive Adapter that runs in the context of the namespace where the binding is created. Since there's no push events, we create a deployment that attaches to k8s events for a given namespace and then forwards them to the destination. diff --git a/sample/k8s_events_function/README_CHANNEL.md b/sample/k8s_events_function/README_CHANNEL.md new file mode 100644 index 00000000000..7e12b732bc5 --- /dev/null +++ b/sample/k8s_events_function/README_CHANNEL.md @@ -0,0 +1,125 @@ +# k8s_events_function + +A simple function that receives Kubernetes events and prints them out the after decoding +from base64 encoding. This version wires the action into an existing Channel differing from +the [sample that targets a Knative route](./README.md). This is also example of where we +make use of a Receive Adapter that runs in the context of the namespace where the binding +is created. Since there's no push events, we create a deployment that attaches to k8s events +for a given namespace and then forwards them to the destination. + +## Prerequisites + +1. [Setup your development environment](../../DEVELOPMENT.md#getting-started) +2. [Start Knative](../../README.md#start-knative) +3. Install k8s events as an event source +4. [Install a channel and function](../hello/README.md) + +```shell +ko apply -f pkg/sources/k8sevents/ +``` + +Once deployed, you can inspect the created resources with `kubectl` commands: + +```shell +# This will show the available EventSources that you can bind to: +kubectl get eventsources -oyaml + +# This will show the available EventTypes that you can bind to: +kubectl get eventtypes -oyaml + +``` + +## Creating a Service Account +Because the Receive Adapter needs to run a deployment, you need to specify what +Service Account should be used in the target namespace for running the Receive Adapter. +Bind.Spec has a field that allows you to specify this. By default it uses "default" for +binding which typically has no priviledges, but this binding requires standing up a +deployment, so you need to either use an existing Service Account with appropriate +priviledges or create a new one. This example creates a Service Account and grants +it cluster admin access, and you probably wouldn't want to do that in production +settings, but for this example it will suffice just fine. + +```shell +ko apply -f sample/k8s_events_function/serviceaccount.yaml +ko apply -f sample/k8s_events_function/serviceaccountbinding.yaml +``` + +## Running + +Assuming you have [installed the Bus, channel and function](../hello/README.md), you +can now create a binding for k8s events that will then send k8s events by creating +a binding targeting that channel. + +Note that if you are using a different Service Account than created in the example above, +you also need to specify that Service Account in the bind.spec.serviceAccountName. +Modify sample/k8s_events_function/bind-channel.yaml to specify the namespace you want to +watch events for ('default' in this example): + +```yaml +apiVersion: feeds.knative.dev/v1alpha1 +kind: Bind +metadata: + name: k8s-events-example-channel + namespace: default +spec: + serviceAccountName: binder + trigger: + eventType: receiveevent + resource: k8sevents/receiveevent + service: k8sevents + parameters: + namespace: default + action: + channelName: aloha +``` + +Then create the binding so that you can see changes on that namespace + +```shell + kubectl create -f sample/k8s_events_function/bind-channel.yaml +``` + +This will create a receive_adapter that runs in the cluster and receives native k8s events +and pushes them to the consuming function. + +```shell +$kubectl -n knative-eventing-system get pods + +NAME READY STATUS RESTARTS AGE +bind-controller-dddb99dfc-jzp7z 1/1 Running 0 1d +sub-a3095905-f9c8-4f32-87ac-3c8fec9b51f9-85db55dc48-2mbm9 1/1 Running 0 1m + +``` + +Then you can launch a deployment in the namespace that you are watching: + +```shell +kubectl -n run --image=nginx nginxtest +``` + +Then look at the logs for the function: + +```shell +$ kubectl get pods +NAME READY STATUS RESTARTS AGE +hello-00001-deployment-747d97bbdc-cxg8p 4/4 Running 0 3d + +# Replace hello-00001-deployment-XXXX with the pod name from above: +$ kubectl logs hello-00001-deployment-XXXX user-container +``` + +## Removing a binding + +Remove the binding and things get cleaned up (including removing the receive adapter to k8s events) + +```shell +kubectl delete binds k8s-events-example-channel +``` + +## Cleaning up + +To clean up the sample service: + +```shell +ko delete -f sample/k8s_events_function/ +``` diff --git a/sample/k8s_events_function/bind-channel.yaml b/sample/k8s_events_function/bind-channel.yaml new file mode 100644 index 00000000000..ea510b328e7 --- /dev/null +++ b/sample/k8s_events_function/bind-channel.yaml @@ -0,0 +1,31 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: feeds.knative.dev/v1alpha1 +kind: Bind +metadata: + name: k8s-events-example-channel + namespace: default +spec: + serviceAccountName: binder + trigger: + eventType: receiveevent + # This is kind of superfluous due to parameters, but the current + # bind model still requires a resource + resource: k8sevents/receiveevent + service: k8sevents + parameters: + namespace: default + action: + channelName: aloha