Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
bazel-*
.idea
.vscode
9 changes: 9 additions & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ k8s_object(
template = "controller.yaml",
)

k8s_object(
name = "delivery",
images = {
"event-delivery:latest": "//cmd/delivery:image",
},
template = "delivery.yaml",
)

k8s_object(
name = "namespace",
template = "namespace.yaml",
Expand Down Expand Up @@ -66,5 +74,6 @@ k8s_objects(
":eventtype",
":eventsource",
":controller",
":delivery",
],
)
38 changes: 38 additions & 0 deletions cmd/delivery/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package(default_visibility = ["//visibility:public"])

load("@io_bazel_rules_go//go:def.bzl", "gazelle", "go_binary", "go_library")

go_library(
name = "go_default_library",
srcs = ["main.go"],
importpath = "github.com/elafros/eventing/cmd/delivery",
visibility = ["//visibility:private"],
deps = [
"//pkg/client/clientset/versioned:go_default_library",
"//pkg/client/informers/externalversions:go_default_library",
"//pkg/delivery:go_default_library",
"//pkg/delivery/action:go_default_library",
"//pkg/delivery/queue:go_default_library",
"//pkg/signals:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus/promhttp:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/plugin/pkg/client/auth/gcp:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
],
)

go_binary(
name = "delivery",
embed = [":go_default_library"],
importpath = "github.com/elafros/eventing/cmd/delivery",
pure = "on",
)

load("@io_bazel_rules_docker//go:image.bzl", "go_image")

go_image(
name = "image",
binary = ":delivery",
)
140 changes: 140 additions & 0 deletions cmd/delivery/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
Copyright 2018 Google, Inc. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Implements a process that wraps an event-receiving webhook, an event
// queue, and a sender which delivers events to their eventual destination.
package main

import (
"context"
"flag"
"net/http"
"time"

"github.com/golang/glog"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

clientset "github.com/elafros/eventing/pkg/client/clientset/versioned"
informers "github.com/elafros/eventing/pkg/client/informers/externalversions"
"github.com/elafros/eventing/pkg/delivery"
"github.com/elafros/eventing/pkg/delivery/action"
"github.com/elafros/eventing/pkg/delivery/queue"
"github.com/elafros/eventing/pkg/signals"

"github.com/prometheus/client_golang/prometheus/promhttp"
)

const (
eventBufferSize = 100
senderThreads = 10
serverAddress = ":9090"
metricsScrapePath = "/metrics"
debugQueuePath = "/queue/"
sendEventPrefix = "/v1alpha1/"
)

var (
masterURL string
kubeconfig string
)

func main() {
defer glog.Flush()
flag.Parse()

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
glog.Fatalf("Error building kubeconfig: %s", err.Error())
}

kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}

client, err := clientset.NewForConfig(cfg)
if err != nil {
glog.Fatalf("Error building clientset: %s", err.Error())
}

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
informerFactory := informers.NewSharedInformerFactory(client, time.Second*30)
bindLister := informerFactory.Eventing().V1alpha1().Binds().Lister()

q := queue.NewInMemoryQueue(eventBufferSize)
receiver := delivery.NewReceiver(bindLister, q)

processors := map[string]action.Action{
action.LoggingActionType: action.NewLoggingAction(),
action.ElafrosActionType: action.NewElafrosAction(kubeClient, http.DefaultClient),
action.ServiceActionType: action.NewServiceAction(http.DefaultClient),
}
sender := delivery.NewSender(q, processors)

go kubeInformerFactory.Start(stopCh)
go informerFactory.Start(stopCh)

// Start sender:
go func() {
// We don't expect this to return until stop is called,
// but if it does, propagate it back.
glog.Info("Staring event sender")
if err := sender.Run(senderThreads, stopCh); err != nil {
glog.Fatalf("Error running controller: %s", err.Error())
}
}()

// Set up HTTP endpoints
mux := http.NewServeMux()
glog.Infof("Set up metrics scrape path %s", metricsScrapePath)
mux.Handle(metricsScrapePath, promhttp.Handler())
glog.Infof("Set up debug queue path %s", debugQueuePath)
mux.Handle(debugQueuePath, queue.NewDiagnosticHandler(q))
glog.Infof("Hosting sendEvent API at prefix %s", sendEventPrefix)
mux.Handle(sendEventPrefix, receiver)
mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
glog.Infof("Unhandled %s to route %q", req.Method, req.URL.Path)
http.NotFound(w, req)
})

// Start the API server
srv := &http.Server{Addr: serverAddress, Handler: mux}
go func() {
glog.Infof("Starting API server at %s", serverAddress)
if err := srv.ListenAndServe(); err != nil {
glog.Infof("Httpserver: ListenAndServe() finished with error: %s", err)
}
}()

<-stopCh

