Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/apis/feeds/v1alpha1/bind_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,13 @@ type Bind struct {
}

type BindAction struct {
// You must specify one and only of these.

// RouteName specifies Knative route as a target.
RouteName string `json:"routeName,omitempty"`

// ChannelName specifies the channel name as a target
ChannelName string `json:"channelName,omitempty"`
}

type EventTrigger struct {
Expand Down
78 changes: 55 additions & 23 deletions pkg/controller/bind/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
clientset "github.com/knative/eventing/pkg/client/clientset/versioned"
bindscheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme"
informers "github.com/knative/eventing/pkg/client/informers/externalversions"
channelListers "github.com/knative/eventing/pkg/client/listers/channels/v1alpha1"
listers "github.com/knative/eventing/pkg/client/listers/feeds/v1alpha1"
)

Expand Down Expand Up @@ -90,6 +91,9 @@ type Controller struct {
routesLister servinglisters.RouteLister
routesSynced cache.InformerSynced

channelsLister channelListers.ChannelLister
channelsSynced cache.InformerSynced

// workqueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
// means we can ensure we only process a fixed amount of resources at a
Expand All @@ -115,6 +119,8 @@ func NewController(
// obtain a reference to a shared index informer for the Route type.
routeInformer := routeInformerFactory.Serving().V1alpha1().Routes()

channelInformer := feedsInformerFactory.Channels().V1alpha1().Channels()

// Create event broadcaster
// Add bind-controller types to the default Kubernetes Scheme so Events can be
// logged for bind-controller types.
Expand All @@ -136,6 +142,8 @@ func NewController(
eventSourcesSynced: bindInformer.EventSources().Informer().HasSynced,
eventTypesLister: bindInformer.EventTypes().Lister(),
eventTypesSynced: bindInformer.EventTypes().Informer().HasSynced,
channelsLister: channelInformer.Lister(),
channelsSynced: channelInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Binds"),
recorder: recorder,
}
Expand Down Expand Up @@ -184,6 +192,11 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
return fmt.Errorf("failed to wait for Route caches to sync")
}

glog.Info("Waiting for channel informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.channelsSynced); !ok {
return fmt.Errorf("failed to wait for Channel caches to sync")
}

glog.Info("Starting workers")
// Launch two workers to process Bind resources
for i := 0; i < threadiness; i++ {
Expand Down Expand Up @@ -300,42 +313,24 @@ func (c *Controller) syncHandler(key string) error {
}
deletionTimestamp := accessor.GetDeletionTimestamp()

// Find the Route that they want.
routeName := bind.Spec.Action.RouteName
route, err := c.routesLister.Routes(namespace).Get(routeName)
functionDNS := ""
functionDNS, err := c.resolveActionTarget(bind.Namespace, bind.Spec.Action)

// Only return an error if we're not deleting so that we can delete
// the binding even if the route has already been deleted.
// Only return an error on not found if we're not deleting so that we can delete
// the binding even if the route or channel has already been deleted.
if err != nil && deletionTimestamp == nil {
if errors.IsNotFound(err) {
runtime.HandleError(fmt.Errorf("Route %q in namespace %q does not exist", routeName, namespace))
runtime.HandleError(fmt.Errorf("cannot resolve target for %v in namespace %q", bind.Spec.Action, namespace))
}
return err
}

if route != nil {
functionDNS = route.Status.Domain
glog.Infof("Found route DNS as %q", functionDNS)
}
if len(functionDNS) == 0 {
// Only return an error if we're not deleting so that we can delete
// the binding even if the route has already been deleted.
if deletionTimestamp == nil {
return fmt.Errorf("Route %q does not have domain", routeName)
}
log.Printf("Route %q does not have domain", routeName)
}

es, err := c.eventSourcesLister.EventSources(namespace).Get(bind.Spec.Trigger.Service)
if err != nil {
if errors.IsNotFound(err) {
if deletionTimestamp != nil {
// If the Event Source can not be found, we will remove our finalizer
// because without it, we can't unbind and hence this binding will never
// be deleted. There needs to be a way to either
// prevent or cascade delete bindings before the EventSource goes away. Or
// we need to make a defensive copy so that we can unbind.
// be deleted.
// https://github.com/knative/eventing/issues/94
newFinalizers, err := RemoveFinalizer(bind, controllerAgentName)
if err != nil {
Expand Down Expand Up @@ -603,3 +598,40 @@ func newEventTypeNonControllerRef(et *v1alpha1.EventType) *metav1.OwnerReference
revRef.Controller = &isController
return revRef
}

// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Bind resource
// with the current status of the resource.
func (c *Controller) resolveActionTarget(namespace string, action v1alpha1.BindAction) (string, error) {
if len(action.RouteName) > 0 {
return c.resolveRouteDNS(namespace, action.RouteName)
}
if len(action.ChannelName) > 0 {
return c.resolveChannelDNS(namespace, action.ChannelName)
}
// This should never happen, but because we don't have webhook validation yet, check
// and complain.
return "", fmt.Errorf("action is missing both RouteName and ChannelName")
}

func (c *Controller) resolveRouteDNS(namespace string, routeName string) (string, error) {
route, err := c.routesLister.Routes(namespace).Get(routeName)
if err != nil {
return "", err
}
if len(route.Status.Domain) == 0 {
return "", fmt.Errorf("route '%s/%s' is missing a domain", namespace, routeName)
}
return route.Status.Domain, nil
}

func (c *Controller) resolveChannelDNS(namespace string, channelName string) (string, error) {
channel, err := c.channelsLister.Channels(namespace).Get(channelName)
if err != nil {
return "", err
}
// TODO: The actual dns name should come from something in the status, or ?? But right
// now it is hard coded to be <channelname>-channel
// So we just check that the channel actually exists and tack on the -channel
return fmt.Sprintf("%s-channel", channel.Name), nil
}
4 changes: 2 additions & 2 deletions sample/k8s_events_function/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# k8s_events_function

A simple function that receives Kubernetes events and prints them out the after decoding
from base64 encoding. Because we do **not** have an in-cluster event delivery mechanism yet, uses a
Knative route as an endpoint. This is also example of where we make use of a Receive Adapter
from base64 encoding. This example targets a Knative Route, there's another example [using
channels](./README_CHANNEL.md). This is also example of where we make use of a Receive Adapter
that runs in the context of the namespace where the binding is created. Since there's no push
events, we create a deployment that attaches to k8s events for a given namespace and then
forwards them to the destination.
Expand Down
125 changes: 125 additions & 0 deletions sample/k8s_events_function/README_CHANNEL.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# k8s_events_function

A simple function that receives Kubernetes events and prints them out the after decoding
from base64 encoding. This version wires the action into an existing Channel differing from
the [sample that targets a Knative route](./README.md). This is also example of where we
make use of a Receive Adapter that runs in the context of the namespace where the binding
is created. Since there's no push events, we create a deployment that attaches to k8s events
for a given namespace and then forwards them to the destination.

## Prerequisites

1. [Setup your development environment](../../DEVELOPMENT.md#getting-started)
2. [Start Knative](../../README.md#start-knative)
3. Install k8s events as an event source
4. [Install a channel and function](../hello/README.md)

```shell
ko apply -f pkg/sources/k8sevents/
```

Once deployed, you can inspect the created resources with `kubectl` commands:

```shell
# This will show the available EventSources that you can bind to:
kubectl get eventsources -oyaml

# This will show the available EventTypes that you can bind to:
kubectl get eventtypes -oyaml

```

## Creating a Service Account
Because the Receive Adapter needs to run a deployment, you need to specify what
Service Account should be used in the target namespace for running the Receive Adapter.
Bind.Spec has a field that allows you to specify this. By default it uses "default" for
binding which typically has no priviledges, but this binding requires standing up a
deployment, so you need to either use an existing Service Account with appropriate
priviledges or create a new one. This example creates a Service Account and grants
it cluster admin access, and you probably wouldn't want to do that in production
settings, but for this example it will suffice just fine.

```shell
ko apply -f sample/k8s_events_function/serviceaccount.yaml
ko apply -f sample/k8s_events_function/serviceaccountbinding.yaml
```

## Running

Assuming you have [installed the Bus, channel and function](../hello/README.md), you
can now create a binding for k8s events that will then send k8s events by creating
a binding targeting that channel.

Note that if you are using a different Service Account than created in the example above,
you also need to specify that Service Account in the bind.spec.serviceAccountName.
Modify sample/k8s_events_function/bind-channel.yaml to specify the namespace you want to
watch events for ('default' in this example):

```yaml
apiVersion: feeds.knative.dev/v1alpha1
kind: Bind
metadata:
name: k8s-events-example-channel
namespace: default
spec:
serviceAccountName: binder
trigger:
eventType: receiveevent
resource: k8sevents/receiveevent
service: k8sevents
parameters:
namespace: default
action:
channelName: aloha
```

Then create the binding so that you can see changes on that namespace

```shell
kubectl create -f sample/k8s_events_function/bind-channel.yaml
```

This will create a receive_adapter that runs in the cluster and receives native k8s events
and pushes them to the consuming function.

```shell
$kubectl -n knative-eventing-system get pods

NAME READY STATUS RESTARTS AGE
bind-controller-dddb99dfc-jzp7z 1/1 Running 0 1d
sub-a3095905-f9c8-4f32-87ac-3c8fec9b51f9-85db55dc48-2mbm9 1/1 Running 0 1m

```

Then you can launch a deployment in the namespace that you are watching:

```shell
kubectl -n <namespace> run --image=nginx nginxtest
```

Then look at the logs for the function:

```shell
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
hello-00001-deployment-747d97bbdc-cxg8p 4/4 Running 0 3d

# Replace hello-00001-deployment-XXXX with the pod name from above:
$ kubectl logs hello-00001-deployment-XXXX user-container
```

## Removing a binding

Remove the binding and things get cleaned up (including removing the receive adapter to k8s events)

```shell
kubectl delete binds k8s-events-example-channel
```

## Cleaning up

To clean up the sample service:

```shell
ko delete -f sample/k8s_events_function/
```
31 changes: 31 additions & 0 deletions sample/k8s_events_function/bind-channel.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: feeds.knative.dev/v1alpha1
kind: Bind
metadata:
name: k8s-events-example-channel
namespace: default
spec:
serviceAccountName: binder
trigger:
eventType: receiveevent
# This is kind of superfluous due to parameters, but the current
# bind model still requires a resource
resource: k8sevents/receiveevent
service: k8sevents
parameters:
namespace: default
action:
channelName: aloha