Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
bazel-*
.idea
.vscode
9 changes: 9 additions & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ k8s_object(
template = "controller.yaml",
)

k8s_object(
name = "delivery",
images = {
"event-delivery:latest": "//cmd/delivery:image",
},
template = "delivery.yaml",
)

k8s_object(
name = "namespace",
template = "namespace.yaml",
Expand Down Expand Up @@ -66,5 +74,6 @@ k8s_objects(
":eventtype",
":eventsource",
":controller",
":delivery",
],
)
38 changes: 38 additions & 0 deletions cmd/delivery/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package(default_visibility = ["//visibility:public"])

load("@io_bazel_rules_go//go:def.bzl", "gazelle", "go_binary", "go_library")

go_library(
name = "go_default_library",
srcs = ["main.go"],
importpath = "github.com/elafros/eventing/cmd/delivery",
visibility = ["//visibility:private"],
deps = [
"//pkg/client/clientset/versioned:go_default_library",
"//pkg/client/informers/externalversions:go_default_library",
"//pkg/delivery:go_default_library",
"//pkg/delivery/action:go_default_library",
"//pkg/delivery/queue:go_default_library",
"//pkg/signals:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus/promhttp:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/plugin/pkg/client/auth/gcp:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
],
)

go_binary(
name = "delivery",
embed = [":go_default_library"],
importpath = "github.com/elafros/eventing/cmd/delivery",
pure = "on",
)

load("@io_bazel_rules_docker//go:image.bzl", "go_image")

go_image(
name = "image",
binary = ":delivery",
)
139 changes: 139 additions & 0 deletions cmd/delivery/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
Copyright 2018 Google, Inc. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Implements a process that wraps an event-receiving webhook, an event
// queue, and a sender which delivers events to their eventual destination.
package main

import (
"context"
"flag"
"net/http"
"time"

"github.com/golang/glog"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

clientset "github.com/elafros/eventing/pkg/client/clientset/versioned"
informers "github.com/elafros/eventing/pkg/client/informers/externalversions"
"github.com/elafros/eventing/pkg/delivery"
"github.com/elafros/eventing/pkg/delivery/action"
"github.com/elafros/eventing/pkg/delivery/queue"
"github.com/elafros/eventing/pkg/signals"

"github.com/prometheus/client_golang/prometheus/promhttp"
)

const (
eventBufferSize = 100
senderThreads = 10
serverAddress = ":9090"
metricsScrapePath = "/metrics"
debugQueuePath = "/queue/"
sendEventPrefix = "/v1alpha1/"
)

var (
masterURL string
kubeconfig string
)

func main() {
defer glog.Flush()
flag.Parse()

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
glog.Fatalf("Error building kubeconfig: %s", err.Error())
}

kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}

client, err := clientset.NewForConfig(cfg)
if err != nil {
glog.Fatalf("Error building clientset: %s", err.Error())
}

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
informerFactory := informers.NewSharedInformerFactory(client, time.Second*30)
bindLister := informerFactory.Eventing().V1alpha1().Binds().Lister()

q := queue.NewInMemoryQueue(eventBufferSize)
receiver := delivery.NewReceiver(bindLister, q)

processors := map[string]action.Action{
action.LoggingActionType: action.NewLoggingAction(),
action.ElafrosActionType: action.NewElafrosAction(kubeClient, http.DefaultClient),
}
sender := delivery.NewSender(q, processors)

go kubeInformerFactory.Start(stopCh)
go informerFactory.Start(stopCh)

// Start sender:
go func() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the sender need to be wrapped in a goroutine, given that sender.Run just runs a bunch of goroutines and returns immediately?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. It should have been blocking like the bind controller. Have fixed.

// We don't expect this to return until stop is called,
// but if it does, propagate it back.
glog.Info("Staring event sender")
if err := sender.Run(senderThreads, stopCh); err != nil {
glog.Fatalf("Error running controller: %s", err.Error())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. Suggest:
glog.Fatalf("Error running controller: %v", err)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never "%v" an error. "%s" has an overload for the error interface to call err.Error() whereas "%v" will inspect the structure (and fails to follow pointers)

}
}()

// Set up HTTP endpoints
mux := http.NewServeMux()
glog.Infof("Set up metrics scrape path %s", metricsScrapePath)
mux.Handle(metricsScrapePath, promhttp.Handler())
glog.Infof("Set up debug queue path %s", debugQueuePath)
mux.Handle(debugQueuePath, queue.NewDiagnosticHandler(q))
glog.Infof("Hosting sendEvent API at prefix %s", sendEventPrefix)
mux.Handle(sendEventPrefix, receiver)
mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
glog.Infof("Unhandled %s to route %q", req.Method, req.URL.Path)
http.NotFound(w, req)
})

