Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions pkg/sources/gcppubsub/gcp_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"log"
"os"

"github.com/knative/eventing/pkg/sources"
"github.com/golang/glog"
"github.com/google/uuid"
"github.com/knative/eventing/pkg/sources"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -45,9 +45,6 @@ const (
subscription = "subscription"
)

// TODO: This and the github example need to move out of proc so they can be invoked
// either with a webhook or by invoking a container. Regardless, that's a bigger
// refactor and hence a followup.
type GCPPubSubEventSource struct {
// kubeclientset is a standard kubernetes clientset
kubeclientset kubernetes.Interface
Expand Down
25 changes: 25 additions & 0 deletions pkg/sources/k8sevents/eventsource.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: feeds.knative.dev/v1alpha1
kind: EventSource
metadata:
name: k8sevents
namespace: default
spec:
type: k8sevents
source: k8sevents
image: github.com/knative/eventing/pkg/sources/k8sevents
parameters:
image: github.com/knative/eventing/pkg/sources/k8sevents/receive_adapter
22 changes: 22 additions & 0 deletions pkg/sources/k8sevents/eventtype.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: feeds.knative.dev/v1alpha1
kind: EventType
metadata:
name: receiveevent
namespace: default
spec:
eventSource: k8sevents
description: "subscription for receiving k8s cluster events"
159 changes: 159 additions & 0 deletions pkg/sources/k8sevents/k8s_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
Copyright 2018 Google, Inc. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"log"
"os"

"github.com/golang/glog"
"github.com/google/uuid"
"github.com/knative/eventing/pkg/sources"

apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)

const (
projectIDKey = "projectID"
deployment = "deployment"
subscription = "subscription"
)

type K8SEventsEventSource struct {
// kubeclientset is a standard kubernetes clientset
kubeclientset kubernetes.Interface
image string
}

func NewK8SEventsEventSource(kubeclientset kubernetes.Interface, image string) sources.EventSource {
glog.Infof("Image: %q", image)
return &K8SEventsEventSource{kubeclientset: kubeclientset, image: image}
}

func (t *K8SEventsEventSource) Unbind(trigger sources.EventTrigger, bindContext sources.BindContext) error {
glog.Infof("Unbinding K8S Events with context %+v", bindContext)

deploymentName := bindContext.Context[deployment].(string)

err := t.deleteWatcher("knative-eventing-system", deploymentName)
if err != nil {
glog.Warningf("Failed to delete deployment: %s", err)
return err
}
return nil
}

func (t *K8SEventsEventSource) Bind(trigger sources.EventTrigger, route string) (*sources.BindContext, error) {
glog.Infof("CREATING K8S Event binding")

namespace := trigger.Parameters["namespace"].(string)

// Just generate a random UUID as a subscriptionName
uuid, err := uuid.NewRandom()
subscriptionName := fmt.Sprintf("sub-%s", uuid.String())

glog.Infof("Namespace: %q Route: %s", namespace, route)

// Create actual watcher
deploymentName := subscriptionName
err = t.createWatcher("knative-eventing-system", deploymentName, t.image, namespace, route)
if err != nil {
glog.Infof("Failed to create deployment: %v", err)
return nil, err
}

return &sources.BindContext{
Context: map[string]interface{}{
subscription: subscriptionName,
deployment: deploymentName,
}}, nil

}

func (t *K8SEventsEventSource) createWatcher(namespace string, deploymentName string, image string, eventNamespace string, route string) error {
dc := t.kubeclientset.AppsV1().Deployments(namespace)

// First, check if deployment exists already.
if _, err := dc.Get(deploymentName, metav1.GetOptions{}); err != nil {
if !apierrs.IsNotFound(err) {
glog.Infof("deployments.Get for %q failed: %v", deploymentName, err)
return err
}
glog.Infof("Deployment %q doesn't exist, creating", deploymentName)
} else {
glog.Infof("Found existing deployment %q", deploymentName)
return nil
}

// TODO: Create ownerref to the binding so when the binding goes away deployment
// gets removed. Currently we manually delete the deployment.
deployment := MakeWatcherDeployment(namespace, deploymentName, "bind-controller", image, eventNamespace, route)
_, createErr := dc.Create(deployment)
return createErr
}

