Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/apis/feeds/v1alpha1/bind_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ type BindSpec struct {

// Trigger specifies the trigger we're binding to
Trigger EventTrigger `json:"trigger"`

// Service Account to run binding container. If left out, uses "default"
ServiceAccountName string `json:"serviceAccountName,omitempty"`
}

// ParametersFromSource represents the source of a set of Parameters
Expand Down
9 changes: 7 additions & 2 deletions pkg/controller/bind/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (c *Controller) syncHandler(key string) error {

// If there are conditions or a context do nothing.
if (bind.Status.Conditions != nil || bind.Status.BindContext != nil) && deletionTimestamp == nil {
glog.Infof("Already has status, skipping")
glog.Infof("Binding \"%s/%s\" already has status, skipping", bind.Namespace, bind.Name)
return nil
}

Expand All @@ -391,7 +391,12 @@ func (c *Controller) syncHandler(key string) error {
// Don't mutate the informer's copy of our object.
newES := es.DeepCopy()

binder := sources.NewContainerEventSource(bind, c.kubeclientset, &newES.Spec, "knative-eventing-system")
// check if the user specified a ServiceAccount to use and if so, use it.
serviceAccountName := "default"
if len(bind.Spec.ServiceAccountName) != 0 {
serviceAccountName = bind.Spec.ServiceAccountName
}
binder := sources.NewContainerEventSource(bind, c.kubeclientset, &newES.Spec, bind.Namespace, serviceAccountName)
if deletionTimestamp == nil {
glog.Infof("Creating a subscription to %q : %q for resource %q", es.Name, et.Name, trigger.Resource)
bindContext, err := binder.Bind(trigger, functionDNS)
Expand Down
18 changes: 11 additions & 7 deletions pkg/sources/container_event_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,28 @@ type ContainerEventSource struct {
// Namespace to run the container in
Namespace string

// ServiceAccount to run as
ServiceAccountName string

// Binding for this operation
Binding *v1alpha1.Bind

// EventSourceSpec for the actual underlying source
EventSourceSpec *v1alpha1.EventSourceSpec
}

func NewContainerEventSource(bind *v1alpha1.Bind, kubeclientset kubernetes.Interface, spec *v1alpha1.EventSourceSpec, namespace string) EventSource {
func NewContainerEventSource(bind *v1alpha1.Bind, kubeclientset kubernetes.Interface, spec *v1alpha1.EventSourceSpec, namespace string, serviceAccountName string) EventSource {
return &ContainerEventSource{
kubeclientset: kubeclientset,
Namespace: namespace,
Binding: bind,
EventSourceSpec: spec,
kubeclientset: kubeclientset,
Namespace: namespace,
ServiceAccountName: serviceAccountName,
Binding: bind,
EventSourceSpec: spec,
}
}

func (t *ContainerEventSource) Bind(trigger EventTrigger, route string) (*BindContext, error) {
job, err := MakeJob(t.Binding, t.Namespace, "bind-controller", "binder", t.EventSourceSpec, Bind, trigger, route, BindContext{})
job, err := MakeJob(t.Binding, t.Namespace, t.ServiceAccountName, "binder", t.EventSourceSpec, Bind, trigger, route, BindContext{})
if err != nil {
glog.Errorf("failed to make job: %s", err)
return nil, err
Expand All @@ -74,7 +78,7 @@ func (t *ContainerEventSource) Bind(trigger EventTrigger, route string) (*BindCo
}

func (t *ContainerEventSource) Unbind(trigger EventTrigger, bindContext BindContext) error {
job, err := MakeJob(t.Binding, t.Namespace, "bind-controller", "binder", t.EventSourceSpec, Unbind, trigger, "", bindContext)
job, err := MakeJob(t.Binding, t.Namespace, t.ServiceAccountName, "binder", t.EventSourceSpec, Unbind, trigger, "", bindContext)
if err != nil {
glog.Errorf("failed to make job: %s", err)
return err
Expand Down
29 changes: 29 additions & 0 deletions pkg/sources/container_event_source_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ const (

// EventSourceParametersKey is the Env variable that gets set to serialized EventSourceSpec
EventSourceParametersKey string = "EVENT_SOURCE_PARAMETERS"

// BindNamespaceKey is the Env variable that gets set to namespace of the container doing
// the Bind (aka, namespace of the binding). Uses downward api
BindNamespaceKey string = "BIND_NAMESPACE"

// BindServiceAccount is the Env variable that gets set to serviceaccount of the
// container doing the Bind. Uses downward api
BindServiceAccountKey string = "BIND_SERVICE_ACCOUNT"
)

// MakeJob creates a Job to complete a bind or unbind operation.
Expand Down Expand Up @@ -102,6 +110,11 @@ func makePodTemplate(bind *v1alpha1.Bind, namespace string, serviceAccountName s
}

return &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
"sidecar.istio.io/inject": "false",
},
},
Spec: corev1.PodSpec{
ServiceAccountName: serviceAccountName,
RestartPolicy: corev1.RestartPolicyNever,
Expand Down Expand Up @@ -131,6 +144,22 @@ func makePodTemplate(bind *v1alpha1.Bind, namespace string, serviceAccountName s
Name: EventSourceParametersKey,
Value: encodedParameters,
},
{
Name: BindNamespaceKey,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
{
Name: BindServiceAccountKey,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "spec.serviceAccountName",
},
},
},
},
},
},
Expand Down
24 changes: 16 additions & 8 deletions pkg/sources/gcppubsub/gcp_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,16 @@ type GCPPubSubEventSource struct {
// kubeclientset is a standard kubernetes clientset
kubeclientset kubernetes.Interface
image string
// namespace where the binding is created
bindNamespace string
// serviceAccount that the container runs as. Launches Receive Adapter with the
// same Service Account
bindServiceAccountName string
}

func NewGCPPubSubEventSource(kubeclientset kubernetes.Interface, image string) sources.EventSource {
func NewGCPPubSubEventSource(kubeclientset kubernetes.Interface, bindNamespace string, bindServiceAccountName string, image string) sources.EventSource {
glog.Infof("Image: %q", image)
return &GCPPubSubEventSource{kubeclientset: kubeclientset, image: image}
return &GCPPubSubEventSource{kubeclientset: kubeclientset, bindNamespace: bindNamespace, bindServiceAccountName: bindServiceAccountName, image: image}
}

func (t *GCPPubSubEventSource) Unbind(trigger sources.EventTrigger, bindContext sources.BindContext) error {
Expand All @@ -64,7 +69,7 @@ func (t *GCPPubSubEventSource) Unbind(trigger sources.EventTrigger, bindContext
deploymentName := bindContext.Context[deployment].(string)
subscriptionName := bindContext.Context[subscription].(string)

err := t.deleteWatcher("knative-eventing-system", deploymentName)
err := t.deleteWatcher(t.bindNamespace, deploymentName)
if err != nil {
glog.Warningf("Failed to delete deployment: %s", err)
return err
Expand Down Expand Up @@ -126,7 +131,7 @@ func (t *GCPPubSubEventSource) Bind(trigger sources.EventTrigger, route string)

// Create actual watcher
deploymentName := subscriptionName
err = t.createWatcher("knative-eventing-system", deploymentName, t.image, projectID, subscriptionName, route)
err = t.createWatcher(deploymentName, t.image, projectID, subscriptionName, route)
if err != nil {
glog.Infof("Failed to create deployment: %v", err)

Expand All @@ -147,8 +152,8 @@ func (t *GCPPubSubEventSource) Bind(trigger sources.EventTrigger, route string)

}

func (t *GCPPubSubEventSource) createWatcher(namespace string, deploymentName string, image string, projectID string, subscription string, route string) error {
dc := t.kubeclientset.AppsV1().Deployments(namespace)
func (t *GCPPubSubEventSource) createWatcher(deploymentName string, image string, projectID string, subscription string, route string) error {
dc := t.kubeclientset.AppsV1().Deployments(t.bindNamespace)

// First, check if deployment exists already.
if _, err := dc.Get(deploymentName, metav1.GetOptions{}); err != nil {
Expand All @@ -164,7 +169,7 @@ func (t *GCPPubSubEventSource) createWatcher(namespace string, deploymentName st

// TODO: Create ownerref to the binding so when the binding goes away deployment
// gets removed. Currently we manually delete the deployment.
deployment := MakeWatcherDeployment(namespace, deploymentName, "bind-controller", image, projectID, subscription, route)
deployment := MakeWatcherDeployment(t.bindNamespace, deploymentName, t.bindServiceAccountName, image, projectID, subscription, route)
_, createErr := dc.Create(deployment)
return createErr
}
Expand Down Expand Up @@ -194,6 +199,9 @@ func main() {

decodedParameters, _ := base64.StdEncoding.DecodeString(os.Getenv(sources.EventSourceParametersKey))

bindNamespace := os.Getenv(sources.BindNamespaceKey)
bindServiceAccountName := os.Getenv(sources.BindServiceAccountKey)

var p parameters
err := json.Unmarshal(decodedParameters, &p)
if err != nil {
Expand All @@ -210,6 +218,6 @@ func main() {
glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}

sources.RunEventSource(NewGCPPubSubEventSource(kubeClient, p.Image))
sources.RunEventSource(NewGCPPubSubEventSource(kubeClient, bindNamespace, bindServiceAccountName, p.Image))
log.Printf("Done...")
}
9 changes: 9 additions & 0 deletions pkg/sources/gcppubsub/watcher_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
sidecarIstioInjectAnnotation = "sidecar.istio.io/inject"
)

// MakeWatcherDeployment creates a deployment for a watcher.
// TODO: a whole bunch...
func MakeWatcherDeployment(namespace string, deploymentName string, serviceAccount string, image string, projectID string, subscription string, route string) *appsv1.Deployment {
Expand All @@ -41,6 +45,11 @@ func MakeWatcherDeployment(namespace string, deploymentName string, serviceAccou
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
// Inject Istio so any connection made from the receive adapter
// goes through and is enforced by Istio. Currently for some
// reason turning this on means that the container can not
// reach GCP so leaving this false for now.
Annotations: map[string]string{sidecarIstioInjectAnnotation: "false"},
},
Spec: corev1.PodSpec{
ServiceAccountName: serviceAccount,
Expand Down
28 changes: 18 additions & 10 deletions pkg/sources/k8sevents/k8s_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,24 @@ type K8SEventsEventSource struct {
// kubeclientset is a standard kubernetes clientset
kubeclientset kubernetes.Interface
image string
// namespace where the binding is created
bindNamespace string
// serviceAccount that the container runs as. Launches Receive Adapter with the
// same Service Account
bindServiceAccountName string
}

func NewK8SEventsEventSource(kubeclientset kubernetes.Interface, image string) sources.EventSource {
func NewK8SEventsEventSource(kubeclientset kubernetes.Interface, bindNamespace string, bindServiceAccountName string, image string) sources.EventSource {
glog.Infof("Image: %q", image)
return &K8SEventsEventSource{kubeclientset: kubeclientset, image: image}
return &K8SEventsEventSource{kubeclientset: kubeclientset, bindNamespace: bindNamespace, bindServiceAccountName: bindServiceAccountName, image: image}
}

func (t *K8SEventsEventSource) Unbind(trigger sources.EventTrigger, bindContext sources.BindContext) error {
glog.Infof("Unbinding K8S Events with context %+v", bindContext)

deploymentName := bindContext.Context[deployment].(string)

err := t.deleteWatcher("knative-eventing-system", deploymentName)
err := t.deleteWatcher(deploymentName)
if err != nil {
glog.Warningf("Failed to delete deployment: %s", err)
return err
Expand All @@ -77,7 +82,7 @@ func (t *K8SEventsEventSource) Bind(trigger sources.EventTrigger, route string)

// Create actual watcher
deploymentName := subscriptionName
err = t.createWatcher("knative-eventing-system", deploymentName, t.image, namespace, route)
err = t.createWatcher(deploymentName, t.image, namespace, route)
if err != nil {
glog.Infof("Failed to create deployment: %v", err)
return nil, err
Expand All @@ -91,8 +96,8 @@ func (t *K8SEventsEventSource) Bind(trigger sources.EventTrigger, route string)

}

func (t *K8SEventsEventSource) createWatcher(namespace string, deploymentName string, image string, eventNamespace string, route string) error {
dc := t.kubeclientset.AppsV1().Deployments(namespace)
func (t *K8SEventsEventSource) createWatcher(deploymentName string, image string, eventNamespace string, route string) error {
dc := t.kubeclientset.AppsV1().Deployments(t.bindNamespace)

// First, check if deployment exists already.
if _, err := dc.Get(deploymentName, metav1.GetOptions{}); err != nil {
Expand All @@ -108,13 +113,13 @@ func (t *K8SEventsEventSource) createWatcher(namespace string, deploymentName st

// TODO: Create ownerref to the binding so when the binding goes away deployment
// gets removed. Currently we manually delete the deployment.
deployment := MakeWatcherDeployment(namespace, deploymentName, "bind-controller", image, eventNamespace, route)
deployment := MakeWatcherDeployment(t.bindNamespace, deploymentName, t.bindServiceAccountName, image, eventNamespace, route)
_, createErr := dc.Create(deployment)
return createErr
}

func (t *K8SEventsEventSource) deleteWatcher(namespace string, deploymentName string) error {
dc := t.kubeclientset.AppsV1().Deployments(namespace)
func (t *K8SEventsEventSource) deleteWatcher(deploymentName string) error {
dc := t.kubeclientset.AppsV1().Deployments(t.bindNamespace)

// First, check if deployment exists already.
if _, err := dc.Get(deploymentName, metav1.GetOptions{}); err != nil {
Expand All @@ -138,6 +143,9 @@ func main() {

decodedParameters, _ := base64.StdEncoding.DecodeString(os.Getenv(sources.EventSourceParametersKey))

bindNamespace := os.Getenv(sources.BindNamespaceKey)
bindServiceAccountName := os.Getenv(sources.BindServiceAccountKey)

var p parameters
err := json.Unmarshal(decodedParameters, &p)
if err != nil {
Expand All @@ -154,6 +162,6 @@ func main() {
glog.Fatalf("Error building kubernetes clientset: %v", err.Error())
}

sources.RunEventSource(NewK8SEventsEventSource(kubeClient, p.Image))
sources.RunEventSource(NewK8SEventsEventSource(kubeClient, bindNamespace, bindServiceAccountName, p.Image))
log.Printf("Done...")
}
7 changes: 7 additions & 0 deletions pkg/sources/k8sevents/watcher_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
sidecarIstioInjectAnnotation = "sidecar.istio.io/inject"
)

// MakeWatcherDeployment creates a deployment for a watcher.
// TODO: a whole bunch...
func MakeWatcherDeployment(namespace string, deploymentName string, serviceAccount string, image string, eventNamespace string, route string) *appsv1.Deployment {
Expand All @@ -41,6 +45,9 @@ func MakeWatcherDeployment(namespace string, deploymentName string, serviceAccou
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
// Inject Istio so any connection made from the receive adapter
// goes through and is enforced by Istio.
Annotations: map[string]string{sidecarIstioInjectAnnotation: "true"},
},
Spec: corev1.PodSpec{
ServiceAccountName: serviceAccount,
Expand Down
35 changes: 29 additions & 6 deletions sample/gcp_pubsub_function/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

A simple function that receives Google Cloud Pub Sub events and prints out the data field after decoding
from base64 encoding. Because we do **not** have an in-cluster event delivery mechanism yet, uses a
Knative route as an endpoint.
Knative route as an endpoint. This is also an example of making use of a Receive Adapter
that runs in the context of the namespace where the binding is created. Since we wanted to
demonstrate pull events, we create a deployment that attaches to the specified GCP topic and
then forwards the events to the destination.

## Prerequisites

Expand Down Expand Up @@ -36,6 +39,22 @@ gcloud pubsub topics create knative-demo

5. Install the event sources and types for [gcp_pubsub](../gcp_pubsub/README.md)


## Creating a Service Account
Because the Receive Adapter needs to run a deployment, you need to specify which
Service Account should be used in the target namespace for running the Receive Adapter.
Bind.Spec has a field that allows you to specify this. By default it uses "default" for
the binding, which typically has no privileges, but this binding requires standing up a
deployment, so you need to either use an existing Service Account with appropriate
privileges or create a new one. This example creates a Service Account and grants
it cluster admin access, which you probably wouldn't want to do in production
settings, but for this example it will suffice just fine.

```shell
ko apply -f sample/gcp_pubsub_function/serviceaccount.yaml
ko apply -f sample/gcp_pubsub_function/serviceaccountbinding.yaml
```

## Running

You can deploy this to Knative from the root directory via:
Expand Down Expand Up @@ -79,12 +98,15 @@ gcp-pubsub-function.default.aikas.org pointing to 130.211.116.160

So, you'd need to create an A record for gcp-pubsub-function.default.aikas.org pointing to 130.211.116.160

To now bind the gcp_pubsub_function for GCP PubSub messages with the function we created above, you need to
create a Bind object. Modify sample/gcp_pubsub_function/bind.yaml to specify the topic and project id
you want.
To now bind the gcp_pubsub_function for GCP PubSub messages with the function we created above, you need
to create a Bind object. Modify sample/gcp_pubsub_function/bind.yaml to specify the topic and project id
you want.
For example, if I wanted to receive notifications to:
project: quantum-reducer-434 topic: knative-demo, my Bind object would look like the one below.

You can also specify a different Service Account to use for the bind / receive watcher by changing
the spec.serviceAccountName to something else.

For example, if I wanted to receive notifications to:
project: quantum-reducer-434 topic: knative-demo, my Bind object would look like so:

```yaml
apiVersion: feeds.knative.dev/v1alpha1
Expand All @@ -93,6 +115,7 @@ metadata:
name: gcppubsub-example
namespace: default
spec:
serviceAccountName: binder
trigger:
service: gcppubsub
eventType: receive
Expand Down
Loading