// Start the API server
srv := &http.Server{Addr: serverAddress, Handler: mux}
go func() {
glog.Infof("Starting API server at %s", serverAddress)
if err := srv.ListenAndServe(); err != nil {
glog.Infof("Httpserver: ListenAndServe() finished with error: %s", err)
}
}()

<-stopCh

// Close the http server gracefully
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
srv.Shutdown(ctx)
}

func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}
34 changes: 34 additions & 0 deletions delivery.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright 2018 Google, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: apps/v1beta1
kind: Deployment
metadata:
name: event-delivery
namespace: bind-system
spec:
replicas: 1
template:
metadata:
labels:
app: event-delivery
spec:
# TODO(inlined): create new narrower role
serviceAccountName: bind-controller
containers:
- name: event-delivery
image: event-delivery:latest
args: [
"-logtostderr",
"-stderrthreshold", "INFO",
]
1 change: 1 addition & 0 deletions pkg/apis/bind/v1alpha1/bind_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type BindAction struct {
RouteName string `json:"routeName,omitempty"`
}

// EventTrigger describes when an Event should be delivered.
type EventTrigger struct {
// Required. The type of event to observe. For example:
// `google.storage.object.finalize` and
Expand Down
43 changes: 43 additions & 0 deletions pkg/delivery/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package(default_visibility = ["//visibility:public"])

load("@io_bazel_rules_go//go:def.bzl", "gazelle", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = [
"doc.go",
"receiver.go",
"sender.go",
],
importpath = "github.com/elafros/eventing/pkg/delivery",
deps = [
"//pkg/apis/bind/v1alpha1:go_default_library",
"//pkg/client/listers/bind/v1alpha1:go_default_library",
"//pkg/delivery/action:go_default_library",
"//pkg/delivery/queue:go_default_library",
"//pkg/event:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
],
)

go_test(
name = "go_default_xtest",
srcs = [
"receiver_test.go",
"sender_test.go",
],
importpath = "github.com/elafros/eventing/pkg/delivery_test",
deps = [
":go_default_library",
"//pkg/apis/bind/v1alpha1:go_default_library",
"//pkg/client/clientset/versioned/fake:go_default_library",
"//pkg/client/informers/externalversions:go_default_library",
"//pkg/delivery/action:go_default_library",
"//pkg/delivery/queue:go_default_library",
"//pkg/event:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
],
)
20 changes: 20 additions & 0 deletions pkg/delivery/action/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package(default_visibility = ["//visibility:public"])

load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = [
"action.go",
"doc.go",
"elafros.go",
"logging.go",
],
importpath = "github.com/elafros/eventing/pkg/delivery/action",
deps = [
"//pkg/event:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
],
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add unit tests in this package.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel confident that I know enough of Elafros to write a good unit test except one that literally mocks & exercises my code. The Logging action isn't really testable unless glog has hooks.

Can look at the right way to add tests after Copenhagen

43 changes: 43 additions & 0 deletions pkg/delivery/action/action.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Copyright 2018 Google, Inc. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package action
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest adding package comment.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(see doc.go)


import (
"github.com/elafros/eventing/pkg/event"
)

// The Action interface implements the capability to send events to an action of
// a particular category (e.g. Elafros routes, WebHooks, ETLs)
// Random thought: optional additional interface for SendEventAsync. We've wanted
// this in the past to allow infrastructure optimizations.
type Action interface {
// SendEvent delivers an event to the named Action synchronously.
// Should return the response from the action (for possible continuation)
// or an error.
// Returns an interface{} result which is currently unused, but will be used to
// allow continuations in the future.
// TODO: May need interfaces to indicate whether errors are fatal or retryable.
SendEvent(name string, data interface{}, context *event.Context) (interface{}, error)
}

// ActionFunc allows simple Action types to be implemented as a standalone function.
type ActionFunc func(name string, data interface{}, context *event.Context) (interface{}, error)

// SendEvent implements the Action interface for a function with the same signature as Action.SendEvent.
func (a ActionFunc) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) {
return a(name, data, context)
}
Copy link
Copy Markdown
Contributor

@eobrain eobrain Apr 19, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are the ActionFunc needed? Do they belong with all the abstract stuff, or should they be in a separate func.go as peers with the elafros.go and logging.go?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm modeling events off of HTTP libraries as much as possible. Action is to Handler as ActionFunc is to HandlerFunc.

19 changes: 19 additions & 0 deletions pkg/delivery/action/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
Copyright 2018 Google, Inc. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package action implements the various Actions that can be invoked
// in response to an Event.
package action
Loading