func (t *K8SEventsEventSource) deleteWatcher(namespace string, deploymentName string) error {
dc := t.kubeclientset.AppsV1().Deployments(namespace)

// First, check if deployment exists already.
if _, err := dc.Get(deploymentName, metav1.GetOptions{}); err != nil {
if !apierrs.IsNotFound(err) {
glog.Infof("deployments.Get for %q failed: %v", deploymentName, err)
return err
}
glog.Infof("Deployment %q already deleted", deploymentName)
return nil
}

return dc.Delete(deploymentName, &metav1.DeleteOptions{})
}

type parameters struct {
Image string `json:"image,omitempty"`
}

func main() {
flag.Parse()

decodedParameters, _ := base64.StdEncoding.DecodeString(os.Getenv(sources.EventSourceParametersKey))

var p parameters
err := json.Unmarshal(decodedParameters, &p)
if err != nil {
panic(fmt.Sprintf("can not unmarshal %q : %v", decodedParameters, err))
}

cfg, err := clientcmd.BuildConfigFromFlags("", "")
if err != nil {
glog.Fatalf("Error building kubeconfig: %v", err.Error())
}

kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
glog.Fatalf("Error building kubernetes clientset: %v", err.Error())
}

sources.RunEventSource(NewK8SEventsEventSource(kubeClient, p.Image))
log.Printf("Done...")
}
134 changes: 134 additions & 0 deletions pkg/sources/k8sevents/receive_adapter/receive_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
Copyright 2018 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"

"github.com/golang/glog"
"github.com/knative/eventing/pkg/signals"
corev1 "k8s.io/api/core/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)

const (
// Target for messages
envTarget = "TARGET"
// Namespace to watch
envNamespace = "NAMESPACE"
)

type EventWatcher struct {
target string
}

func NewEventWatcher(target string) *EventWatcher {
return &EventWatcher{target: target}
}

func (e *EventWatcher) updateEvent(old, new interface{}) {
e.addEvent(new)
}

func (e *EventWatcher) addEvent(new interface{}) {
event := new.(*corev1.Event)
log.Printf("GOT EVENT: %+v", event)
postMessage(e.target, event)
}

func main() {
flag.Parse()

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

target := os.Getenv(envTarget)
namespace := os.Getenv(envNamespace)

log.Printf("Target is: %q", target)
log.Printf("Namespace is: %q", namespace)

cfg, err := clientcmd.BuildConfigFromFlags("", "")
if err != nil {
log.Fatalf("Error building kubeconfig: %v", err.Error())
}

kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
log.Fatalf("Error building kubernetes clientset: %v", err.Error())
}

log.Printf("Creating a new Event Watcher...")
watcher := NewEventWatcher(target)

eventsInformer := coreinformers.NewFilteredEventInformer(
kubeClient, namespace, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, nil)

eventsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: watcher.addEvent,
UpdateFunc: watcher.updateEvent,
})

log.Printf("Starting eventsInformer...")
go eventsInformer.Run(stopCh)

log.Printf("Waiting for caches to sync...")
if ok := cache.WaitForCacheSync(stopCh, eventsInformer.HasSynced); !ok {
glog.Fatalf("Failed to wait for events cache to sync")
}
log.Printf("Caches synced...")
<-stopCh
log.Printf("Exiting...")
}

func postMessage(target string, m *corev1.Event) error {
jsonStr, err := json.Marshal(m)
if err != nil {
log.Printf("Failed to marshal the message: %+v : %s", m, err)
return err
}

URL := fmt.Sprintf("http://%s/", target)
log.Printf("Posting to %q", URL)
req, err := http.NewRequest("POST", URL, bytes.NewBuffer(jsonStr))
if err != nil {
log.Printf("Failed to create http request: %s", err)
return err
}

req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
log.Printf("response Status: %s", resp.Status)
body, _ := ioutil.ReadAll(resp.Body)
log.Printf("response Body: %s", string(body))
return nil
}
Loading