// Close the http server gracefully
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
srv.Shutdown(ctx)
}

func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}
104 changes: 104 additions & 0 deletions cmd/sendevent/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
Copyright 2018 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Implements a simple utility for sending a JSON-encoded sample event.
// Not wired to bazel because this utility is meant to be run directly rather
// than deployed to K8S
package main

import (
"crypto/rand"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"net/http"
"os"
"time"

"github.com/elafros/eventing/pkg/event"
)

var (
context event.Context
webhook string
data string
)

func init() {
flag.StringVar(&context.EventID, "event-id", "", "Event ID to use. Defaults to a UUID")
flag.StringVar(&context.EventType, "event-type", "google.events.action.demo", "The Event Type to use.")
flag.StringVar(&context.Source, "source", "", "Source URI to use. Defaults to the current machine's hostname")
flag.StringVar(&data, "data", `{"hello": "world!"}`, "Event data")
}

func main() {
flag.Parse()

if flag.NArg() != 1 {
fmt.Println("Missing webhook address")
os.Exit(1)
}

webhook := flag.Args()[0]

var untyped map[string]interface{}
if err := json.Unmarshal([]byte(data), &untyped); err != nil {
fmt.Println("Currenlty sendevent only supports JSON event data")
os.Exit(1)
}

fillEventContext(&context)
req, err := event.NewRequest(webhook, untyped, context)
if err != nil {
fmt.Printf("Failed to create request: %s", err)
os.Exit(1)
}

res, err := http.DefaultClient.Do(req)
if err != nil {
fmt.Printf("Failed to send event to %s: %s\n", webhook, err)
os.Exit(1)
}
fmt.Printf("Got response from %s\n%s\n", webhook, res.Status)
if res.Header.Get("Content-Length") != "" {
bytes, _ := ioutil.ReadAll(res.Body)
fmt.Println(string(bytes))
}
}

func fillEventContext(ctx *event.Context) {
ctx.CloudEventsVersion = "0.1"
ctx.EventTime = time.Now().UTC()

if ctx.EventID == "" {
// A "UUID". Not technically spec complaint
b := make([]byte, 16)
if n, err := rand.Read(b); n != 16 || err != nil {
fmt.Println("Could not create event-id UUID")
os.Exit(1)
}
ctx.EventID = fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:])
}

if ctx.Source == "" {
var err error
ctx.Source, err = os.Hostname()
if err != nil {
ctx.Source = "localhost"
}
}
}
1 change: 0 additions & 1 deletion controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ spec:
labels:
app: bind-controller
spec:
serviceAccountName: bind-controller
containers:
- name: bind-controller
image: bind-controller:latest
Expand Down
34 changes: 34 additions & 0 deletions delivery.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright 2018 Google, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: apps/v1beta1
kind: Deployment
metadata:
name: event-delivery
namespace: bind-system
spec:
replicas: 1
template:
metadata:
labels:
app: event-delivery
spec:
# TODO(inlined): create new narrower role
serviceAccountName: bind-controller
containers:
- name: event-delivery
image: event-delivery:latest
args: [
"-logtostderr",
"-stderrthreshold", "INFO",
]
22 changes: 19 additions & 3 deletions pkg/apis/bind/v1alpha1/bind_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,18 @@ type Bind struct {
Status BindStatus `json:"status"`
}

// BindAction describes where events should be delivered.
type BindAction struct {
// RouteName specifies Elafros route as a target.
RouteName string `json:"routeName,omitempty"`
// Processor dictates the kind of service that will handle the Event.
// For example "elafros.dev/Route"
Processor string `json:"processor"`

// Name dicates the resource exposed by Processor that will handle the event.
// The semantics of Name is determined by Processor.
Name string `json:"name"`
}

// EventTrigger describes when an Event should be delivered.
type EventTrigger struct {
// Required. The type of event to observe. For example:
// `google.storage.object.finalize` and
Expand Down Expand Up @@ -118,13 +125,22 @@ type EventTrigger struct {
ParametersFrom []ParametersFromSource `json:"parametersFrom,omitempty"`
}

// Zero determines whether an EventTrigger is fully empty. If a Bind is set up
// without an EventTrigger, it is assumed that the real event source is incompatible
// with our event framework's control plane and will be set up manually through
// side channels.
func (t *EventTrigger) Zero() bool {
return t.EventType == "" && t.Resource == "" && t.Service == "" &&
t.Parameters == nil && t.ParametersFrom == nil
}

// BindSpec is the spec for a Bind resource
type BindSpec struct {
// Action specifies the target handler for the events
Action BindAction `json:"action"`

// Trigger specifies the trigger we're binding to
Trigger EventTrigger `json:trigger"`
Trigger EventTrigger `json:"trigger"`
}

// ParametersFromSource represents the source of a set of Parameters
Expand Down
Loading