From 78b5d8414d715f8e7175a443c881d24c234b3fd4 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Tue, 27 Mar 2018 16:19:26 -0700 Subject: [PATCH 01/14] Refactor Bind to contain an EventTrigger field. * Changes to "triggers": * Renamed the "triggers" pacakge to "sources" to avoid confusion. * EventTriggers (FKA Sources) now take EventTriggers instead of just params. * source.EventTrigger is different from v1alpha1.EventTrigger because the latter has embedded Raw types that would be hard to use. * Updated GitHub sample: * Renamed the EventSource to "github.com" rather than just "github" to better align with GCF. I think this revealed something about the control plane model that I want to discuss. * Updated pullrequest.yaml to match new format. * As part of this I made a minor change to update-deps to work on mac (gnu-sed still required). --- pkg/apis/bind/v1alpha1/bind_types.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/apis/bind/v1alpha1/bind_types.go b/pkg/apis/bind/v1alpha1/bind_types.go index 9b9899f5e65..6267e02d954 100644 --- a/pkg/apis/bind/v1alpha1/bind_types.go +++ b/pkg/apis/bind/v1alpha1/bind_types.go @@ -40,6 +40,7 @@ type BindAction struct { RouteName string `json:"routeName,omitempty"` } +// EventTrigger describes when an Event should be delivered. type EventTrigger struct { // Required. The type of event to observe. For example: // `google.storage.object.finalize` and From b7b8ff90224075419312b27735cd77c125da3a32 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Mon, 16 Apr 2018 15:28:31 -0700 Subject: [PATCH 02/14] First pass at delivery service --- .gitignore | 2 + BUILD | 9 ++ cmd/delivery/BUILD.bazel | 41 +++++++ cmd/delivery/main.go | 139 ++++++++++++++++++++++++ delivery.yaml | 34 ++++++ pkg/delivery/BUILD.bazel | 41 +++++++ pkg/delivery/action/BUILD.bazel | 19 ++++ pkg/delivery/action/action.go | 41 +++++++ pkg/delivery/action/elafros.go | 72 +++++++++++++ pkg/delivery/action/logging.go | 52 +++++++++ pkg/delivery/queue/BUILD.bazel | 19 ++++ pkg/delivery/queue/diagnostics.go | 59 ++++++++++ pkg/delivery/queue/memory_queue.go | 60 +++++++++++ pkg/delivery/queue/queue.go | 42 ++++++++ pkg/delivery/receiver.go | 112 +++++++++++++++++++ pkg/delivery/receiver_test.go | 166 +++++++++++++++++++++++++++++ pkg/delivery/sender.go | 83 +++++++++++++++ pkg/delivery/sender_test.go | 100 +++++++++++++++++ 18 files changed, 1091 insertions(+) create mode 100644 cmd/delivery/BUILD.bazel create mode 100644 cmd/delivery/main.go create mode 100644 delivery.yaml create mode 100644 pkg/delivery/BUILD.bazel create mode 100644 pkg/delivery/action/BUILD.bazel create mode 100644 pkg/delivery/action/action.go create mode 100644 pkg/delivery/action/elafros.go create mode 100644 pkg/delivery/action/logging.go create mode 100644 pkg/delivery/queue/BUILD.bazel create mode 100644 pkg/delivery/queue/diagnostics.go create mode 100644 pkg/delivery/queue/memory_queue.go create mode 100644 pkg/delivery/queue/queue.go create mode 100644 pkg/delivery/receiver.go create mode 100644 pkg/delivery/receiver_test.go create mode 100644 pkg/delivery/sender.go create mode 100644 pkg/delivery/sender_test.go diff --git a/.gitignore b/.gitignore index ac51a054d2d..7f5afbb95a4 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ bazel-* +.idea +.vscode diff --git a/BUILD b/BUILD index 2f429daadff..8dfa98d9d0e 100644 --- a/BUILD +++ b/BUILD @@ -17,6 +17,14 @@ k8s_object( template = "controller.yaml", ) +k8s_object( + name = "delivery", + images = { + "event-delivery:latest": "//cmd/delivery:image", + }, + template = "delivery.yaml", +) + k8s_object( name = "namespace", template = "namespace.yaml", @@ -66,5 +74,6 @@ k8s_objects( ":eventtype", ":eventsource", ":controller", + ":delivery", ], ) diff --git a/cmd/delivery/BUILD.bazel b/cmd/delivery/BUILD.bazel new file mode 100644 index 00000000000..2f9cdf00a76 --- /dev/null +++ b/cmd/delivery/BUILD.bazel @@ -0,0 +1,41 @@ +package(default_visibility = ["//visibility:public"]) + +load("@io_bazel_rules_go//go:def.bzl", "gazelle", "go_binary", "go_library") + +go_library( + name = "go_default_library", + srcs = ["main.go"], + importpath = "github.com/elafros/eventing/cmd/delivery", + visibility = ["//visibility:private"], + deps = [ + "//pkg/client/clientset/versioned:go_default_library", + "//pkg/client/informers/externalversions:go_default_library", + "//pkg/controller:go_default_library", + "//pkg/controller/bind:go_default_library", + "//pkg/delivery:go_default_library", + "//pkg/delivery/action:go_default_library", + "//pkg/delivery/queue:go_default_library", + "//pkg/signals:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/github.com/prometheus/client_golang/prometheus/promhttp:go_default_library", + "//vendor/k8s.io/client-go/informers:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/plugin/pkg/client/auth/gcp:go_default_library", + "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library", + ], +) + +go_binary( + name = "delivery", + embed = [":go_default_library"], + importpath = "github.com/elafros/eventing/cmd/delivery", + pure = "on", + visibility = ["//visibility:public"], +) + +load("@io_bazel_rules_docker//go:image.bzl", "go_image") + +go_image( + name = "image", + binary = ":delivery", +) diff --git a/cmd/delivery/main.go b/cmd/delivery/main.go new file mode 100644 index 00000000000..e9f16a9f101 --- /dev/null +++ b/cmd/delivery/main.go @@ -0,0 +1,139 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "flag" + "net/http" + "time" + + "github.com/golang/glog" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + + clientset "github.com/elafros/eventing/pkg/client/clientset/versioned" + informers "github.com/elafros/eventing/pkg/client/informers/externalversions" + "github.com/elafros/eventing/pkg/delivery" + "github.com/elafros/eventing/pkg/delivery/action" + "github.com/elafros/eventing/pkg/delivery/queue" + "github.com/elafros/eventing/pkg/signals" + + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +const ( + eventBufferSize = 100 + senderThreads = 10 + serverAddress = ":9090" + metricsScrapePath = "/metrics" + debugQueuePath = "/queue/" + sendEventPrefix = "/v1alpha1/" +) + +var ( + masterURL string + kubeconfig string +) + +func main() { + flag.Parse() + + // set up signals so we handle the first shutdown signal gracefully + stopCh := signals.SetupSignalHandler() + + cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) + if err != nil { + glog.Fatalf("Error building kubeconfig: %s", err.Error()) + } + + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + glog.Fatalf("Error building kubernetes clientset: %s", err.Error()) + } + + client, err := clientset.NewForConfig(cfg) + if err != nil { + glog.Fatalf("Error building clientset: %s", err.Error()) + } + + kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) + informerFactory := informers.NewSharedInformerFactory(client, time.Second*30) + bindLister := informerFactory.Eventing().V1alpha1().Binds().Lister() + + q := queue.NewInMemoryQueue(eventBufferSize) + receiver := delivery.NewReceiver(bindLister, q) + + processors := map[string]action.Action{ + action.LoggingActionType: action.NewLoggingAction(), + action.ElafrosActionType: action.NewElafrosAction(kubeClient, http.DefaultClient), + } + sender := delivery.NewSender(q, processors) + + go kubeInformerFactory.Start(stopCh) + go informerFactory.Start(stopCh) + + // Start sender: + go func() { + // We don't expect this to return until stop is called, + // but if it does, propagate it back. + glog.Info("Staring event sender") + if err := sender.Run(senderThreads, stopCh); err != nil { + glog.Fatalf("Error running controller: %s", err.Error()) + } + }() + + // Set up HTTP endpoints + glog.Infof("Set up metrics scrape path %s", metricsScrapePath) + glog.Infof("Set up debug queue path %s", debugQueuePath) + glog.Infof("Hosting sendEvent API at prefix %s", sendEventPrefix) + + mux := http.NewServeMux() + mux.Handle(metricsScrapePath, promhttp.Handler()) + mux.Handle(debugQueuePath, queue.NewDiagnosticHandler(q)) + mux.Handle(sendEventPrefix, receiver) + mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) { + glog.Infof("Unhandled %s to route %q", req.Method, req.URL.Path) + http.NotFound(w, req) + }) + + // Start the API server + srv := &http.Server{Addr: serverAddress, Handler: mux} + go func() { + glog.Infof("Starting API server at %s", serverAddress) + if err := srv.ListenAndServe(); err != nil { + glog.Infof("Httpserver: ListenAndServe() finished with error: %s", err) + } + }() + + <-stopCh + + // Close the http server gracefully + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + srv.Shutdown(ctx) + + glog.Flush() +} + +func init() { + flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") + flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") +} diff --git a/delivery.yaml b/delivery.yaml new file mode 100644 index 00000000000..6003ecd9a3f --- /dev/null +++ b/delivery.yaml @@ -0,0 +1,34 @@ +# Copyright 2018 Google, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: apps/v1beta1 +kind: Deployment +metadata: + name: event-delivery + namespace: bind-system +spec: + replicas: 1 + template: + metadata: + labels: + app: event-delivery + spec: + # TODO(inlined): create new narrower role + serviceAccountName: bind-controller + containers: + - name: event-delivery + image: event-delivery:latest + args: [ + "-logtostderr", + "-stderrthreshold", "INFO", + ] diff --git a/pkg/delivery/BUILD.bazel b/pkg/delivery/BUILD.bazel new file mode 100644 index 00000000000..b4f9e2c04e0 --- /dev/null +++ b/pkg/delivery/BUILD.bazel @@ -0,0 +1,41 @@ +package(default_visibility = ["//visibility:public"]) + +load("@io_bazel_rules_go//go:def.bzl", "gazelle", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "receiver.go", + "sender.go", + ], + importpath = "github.com/elafros/eventing/pkg/delivery", + deps = [ + "//pkg/apis/bind/v1alpha1:go_default_library", + "//pkg/client/listers/bind/v1alpha1:go_default_library", + "//pkg/delivery/action:go_default_library", + "//pkg/delivery/queue:go_default_library", + "//pkg/event:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = [ + "receiver_test.go", + "sender_test.go", + ], + deps = [ + ":go_default_library", + "//pkg/apis/bind/v1alpha1:go_default_library", + "//pkg/client/clientset/versioned/fake:go_default_library", + "//pkg/client/informers/externalversions:go_default_library", + "//pkg/delivery/action:go_default_library", + "//pkg/delivery/queue:go_default_library", + "//pkg/event:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + ], +) diff --git a/pkg/delivery/action/BUILD.bazel b/pkg/delivery/action/BUILD.bazel new file mode 100644 index 00000000000..fb707651fdc --- /dev/null +++ b/pkg/delivery/action/BUILD.bazel @@ -0,0 +1,19 @@ +package(default_visibility = ["//visibility:public"]) + +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "action.go", + "elafros.go", + "logging.go", + ], + importpath = "github.com/elafros/eventing/pkg/delivery/action", + deps = [ + "//pkg/event:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + ], +) diff --git a/pkg/delivery/action/action.go b/pkg/delivery/action/action.go new file mode 100644 index 00000000000..177664fd814 --- /dev/null +++ b/pkg/delivery/action/action.go @@ -0,0 +1,41 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package action + +import ( + "github.com/elafros/eventing/pkg/event" +) + +// The Action interface implements the capability to send events to an action of +// a particular category (e.g. Elafros routes, WebHooks, ETLs) +// Random thought: optional additional interface for SendEventAsync. We've wanted +// this in the past to allow infrastructure optimizations. +type Action interface { + // SendEvent delivers an event to the named Action synchronously. + // Should return the response from the action (for possible continuation) + // or an error + // TODO: May need interfaces to indicate whether errors are fatal or retryable. + SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) +} + +// ActionFunc allows simple Action types to be implemented as a standalone function. +type ActionFunc func(name string, data interface{}, context *event.Context) (interface{}, error) + +// SendEvent implements the Action interface for a function with the same signature as Action.SendEvent. +func (a ActionFunc) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { + return a(name, data, context) +} diff --git a/pkg/delivery/action/elafros.go b/pkg/delivery/action/elafros.go new file mode 100644 index 00000000000..5224759f375 --- /dev/null +++ b/pkg/delivery/action/elafros.go @@ -0,0 +1,72 @@ +package action + +import ( + "fmt" + "net/http" + "strings" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/elafros/eventing/pkg/event" + "github.com/golang/glog" + "k8s.io/client-go/kubernetes" +) + +const ( + // ElafrosActionType is the expected Bind Processor name to + // cause Events to be sent to an Elafros Route. + ElafrosActionType = "elafros.dev/Route" +) + +// ElafrosAction sends Events to an Elafros Route. +type ElafrosAction struct { + kubeclientset kubernetes.Interface + httpclient *http.Client +} + +// NewElafrosAction creates an Action object that sends Events to Elafros Routes. +func NewElafrosAction(kubeclientset kubernetes.Interface, httpclient *http.Client) *ElafrosAction { + return &ElafrosAction{kubeclientset: kubeclientset, httpclient: httpclient} +} + +// SendEvent will POST the event spec to the root URI of the elafros route. +func (a *ElafrosAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { + parts := strings.Split(name, "/") + if len(parts) != 2 { + return nil, fmt.Errorf("Expected elafros route '%s' to be in the form '/'", name) + } + route, namespace := parts[1], parts[0] + + domain, err := getDomainSuffixFromElaConfig(a.kubeclientset) + if err != nil { + glog.Error("Could not look up Elafros domain") + return nil, err + } + + addr := fmt.Sprintf("http://%s.%s.%s", route, namespace, domain) + req, err := event.NewRequest(addr, data, *context) + if err != nil { + return nil, err + } + + if _, err := a.httpclient.Do(req); err != nil { + return nil, err + } + + // TODO: non-200 responses as errors. Standard decoding of responses to be forwarded. + return nil, nil +} + +func getDomainSuffixFromElaConfig(cl kubernetes.Interface) (string, error) { + const name = "ela-config" + const namespace = "ela-system" + c, err := cl.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return "", err + } + domainSuffix, ok := c.Data["domainSuffix"] + if !ok { + return "", fmt.Errorf("cannot find domainSuffix in %v: ConfigMap.Data is %#v", name, c.Data) + } + return domainSuffix, nil +} diff --git a/pkg/delivery/action/logging.go b/pkg/delivery/action/logging.go new file mode 100644 index 00000000000..05f98c7c959 --- /dev/null +++ b/pkg/delivery/action/logging.go @@ -0,0 +1,52 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package action + +import ( + "encoding/json" + + "github.com/elafros/eventing/pkg/event" + "github.com/golang/glog" +) + +const ( + // LoggingActionType is the expected Bind Processor type + // that will cause events to be logged and dropped. + LoggingActionType = "eventing.elafros.dev/EventLogger" +) + +// A LoggingAction will log and drop Events. +type LoggingAction struct{} + +// SendEvent implements Action.SendEvent +func (a LoggingAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { + event := map[string]interface{}{ + "data": data, + "context": context, + } + b, err := json.Marshal(event) + if err != nil { + return nil, err + } + glog.Infof("[%s] sendEvent %s", name, string(b)) + return nil, nil +} + +// NewLoggingAction creates an Action that will log and drop Events. +func NewLoggingAction() LoggingAction { + return LoggingAction{} +} diff --git a/pkg/delivery/queue/BUILD.bazel b/pkg/delivery/queue/BUILD.bazel new file mode 100644 index 00000000000..1336348d5e6 --- /dev/null +++ b/pkg/delivery/queue/BUILD.bazel @@ -0,0 +1,19 @@ +package(default_visibility = ["//visibility:public"]) + +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "diagnostics.go", + "memory_queue.go", + "queue.go", + ], + importpath = "github.com/elafros/eventing/pkg/delivery/queue", + deps = [ + "//pkg/event:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + ], +) diff --git a/pkg/delivery/queue/diagnostics.go b/pkg/delivery/queue/diagnostics.go new file mode 100644 index 00000000000..af3b0517ca5 --- /dev/null +++ b/pkg/delivery/queue/diagnostics.go @@ -0,0 +1,59 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "encoding/json" + "net/http" + + "github.com/golang/glog" +) + +type diagnosticsHandler struct { + Queue + *http.ServeMux +} + +// NewDiagnosticHandler provides endpoints for inspecting +// and diagnsing queued events. +// Eventually we'll need to decide more about how we will shard our queues +// and what visibility we'll give to each. E.g. I suspect we'll support looking +// at the lenght of queues for a single Flow or Action. +func NewDiagnosticHandler(q Queue) http.Handler { + h := &diagnosticsHandler{Queue: q, ServeMux: http.NewServeMux()} + h.HandleFunc("/queue/length", h.GetLength) + h.HandleFunc("/queue/", func(w http.ResponseWriter, r *http.Request) { + glog.Infof("Do not know how to handle queue path", r.URL.Path) + http.NotFound(w, r) + }) + return h +} + +func (h *diagnosticsHandler) GetLength(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + glog.Warningf("Unauthorized method on /queue/length") + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + glog.V(4).Info("Reporting queue length") + b, err := json.Marshal(map[string]int{"length": h.Length()}) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + } + w.Write(b) +} diff --git a/pkg/delivery/queue/memory_queue.go b/pkg/delivery/queue/memory_queue.go new file mode 100644 index 00000000000..f74f7d1e048 --- /dev/null +++ b/pkg/delivery/queue/memory_queue.go @@ -0,0 +1,60 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import "errors" + +// InMemoryQueue implements the queue interface with a memory buffer. +// Note: this isn't just a simple typedef for a queue because we will soon start +// experimenting with other features, such as fetching an event separate from acking, +// transactional ack + enqueue, cursors, etc. +type InMemoryQueue struct { + ch chan QueuedEvent +} + +// NewInMemoryQueue creates a new InMemoryQueue +func NewInMemoryQueue(length int) *InMemoryQueue { + ch := make(chan QueuedEvent, length) + return &InMemoryQueue{ + ch: ch, + } +} + +// Push implements Queue.Push +func (q *InMemoryQueue) Push(event QueuedEvent) error { + select { + case q.ch <- event: + return nil + default: + return errors.New("Event queue is out of memory") + } +} + +// Pull implements Queue.Pull +func (q *InMemoryQueue) Pull(stopCh <-chan struct{}) (QueuedEvent, bool) { + select { + case event, ok := <-q.ch: + return event, ok + case <-stopCh: + return QueuedEvent{}, false + } +} + +// Length returns Queue.Length +func (q InMemoryQueue) Length() int { + return len(q.ch) +} diff --git a/pkg/delivery/queue/queue.go b/pkg/delivery/queue/queue.go new file mode 100644 index 00000000000..d8db4d1142c --- /dev/null +++ b/pkg/delivery/queue/queue.go @@ -0,0 +1,42 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "github.com/elafros/eventing/pkg/event" +) + +// QueuedEvents are saved to an EventQueue before delivery +type QueuedEvent struct { + Action ActionType `json:"action"` + Data interface{} `json:"data"` + Context *event.Context `json:"context"` +} + +// TODO(vaikas): Remove this once Bind's Action has been migrated +// to be generic. +type ActionType struct { + Name string `json:"name"` + Processor string `json:"processor"` +} + +// Queue implements basic features to allow asynchronous buffering of events. +type Queue interface { + Push(event QueuedEvent) error + Pull(stopCh <-chan struct{}) (event QueuedEvent, ok bool) + Length() int +} diff --git a/pkg/delivery/receiver.go b/pkg/delivery/receiver.go new file mode 100644 index 00000000000..f9ad8fd50a0 --- /dev/null +++ b/pkg/delivery/receiver.go @@ -0,0 +1,112 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package delivery + +import ( + "fmt" + "net/http" + "regexp" + + "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" + + listers "github.com/elafros/eventing/pkg/client/listers/bind/v1alpha1" + "github.com/elafros/eventing/pkg/delivery/queue" + "github.com/elafros/eventing/pkg/event" + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/api/errors" +) + +// If an event source knows the exact flow it is targeting it can bypass the work involved with +// processing event triggers. +var directSendEventRegExp = regexp.MustCompile(`^.*/namespaces/([^/]*)/flows/([^/]*):sendEvent$`) + +// TODO(vaikas): Remove this once Bind's Action has been migrated +// to be generic. +const alwaysUseProcessor = "eventing.elafros.dev/EventLogger" + +func actionFromBind(bind *v1alpha1.Bind) queue.ActionType { + return queue.ActionType{ + Name: bind.Spec.Action.RouteName, + Processor: alwaysUseProcessor, + } +} + +// Receiver manages the HTTP endpoints for receiving events as well as +// stats about the queue for processing events. +type Receiver struct { + eventQueue queue.Queue + bindsLister listers.BindLister +} + +// NewReceiver creates a new Reciever object to enqueue events. +func NewReceiver(bindsLister listers.BindLister, eventQueue queue.Queue) *Receiver { + return &Receiver{bindsLister: bindsLister, eventQueue: eventQueue} +} + +// SendEvent enqueues an event data and Context for delivery to a particular action. +func (r *Receiver) SendEvent(action queue.ActionType, data interface{}, context *event.Context) error { + return r.eventQueue.Push(queue.QueuedEvent{ + Action: action, + Data: data, + Context: context, + }) +} + +// ServeHTTP implements the external REST API that other services will use to send events. +func (r *Receiver) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodPost { + glog.V(3).Info("Cannot handle method", req.Method) + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + matches := directSendEventRegExp.FindStringSubmatch(req.URL.Path) + if len(matches) != 3 { + glog.V(3).Info("Cannot parse route", req.URL.Path) + w.WriteHeader(http.StatusNotFound) + return + } + + bind, err := r.bindsLister.Binds(string(matches[1])).Get(string(matches[2])) + if err != nil { + if errors.IsNotFound(err) { + fmt.Printf("Could not find Bind %s in namespace %s\n", matches[2], matches[1]) + glog.V(3).Infof("Event sent to non-existant Bind %s in namespace %s", matches[2], matches[1]) + w.WriteHeader(http.StatusNotFound) + return + } + glog.Errorf("Unknown error fetching bind %s in namespace %s: %s", matches[2], matches[1], err) + w.WriteHeader(http.StatusInternalServerError) + return + } + var data interface{} + context, err := event.FromRequest(&data, req) + if err != nil { + glog.Error("Failed to unmarshal event ", err) + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + + if err := r.SendEvent(actionFromBind(bind), data, context); err != nil { + glog.Error("Failed to enqueue event", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} diff --git a/pkg/delivery/receiver_test.go b/pkg/delivery/receiver_test.go new file mode 100644 index 00000000000..c8280ca2f7b --- /dev/null +++ b/pkg/delivery/receiver_test.go @@ -0,0 +1,166 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package delivery_test + +import ( + "net/http" + "net/http/httptest" + "net/url" + "reflect" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" + "github.com/elafros/eventing/pkg/client/clientset/versioned/fake" + informers "github.com/elafros/eventing/pkg/client/informers/externalversions" + "github.com/elafros/eventing/pkg/delivery" + "github.com/elafros/eventing/pkg/delivery/queue" + "github.com/elafros/eventing/pkg/event" +) + +var ( + demoBind = &v1alpha1.Bind{ + TypeMeta: metav1.TypeMeta{ + Kind: "Bind", + APIVersion: "eventing.elafros.dev/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "demo", + Namespace: "default", + }, + Spec: v1alpha1.BindSpec{ + Trigger: v1alpha1.EventTrigger{ + EventType: "org.example.object.create", + Service: "example.org", + Resource: "{abc}/123", + }, + Action: v1alpha1.BindAction{ + RouteName: "vaikas.fix.this", + }, + }, + } +) + +func TestReceiverHttpErrors(t *testing.T) { + tests := []struct { + description string + method string + path string + headers http.Header + body interface{} + expectedStatus int + }{ + { + description: "bad method", + method: http.MethodGet, + expectedStatus: http.StatusMethodNotAllowed, + }, + { + description: "bad path", + method: http.MethodPost, + path: "/abc/123", + expectedStatus: http.StatusNotFound, + }, + { + description: "bad event", + method: http.MethodPost, + path: "/v1alpha1/namespaces/default/flows/demo:sendEvent", + // no headers, so we're missing all default fields + expectedStatus: http.StatusBadRequest, + }, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + client := fake.NewSimpleClientset(demoBind) + factory := informers.NewSharedInformerFactory(client, 0).Eventing().V1alpha1().Binds() + factory.Informer().GetIndexer().Add(demoBind) + lister := factory.Lister() + + q := queue.NewInMemoryQueue(1) + r := delivery.NewReceiver(lister, q) + s := httptest.NewServer(r) + defer s.Close() + + url, err := url.Parse(s.URL + test.path) + if err != nil { + t.Fatalf("Unexpected error crafting test URL: %s", err) + } + res, err := s.Client().Do(&http.Request{ + Method: test.method, + URL: url, + }) + if err != nil { + t.Fatalf("Failed to reach test server: %s", err) + } + if res.StatusCode != test.expectedStatus { + t.Fatalf("Wrong status; expected=%d got=%d", test.expectedStatus, res.StatusCode) + } + }) + } +} + +func TestEnqueueEvent(t *testing.T) { + client := fake.NewSimpleClientset(demoBind) + factory := informers.NewSharedInformerFactory(client, 0).Eventing().V1alpha1().Binds() + factory.Informer().GetIndexer().Add(demoBind) + lister := factory.Lister() + + q := queue.NewInMemoryQueue(1) + r := delivery.NewReceiver(lister, q) + s := httptest.NewServer(r) + defer s.Close() + + data := map[string]interface{}{"hello": "world"} + context := &event.Context{ + CloudEventsVersion: "0.0.1-rc1", + EventID: "123abc", + EventTime: time.Now().UTC(), + EventType: "org.example.object.create", + Source: "https://example.org/examples/example1", + } + + path := s.URL + "/v1alpha1/namespaces/default/flows/demo:sendEvent" + req, err := event.NewRequest(path, data, *context) + if err != nil { + t.Fatalf("Failed to create request: %s", err) + } + + res, err := s.Client().Do(req) + if err != nil { + t.Fatalf("Unexpected error calling sendEvent: %s", err) + } + if res.StatusCode != http.StatusOK { + t.Fatalf("SendEvent request %+v failed with response %+v", req, res) + } + + if q.Length() != 1 { + t.Fatalf("Expected one queued event; got %d", q.Length()) + } + event, ok := q.Pull(nil) + if !ok { + t.Fatal("Failed to pull queued event") + } + if !reflect.DeepEqual(context, event.Context) { + t.Fatalf("Event context was not marshalled correctly; expected=%+v got=%+v", context, event.Context) + } + if !reflect.DeepEqual(data, event.Data) { + t.Fatalf("Event data was not marshalled correctly; expected=%+v got=%+v", data, event.Data) + } +} diff --git a/pkg/delivery/sender.go b/pkg/delivery/sender.go new file mode 100644 index 00000000000..b4c330e4e7d --- /dev/null +++ b/pkg/delivery/sender.go @@ -0,0 +1,83 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package delivery + +import ( + "fmt" + "time" + + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/elafros/eventing/pkg/delivery/action" + "github.com/elafros/eventing/pkg/delivery/queue" +) + +// Sender delivers queued events to their target Actions. +type Sender struct { + eventQueue queue.Queue + actions map[string]action.Action +} + +// NewSender creates a new Sender object to deliver Events which were +// enqueued by a Receiver. +func NewSender( + eventQueue queue.Queue, + actions map[string]action.Action) *Sender { + return &Sender{ + eventQueue: eventQueue, + actions: actions, + } +} + +// RunOnce processes a single event from the queue. +// TODO: If the Queue was redesigned so that events are pulled with a lease, then we could restructure RunOnce to not +// need the stopCh (though does it really matter now anyway if the in-memory queue goes away?) +func (s *Sender) RunOnce(stopCh <-chan struct{}) bool { + // TODO(inlined): Pull should only take a lease on the queued event. The receiver should need + // to acknowledge the event and possibly transactionally enqueue subsequent events. + event, ok := s.eventQueue.Pull(stopCh) + if !ok { + glog.V(4).Info("RunOnce shutting down") + return false + } + + glog.V(4).Info("Sending Event %s to %s Action %s", event.Context.EventID, event.Action.Processor, event.Action.Name) + action, ok := s.actions[event.Action.Processor] + if !ok { + runtime.HandleError(fmt.Errorf("Event %s is routed to unknown Processor '%s'", event.Context.EventID, event.Action.Processor)) + return true + } + + // TODO(inlined): retry strategies for errors and continuations for success. + res, err := action.SendEvent(event.Action.Name, event.Data, event.Context) + if err != nil { + runtime.HandleError(fmt.Errorf("Action for event %s returned error: %s", event.Context.EventID, err)) + } else { + glog.V(4).Info("Event %s handled with response %+v", event.Context.EventID, res) + } + return true +} + +// Run runs Sender until stopCh is closed. +func (s *Sender) Run(threadiness int, stopCh <-chan struct{}) error { + for i := 0; i < threadiness; i++ { + go wait.Until(func() { s.RunOnce(stopCh) }, time.Second, stopCh) + } + return nil +} diff --git a/pkg/delivery/sender_test.go b/pkg/delivery/sender_test.go new file mode 100644 index 00000000000..f0ad361b8ee --- /dev/null +++ b/pkg/delivery/sender_test.go @@ -0,0 +1,100 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package delivery_test + +import ( + "reflect" + "sync" + "testing" + + "github.com/elafros/eventing/pkg/delivery" + "github.com/elafros/eventing/pkg/delivery/action" + "github.com/elafros/eventing/pkg/delivery/queue" + "github.com/elafros/eventing/pkg/event" +) + +func TestSendEvent(t *testing.T) { + actionInvoked := false + expectedData := map[string]interface{}{ + "hello": "world", + } + expectedContext := &event.Context{ + EventID: "123", + } + actionName := "projects/demo/region/us-central1/function/test" + actionType := "functions.googleapis.com" + callback := func(name string, data interface{}, context *event.Context) (interface{}, error) { + if actionInvoked { + t.Fatal("Action invoked twice") + } + actionInvoked = true + if name != actionName { + t.Fatalf("Action invoked with wrong name. expected=%s got=%s", actionName, name) + } + if !reflect.DeepEqual(expectedData, data) { + t.Fatalf("Action invoked with wrong data. expected=%v got=%v", expectedData, data) + } + if !reflect.DeepEqual(expectedContext, context) { + t.Fatalf("Action invoked with wrong context. expected=%v got=%v", expectedContext, context) + } + + return nil, nil + } + + q := queue.NewInMemoryQueue(1) + sender := delivery.NewSender(q, map[string]action.Action{ + actionType: action.ActionFunc(callback), + }) + + q.Push(queue.QueuedEvent{ + Context: expectedContext, + Data: expectedData, + Action: queue.ActionType{ + Processor: actionType, + Name: actionName, + }, + }) + + if ok := sender.RunOnce(nil); !ok { + t.Fatal("expected RunOnce() to return true") + } + if !actionInvoked { + t.Fatal("Did not invoke action") + } +} + +func TestShutdown(t *testing.T) { + q := queue.NewInMemoryQueue(0) + sender := delivery.NewSender(q, nil) + stopCh := make(chan struct{}) + var wait sync.WaitGroup + done := false + wait.Add(1) + go func() { + sender.Run(20, stopCh) + done = true + wait.Done() + }() + if done { + t.Fatalf("Sender stops automatically") + } + close(stopCh) + wait.Wait() + if !done { + t.Fatalf("Should be done") + } +} From 806c479e4a6c17f9cf7bc4d2d40169f67ebcece8 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Mon, 23 Apr 2018 14:28:41 -0700 Subject: [PATCH 03/14] review feedback --- cmd/delivery/BUILD.bazel | 1 - cmd/delivery/main.go | 14 +++++++------- pkg/delivery/action/BUILD.bazel | 1 + pkg/delivery/action/action.go | 4 +++- pkg/delivery/queue/diagnostics.go | 4 ++-- pkg/delivery/queue/memory_queue.go | 12 ++++++------ pkg/delivery/queue/queue.go | 3 ++- pkg/delivery/receiver.go | 22 ++++++++++------------ pkg/delivery/sender.go | 5 +++++ pkg/event/event_test.go | 2 +- 10 files changed, 37 insertions(+), 31 deletions(-) diff --git a/cmd/delivery/BUILD.bazel b/cmd/delivery/BUILD.bazel index 2f9cdf00a76..a9abbb1d42f 100644 --- a/cmd/delivery/BUILD.bazel +++ b/cmd/delivery/BUILD.bazel @@ -30,7 +30,6 @@ go_binary( embed = [":go_default_library"], importpath = "github.com/elafros/eventing/cmd/delivery", pure = "on", - visibility = ["//visibility:public"], ) load("@io_bazel_rules_docker//go:image.bzl", "go_image") diff --git a/cmd/delivery/main.go b/cmd/delivery/main.go index e9f16a9f101..877cf8f7d91 100644 --- a/cmd/delivery/main.go +++ b/cmd/delivery/main.go @@ -1,5 +1,5 @@ /* -Copyright 2017 The Kubernetes Authors. +Copyright 2018 Google, Inc. All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,6 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. */ +// Implements a process that wraps an event-receiving webhook, an event +// queue, and a sender which delivers events to their eventual destination. package main import ( @@ -54,6 +56,7 @@ var ( ) func main() { + defer glog.Flush() flag.Parse() // set up signals so we handle the first shutdown signal gracefully @@ -101,13 +104,12 @@ func main() { }() // Set up HTTP endpoints - glog.Infof("Set up metrics scrape path %s", metricsScrapePath) - glog.Infof("Set up debug queue path %s", debugQueuePath) - glog.Infof("Hosting sendEvent API at prefix %s", sendEventPrefix) - mux := http.NewServeMux() + glog.Infof("Set up metrics scrape path %s", metricsScrapePath) mux.Handle(metricsScrapePath, promhttp.Handler()) + glog.Infof("Set up debug queue path %s", debugQueuePath) mux.Handle(debugQueuePath, queue.NewDiagnosticHandler(q)) + glog.Infof("Hosting sendEvent API at prefix %s", sendEventPrefix) mux.Handle(sendEventPrefix, receiver) mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) { glog.Infof("Unhandled %s to route %q", req.Method, req.URL.Path) @@ -129,8 +131,6 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() srv.Shutdown(ctx) - - glog.Flush() } func init() { diff --git a/pkg/delivery/action/BUILD.bazel b/pkg/delivery/action/BUILD.bazel index fb707651fdc..5864071b545 100644 --- a/pkg/delivery/action/BUILD.bazel +++ b/pkg/delivery/action/BUILD.bazel @@ -6,6 +6,7 @@ go_library( name = "go_default_library", srcs = [ "action.go", + "doc.go", "elafros.go", "logging.go", ], diff --git a/pkg/delivery/action/action.go b/pkg/delivery/action/action.go index 177664fd814..f4bce8172ae 100644 --- a/pkg/delivery/action/action.go +++ b/pkg/delivery/action/action.go @@ -27,7 +27,9 @@ import ( type Action interface { // SendEvent delivers an event to the named Action synchronously. // Should return the response from the action (for possible continuation) - // or an error + // or an error. + // Returns an interface{} result which is currently unused, but will be used to + // allow continuations in the future. // TODO: May need interfaces to indicate whether errors are fatal or retryable. SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) } diff --git a/pkg/delivery/queue/diagnostics.go b/pkg/delivery/queue/diagnostics.go index af3b0517ca5..e4f87dd03e2 100644 --- a/pkg/delivery/queue/diagnostics.go +++ b/pkg/delivery/queue/diagnostics.go @@ -29,10 +29,10 @@ type diagnosticsHandler struct { } // NewDiagnosticHandler provides endpoints for inspecting -// and diagnsing queued events. +// and diagnosing queued events. // Eventually we'll need to decide more about how we will shard our queues // and what visibility we'll give to each. E.g. I suspect we'll support looking -// at the lenght of queues for a single Flow or Action. +// at the length of queues for a single Flow or Action. func NewDiagnosticHandler(q Queue) http.Handler { h := &diagnosticsHandler{Queue: q, ServeMux: http.NewServeMux()} h.HandleFunc("/queue/length", h.GetLength) diff --git a/pkg/delivery/queue/memory_queue.go b/pkg/delivery/queue/memory_queue.go index f74f7d1e048..819f45442a0 100644 --- a/pkg/delivery/queue/memory_queue.go +++ b/pkg/delivery/queue/memory_queue.go @@ -22,20 +22,20 @@ import "errors" // Note: this isn't just a simple typedef for a queue because we will soon start // experimenting with other features, such as fetching an event separate from acking, // transactional ack + enqueue, cursors, etc. -type InMemoryQueue struct { +type inMemoryQueue struct { ch chan QueuedEvent } // NewInMemoryQueue creates a new InMemoryQueue -func NewInMemoryQueue(length int) *InMemoryQueue { +func NewInMemoryQueue(length int) Queue { ch := make(chan QueuedEvent, length) - return &InMemoryQueue{ + return &inMemoryQueue{ ch: ch, } } // Push implements Queue.Push -func (q *InMemoryQueue) Push(event QueuedEvent) error { +func (q *inMemoryQueue) Push(event QueuedEvent) error { select { case q.ch <- event: return nil @@ -45,7 +45,7 @@ func (q *InMemoryQueue) Push(event QueuedEvent) error { } // Pull implements Queue.Pull -func (q *InMemoryQueue) Pull(stopCh <-chan struct{}) (QueuedEvent, bool) { +func (q *inMemoryQueue) Pull(stopCh <-chan struct{}) (QueuedEvent, bool) { select { case event, ok := <-q.ch: return event, ok @@ -55,6 +55,6 @@ func (q *InMemoryQueue) Pull(stopCh <-chan struct{}) (QueuedEvent, bool) { } // Length returns Queue.Length -func (q InMemoryQueue) Length() int { +func (q inMemoryQueue) Length() int { return len(q.ch) } diff --git a/pkg/delivery/queue/queue.go b/pkg/delivery/queue/queue.go index d8db4d1142c..07495e2e6c2 100644 --- a/pkg/delivery/queue/queue.go +++ b/pkg/delivery/queue/queue.go @@ -20,13 +20,14 @@ import ( "github.com/elafros/eventing/pkg/event" ) -// QueuedEvents are saved to an EventQueue before delivery +// QueuedEvent is the element saved to an EventQueue before delivery type QueuedEvent struct { Action ActionType `json:"action"` Data interface{} `json:"data"` Context *event.Context `json:"context"` } +// ActionType is a shim while Bind's Action is not generic. // TODO(vaikas): Remove this once Bind's Action has been migrated // to be generic. type ActionType struct { diff --git a/pkg/delivery/receiver.go b/pkg/delivery/receiver.go index f9ad8fd50a0..860189ea0d3 100644 --- a/pkg/delivery/receiver.go +++ b/pkg/delivery/receiver.go @@ -17,7 +17,6 @@ limitations under the License. package delivery import ( - "fmt" "net/http" "regexp" @@ -30,18 +29,19 @@ import ( "k8s.io/apimachinery/pkg/api/errors" ) -// If an event source knows the exact flow it is targeting it can bypass the work involved with -// processing event triggers. +// Currently the delivery service requires that Events are sent directly to an endpoint named +// after a Bind which the Event matches. Future versions may offer a "firehose" endpoint that +// will do late-time filtering of Events. var directSendEventRegExp = regexp.MustCompile(`^.*/namespaces/([^/]*)/flows/([^/]*):sendEvent$`) // TODO(vaikas): Remove this once Bind's Action has been migrated // to be generic. -const alwaysUseProcessor = "eventing.elafros.dev/EventLogger" +const hardCodedProcessor = "eventing.elafros.dev/EventLogger" func actionFromBind(bind *v1alpha1.Bind) queue.ActionType { return queue.ActionType{ Name: bind.Spec.Action.RouteName, - Processor: alwaysUseProcessor, + Processor: hardCodedProcessor, } } @@ -52,7 +52,7 @@ type Receiver struct { bindsLister listers.BindLister } -// NewReceiver creates a new Reciever object to enqueue events. +// NewReceiver creates a new Receiver object to enqueue events. func NewReceiver(bindsLister listers.BindLister, eventQueue queue.Queue) *Receiver { return &Receiver{bindsLister: bindsLister, eventQueue: eventQueue} } @@ -81,15 +81,15 @@ func (r *Receiver) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - bind, err := r.bindsLister.Binds(string(matches[1])).Get(string(matches[2])) + namespace, flowName := string(matches[1]), string(matches[2]) + bind, err := r.bindsLister.Binds(namespace).Get(flowName) if err != nil { if errors.IsNotFound(err) { - fmt.Printf("Could not find Bind %s in namespace %s\n", matches[2], matches[1]) - glog.V(3).Infof("Event sent to non-existant Bind %s in namespace %s", matches[2], matches[1]) + glog.V(3).Infof("Event sent to non-existant Bind %s in namespace %s", flowName, namespace) w.WriteHeader(http.StatusNotFound) return } - glog.Errorf("Unknown error fetching bind %s in namespace %s: %s", matches[2], matches[1], err) + glog.Errorf("Unknown error fetching bind %s in namespace %s: %s", flowName, namespace, err) w.WriteHeader(http.StatusInternalServerError) return } @@ -107,6 +107,4 @@ func (r *Receiver) ServeHTTP(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusInternalServerError) return } - - w.WriteHeader(http.StatusOK) } diff --git a/pkg/delivery/sender.go b/pkg/delivery/sender.go index b4c330e4e7d..77bfdb9f30b 100644 --- a/pkg/delivery/sender.go +++ b/pkg/delivery/sender.go @@ -79,5 +79,10 @@ func (s *Sender) Run(threadiness int, stopCh <-chan struct{}) error { for i := 0; i < threadiness; i++ { go wait.Until(func() { s.RunOnce(stopCh) }, time.Second, stopCh) } + + glog.Info("Started workers") + <-stopCh + glog.Info("Shutting down workers") + return nil } diff --git a/pkg/event/event_test.go b/pkg/event/event_test.go index b6b7e79127b..9ce07cbf9ce 100644 --- a/pkg/event/event_test.go +++ b/pkg/event/event_test.go @@ -27,7 +27,7 @@ import ( var ( context = &event.Context{ CloudEventsVersion: "0.0.1-preview-1", - EventId: "eventid-123", + EventID: "eventid-123", EventType: "google.firestore.document.create", EventTime: time.Now().UTC(), Source: "//firestore.googleapis.com/projects/demo/databases/default/documents/users/inlined", From 2d2690c9f8e45ea755d6321113e8fa7a57b00441 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Mon, 23 Apr 2018 14:32:08 -0700 Subject: [PATCH 04/14] Automatic changes by build tools --- cmd/delivery/BUILD.bazel | 2 -- pkg/delivery/BUILD.bazel | 4 +++- pkg/delivery/action/BUILD.bazel | 2 +- pkg/delivery/queue/BUILD.bazel | 4 +--- pkg/event/BUILD.bazel | 5 +++-- pkg/sources/BUILD | 2 +- 6 files changed, 9 insertions(+), 10 deletions(-) diff --git a/cmd/delivery/BUILD.bazel b/cmd/delivery/BUILD.bazel index a9abbb1d42f..a8510e80ce9 100644 --- a/cmd/delivery/BUILD.bazel +++ b/cmd/delivery/BUILD.bazel @@ -10,8 +10,6 @@ go_library( deps = [ "//pkg/client/clientset/versioned:go_default_library", "//pkg/client/informers/externalversions:go_default_library", - "//pkg/controller:go_default_library", - "//pkg/controller/bind:go_default_library", "//pkg/delivery:go_default_library", "//pkg/delivery/action:go_default_library", "//pkg/delivery/queue:go_default_library", diff --git a/pkg/delivery/BUILD.bazel b/pkg/delivery/BUILD.bazel index b4f9e2c04e0..4085df3259f 100644 --- a/pkg/delivery/BUILD.bazel +++ b/pkg/delivery/BUILD.bazel @@ -5,6 +5,7 @@ load("@io_bazel_rules_go//go:def.bzl", "gazelle", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ + "doc.go", "receiver.go", "sender.go", ], @@ -23,11 +24,12 @@ go_library( ) go_test( - name = "go_default_test", + name = "go_default_xtest", srcs = [ "receiver_test.go", "sender_test.go", ], + importpath = "github.com/elafros/eventing/pkg/delivery_test", deps = [ ":go_default_library", "//pkg/apis/bind/v1alpha1:go_default_library", diff --git a/pkg/delivery/action/BUILD.bazel b/pkg/delivery/action/BUILD.bazel index 5864071b545..0cde53d03de 100644 --- a/pkg/delivery/action/BUILD.bazel +++ b/pkg/delivery/action/BUILD.bazel @@ -1,6 +1,6 @@ package(default_visibility = ["//visibility:public"]) -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") +load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", diff --git a/pkg/delivery/queue/BUILD.bazel b/pkg/delivery/queue/BUILD.bazel index 1336348d5e6..df8567bec89 100644 --- a/pkg/delivery/queue/BUILD.bazel +++ b/pkg/delivery/queue/BUILD.bazel @@ -1,6 +1,6 @@ package(default_visibility = ["//visibility:public"]) -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") +load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", @@ -13,7 +13,5 @@ go_library( deps = [ "//pkg/event:go_default_library", "//vendor/github.com/golang/glog:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/client-go/kubernetes:go_default_library", ], ) diff --git a/pkg/event/BUILD.bazel b/pkg/event/BUILD.bazel index cbfddebb2a0..2848ed19047 100644 --- a/pkg/event/BUILD.bazel +++ b/pkg/event/BUILD.bazel @@ -8,7 +8,8 @@ go_library( ) go_test( - name = "go_default_test", + name = "go_default_xtest", srcs = ["event_test.go"], + importpath = "github.com/elafros/eventing/pkg/event_test", deps = [":go_default_library"], - ) +) diff --git a/pkg/sources/BUILD b/pkg/sources/BUILD index 76e3a29f124..02922d4c903 100644 --- a/pkg/sources/BUILD +++ b/pkg/sources/BUILD @@ -3,9 +3,9 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", srcs = [ - "github.go", "event_source.go", "event_trigger.go", + "github.go", ], importpath = "github.com/elafros/eventing/pkg/sources", visibility = ["//visibility:public"], From d940e90821a3e5dc7440d72788eb7ea9876ccbe0 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Mon, 23 Apr 2018 16:33:55 -0700 Subject: [PATCH 05/14] Whoops. Forgot doc.gos --- pkg/delivery/action/doc.go | 19 +++++++++++++++++++ pkg/delivery/doc.go | 22 ++++++++++++++++++++++ 2 files changed, 41 insertions(+) create mode 100644 pkg/delivery/action/doc.go create mode 100644 pkg/delivery/doc.go diff --git a/pkg/delivery/action/doc.go b/pkg/delivery/action/doc.go new file mode 100644 index 00000000000..cc1b59caf42 --- /dev/null +++ b/pkg/delivery/action/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package action implements the various Actions that can be invoked +// in response to an Event. +package action diff --git a/pkg/delivery/doc.go b/pkg/delivery/doc.go new file mode 100644 index 00000000000..65f5ad64b5e --- /dev/null +++ b/pkg/delivery/doc.go @@ -0,0 +1,22 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package delivery implements an event delivery service. The Receiver +// interface exposes a WebHook where compatible Events can be sent. +// The Receiver enqueues the Event and then acknowledges the webhook. +// A Sender polls the events Queue and delivers the event to the +// appropriate Action according to the Bind that matched the Event. +package delivery From 4387a135f779dfb3c1bf2c76dbbcda1d285c2ab9 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Wed, 25 Apr 2018 09:19:03 -0700 Subject: [PATCH 06/14] PR feedback; change interface in Sender --- pkg/delivery/queue/queue.go | 3 +++ pkg/delivery/sender.go | 32 ++++++++++++++++++++++---------- pkg/delivery/sender_test.go | 4 ++-- 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/pkg/delivery/queue/queue.go b/pkg/delivery/queue/queue.go index 07495e2e6c2..286bd5b7826 100644 --- a/pkg/delivery/queue/queue.go +++ b/pkg/delivery/queue/queue.go @@ -21,6 +21,9 @@ import ( ) // QueuedEvent is the element saved to an EventQueue before delivery +// TODO(inlined): This should probably be made package protected; +// It may be reasonable to just return a tuple of these values from +// the Queue pull operations instead of defining the struct. type QueuedEvent struct { Action ActionType `json:"action"` Data interface{} `json:"data"` diff --git a/pkg/delivery/sender.go b/pkg/delivery/sender.go index 77bfdb9f30b..1487767b22c 100644 --- a/pkg/delivery/sender.go +++ b/pkg/delivery/sender.go @@ -18,6 +18,7 @@ package delivery import ( "fmt" + "io" "time" "github.com/golang/glog" @@ -46,38 +47,49 @@ func NewSender( } // RunOnce processes a single event from the queue. +// Errors are unactionable and reported to the runtime rather than returned. +// Returns true if more processing is possible. // TODO: If the Queue was redesigned so that events are pulled with a lease, then we could restructure RunOnce to not // need the stopCh (though does it really matter now anyway if the in-memory queue goes away?) -func (s *Sender) RunOnce(stopCh <-chan struct{}) bool { +func (s *Sender) RunOnce(stopCh <-chan struct{}) error { // TODO(inlined): Pull should only take a lease on the queued event. The receiver should need // to acknowledge the event and possibly transactionally enqueue subsequent events. event, ok := s.eventQueue.Pull(stopCh) if !ok { - glog.V(4).Info("RunOnce shutting down") - return false + return io.EOF } glog.V(4).Info("Sending Event %s to %s Action %s", event.Context.EventID, event.Action.Processor, event.Action.Name) action, ok := s.actions[event.Action.Processor] if !ok { - runtime.HandleError(fmt.Errorf("Event %s is routed to unknown Processor '%s'", event.Context.EventID, event.Action.Processor)) - return true + return fmt.Errorf("Event %s is routed to unknown Processor '%s'", event.Context.EventID, event.Action.Processor) } // TODO(inlined): retry strategies for errors and continuations for success. res, err := action.SendEvent(event.Action.Name, event.Data, event.Context) if err != nil { - runtime.HandleError(fmt.Errorf("Action for event %s returned error: %s", event.Context.EventID, err)) - } else { - glog.V(4).Info("Event %s handled with response %+v", event.Context.EventID, res) + return fmt.Errorf("Action for event %s returned error: %s", event.Context.EventID, err) + } + glog.V(4).Info("Event %s handled with response %+v", event.Context.EventID, res) + return nil +} + +func (s *Sender) run(stopCh <-chan struct{}) { + for { + err := s.RunOnce(stopCh) + if err != nil { + if err == io.EOF { + return + } + runtime.HandleError(err) + } } - return true } // Run runs Sender until stopCh is closed. func (s *Sender) Run(threadiness int, stopCh <-chan struct{}) error { for i := 0; i < threadiness; i++ { - go wait.Until(func() { s.RunOnce(stopCh) }, time.Second, stopCh) + go wait.Until(func() { s.run(stopCh) }, time.Second, stopCh) } glog.Info("Started workers") diff --git a/pkg/delivery/sender_test.go b/pkg/delivery/sender_test.go index f0ad361b8ee..73b9505bcff 100644 --- a/pkg/delivery/sender_test.go +++ b/pkg/delivery/sender_test.go @@ -69,8 +69,8 @@ func TestSendEvent(t *testing.T) { }, }) - if ok := sender.RunOnce(nil); !ok { - t.Fatal("expected RunOnce() to return true") + if err := sender.RunOnce(nil); err != nil { + t.Fatalf("RunOnce() failed with err: %s", err) } if !actionInvoked { t.Fatal("Did not invoke action") From 5dce9fdb6ead9880985d8feb3eedfee7d62c0afa Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Tue, 17 Apr 2018 10:45:34 -0700 Subject: [PATCH 07/14] Change Bind.Spec.Action and regenerate --- pkg/apis/bind/v1alpha1/bind_types.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/apis/bind/v1alpha1/bind_types.go b/pkg/apis/bind/v1alpha1/bind_types.go index 6267e02d954..cfe9dee22b6 100644 --- a/pkg/apis/bind/v1alpha1/bind_types.go +++ b/pkg/apis/bind/v1alpha1/bind_types.go @@ -35,9 +35,15 @@ type Bind struct { Status BindStatus `json:"status"` } +// BindAction describes where events should be delivered. type BindAction struct { - // RouteName specifies Elafros route as a target. - RouteName string `json:"routeName,omitempty"` + // Processor dictates the kind of service that will handle the Event. + // For example "elafros.dev/Route" + Processor string `json:"processor"` + + // Name dicates the resource exposed by Processor that will handle the event. + // The semantics of Name is determined by Processor. + Name string `json:"name"` } // EventTrigger describes when an Event should be delivered. From d6d7aecd4029c8310e5126cbda5bed46712c1a8a Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Tue, 17 Apr 2018 11:05:30 -0700 Subject: [PATCH 08/14] Remove Action shim --- pkg/delivery/queue/BUILD.bazel | 1 + pkg/delivery/queue/queue.go | 15 ++++----------- pkg/delivery/receiver.go | 7 +++++-- pkg/delivery/receiver_test.go | 6 +++++- pkg/delivery/sender_test.go | 3 ++- 5 files changed, 17 insertions(+), 15 deletions(-) diff --git a/pkg/delivery/queue/BUILD.bazel b/pkg/delivery/queue/BUILD.bazel index df8567bec89..b095c781832 100644 --- a/pkg/delivery/queue/BUILD.bazel +++ b/pkg/delivery/queue/BUILD.bazel @@ -12,6 +12,7 @@ go_library( importpath = "github.com/elafros/eventing/pkg/delivery/queue", deps = [ "//pkg/event:go_default_library", + "//pkg/apis/bind/v1alpha1:go_default_library", "//vendor/github.com/golang/glog:go_default_library", ], ) diff --git a/pkg/delivery/queue/queue.go b/pkg/delivery/queue/queue.go index 286bd5b7826..e91e3bb72c8 100644 --- a/pkg/delivery/queue/queue.go +++ b/pkg/delivery/queue/queue.go @@ -17,6 +17,7 @@ limitations under the License. package queue import ( + "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" "github.com/elafros/eventing/pkg/event" ) @@ -25,17 +26,9 @@ import ( // It may be reasonable to just return a tuple of these values from // the Queue pull operations instead of defining the struct. type QueuedEvent struct { - Action ActionType `json:"action"` - Data interface{} `json:"data"` - Context *event.Context `json:"context"` -} - -// ActionType is a shim while Bind's Action is not generic. -// TODO(vaikas): Remove this once Bind's Action has been migrated -// to be generic. -type ActionType struct { - Name string `json:"name"` - Processor string `json:"processor"` + Action v1alpha1.BindAction `json:"action"` + Data interface{} `json:"data"` + Context *event.Context `json:"context"` } // Queue implements basic features to allow asynchronous buffering of events. diff --git a/pkg/delivery/receiver.go b/pkg/delivery/receiver.go index 860189ea0d3..545176e5f5b 100644 --- a/pkg/delivery/receiver.go +++ b/pkg/delivery/receiver.go @@ -34,6 +34,7 @@ import ( // will do late-time filtering of Events. var directSendEventRegExp = regexp.MustCompile(`^.*/namespaces/([^/]*)/flows/([^/]*):sendEvent$`) +<<<<<<< HEAD // TODO(vaikas): Remove this once Bind's Action has been migrated // to be generic. const hardCodedProcessor = "eventing.elafros.dev/EventLogger" @@ -45,6 +46,8 @@ func actionFromBind(bind *v1alpha1.Bind) queue.ActionType { } } +======= +>>>>>>> Remove Action shim // Receiver manages the HTTP endpoints for receiving events as well as // stats about the queue for processing events. type Receiver struct { @@ -58,7 +61,7 @@ func NewReceiver(bindsLister listers.BindLister, eventQueue queue.Queue) *Receiv } // SendEvent enqueues an event data and Context for delivery to a particular action. -func (r *Receiver) SendEvent(action queue.ActionType, data interface{}, context *event.Context) error { +func (r *Receiver) SendEvent(action v1alpha1.BindAction, data interface{}, context *event.Context) error { return r.eventQueue.Push(queue.QueuedEvent{ Action: action, Data: data, @@ -102,7 +105,7 @@ func (r *Receiver) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - if err := r.SendEvent(actionFromBind(bind), data, context); err != nil { + if err := r.SendEvent(bind.Spec.Action, data, context); err != nil { glog.Error("Failed to enqueue event", err) w.WriteHeader(http.StatusInternalServerError) return diff --git a/pkg/delivery/receiver_test.go b/pkg/delivery/receiver_test.go index c8280ca2f7b..c22977a20ec 100644 --- a/pkg/delivery/receiver_test.go +++ b/pkg/delivery/receiver_test.go @@ -51,7 +51,8 @@ var ( Resource: "{abc}/123", }, Action: v1alpha1.BindAction{ - RouteName: "vaikas.fix.this", + Processor: "eventing.elafros.dev/WebHook", + Name: "https://demo.google.com/notreal", }, }, } @@ -163,4 +164,7 @@ func TestEnqueueEvent(t *testing.T) { if !reflect.DeepEqual(data, event.Data) { t.Fatalf("Event data was not marshalled correctly; expected=%+v got=%+v", data, event.Data) } + if !reflect.DeepEqual(demoBind.Spec.Action, event.Action) { + t.Fatalf("Event action was not marshalled correctly; expected=%+v got=%+v", demoBind.Spec.Action, event.Action) + } } diff --git a/pkg/delivery/sender_test.go b/pkg/delivery/sender_test.go index 73b9505bcff..91980386318 100644 --- a/pkg/delivery/sender_test.go +++ b/pkg/delivery/sender_test.go @@ -21,6 +21,7 @@ import ( "sync" "testing" + "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" "github.com/elafros/eventing/pkg/delivery" "github.com/elafros/eventing/pkg/delivery/action" "github.com/elafros/eventing/pkg/delivery/queue" @@ -63,7 +64,7 @@ func TestSendEvent(t *testing.T) { q.Push(queue.QueuedEvent{ Context: expectedContext, Data: expectedData, - Action: queue.ActionType{ + Action: v1alpha1.BindAction{ Processor: actionType, Name: actionName, }, From 71ffd96749d98b7f394186c03f4554ef0d813b0b Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Tue, 17 Apr 2018 13:44:52 -0700 Subject: [PATCH 09/14] Refactor GH code till sample works again. Also refactored some of the Bind controller so that we can cleanly handle Binds without an event trigger. Binds without triggers aren't a great golden road, but will help us experiment with new event providers (e.g. those demoed at KubeCon) with zero infra changes. --- pkg/apis/bind/v1alpha1/bind_types.go | 11 +- pkg/controller/bind/BUILD | 1 + pkg/controller/bind/controller.go | 167 +++++++++++++++------------ pkg/delivery/sender.go | 8 +- pkg/sources/BUILD | 5 +- pkg/sources/event_source.go | 6 +- pkg/sources/event_trigger.go | 17 --- pkg/sources/github.go | 70 +++++++++-- sample/github/pullrequest.yaml | 3 +- 9 files changed, 178 insertions(+), 110 deletions(-) delete mode 100644 pkg/sources/event_trigger.go diff --git a/pkg/apis/bind/v1alpha1/bind_types.go b/pkg/apis/bind/v1alpha1/bind_types.go index cfe9dee22b6..704837e89ab 100644 --- a/pkg/apis/bind/v1alpha1/bind_types.go +++ b/pkg/apis/bind/v1alpha1/bind_types.go @@ -125,13 +125,22 @@ type EventTrigger struct { ParametersFrom []ParametersFromSource `json:"parametersFrom,omitempty"` } +// Zero determines whether an EventTrigger is fully empty. If a Bind is set up +// without an EventTrigger, it is assumed that the real event source is incompatible +// with our event framework's control plane and will be set up manually through +// side channels. +func (t *EventTrigger) Zero() bool { + return t.EventType == "" && t.Resource == "" && t.Service == "" && + t.Parameters == nil && t.ParametersFrom == nil +} + // BindSpec is the spec for a Bind resource type BindSpec struct { // Action specifies the target handler for the events Action BindAction `json:"action"` // Trigger specifies the trigger we're binding to - Trigger EventTrigger `json:trigger"` + Trigger EventTrigger `json:"trigger"` } // ParametersFromSource represents the source of a set of Parameters diff --git a/pkg/controller/bind/BUILD b/pkg/controller/bind/BUILD index a128b5dba39..b2b51a1d341 100644 --- a/pkg/controller/bind/BUILD +++ b/pkg/controller/bind/BUILD @@ -12,6 +12,7 @@ go_library( "//pkg/client/informers/externalversions:go_default_library", "//pkg/client/listers/bind/v1alpha1:go_default_library", "//pkg/controller:go_default_library", + "//pkg/delivery/action:go_default_library", "//pkg/sources:go_default_library", "//vendor/github.com/elafros/elafros/pkg/client/informers/externalversions:go_default_library", "//vendor/github.com/elafros/elafros/pkg/client/listers/ela/v1alpha1:go_default_library", diff --git a/pkg/controller/bind/controller.go b/pkg/controller/bind/controller.go index 50aa1ce223e..44184c7cd40 100644 --- a/pkg/controller/bind/controller.go +++ b/pkg/controller/bind/controller.go @@ -48,6 +48,7 @@ import ( bindscheme "github.com/elafros/eventing/pkg/client/clientset/versioned/scheme" informers "github.com/elafros/eventing/pkg/client/informers/externalversions" listers "github.com/elafros/eventing/pkg/client/listers/bind/v1alpha1" + "github.com/elafros/eventing/pkg/delivery/action" ) const controllerAgentName = "bind-controller" @@ -133,7 +134,7 @@ func NewController( } controller.sources = make(map[string]sources.EventSource) - controller.sources["github.com"] = sources.NewGithubEventSource() + controller.sources["github.com"] = sources.NewGithubEventSource(kubeclientset) glog.Info("Setting up event handlers") // Set up an event handler for when Bind resources change bindInformer.Binds().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -288,76 +289,40 @@ func (c *Controller) syncHandler(key string) error { bind = bind.DeepCopy() // Find the Route that they want. - routeName := bind.Spec.Action.RouteName - route, err := c.routesLister.Routes(namespace).Get(routeName) - if err != nil { + actionName := bind.Spec.Action.Name + actionType := bind.Spec.Action.Processor + if err := c.validateAction(namespace, actionName, actionType); err != nil { if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("Route %q in namespace %q does not exist", routeName, namespace)) + runtime.HandleError(fmt.Errorf("Action %q of type %q in namespace %q does not exist", actionName, actionType, namespace)) } + glog.Warningf("Bind %q rejected for invalid action: %s", name, err) return err } - domainSuffix, err := getDomainSuffixFromElaConfig(c.kubeclientset) - if err != nil { - if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("Can't find ela ConfigMap")) + var condition *v1alpha1.BindCondition + if bind.Spec.Trigger.Zero() { + glog.Warningf("Bind %q has no Spec.Trigger", name) + if bind.Status.Conditions != nil { + condition = &v1alpha1.BindCondition{ + Type: v1alpha1.BindComplete, + Status: corev1.ConditionTrue, + Reason: fmt.Sprintf("BindSuccess; no Trigger to manage"), + Message: "Bind successful", + } } - return err - } - - functionDNS := fmt.Sprintf("%s.%s.%s", route.Name, route.Namespace, domainSuffix) - glog.Infof("Found route DNS as '%q'", functionDNS) - - es, err := c.eventSourcesLister.EventSources(namespace).Get(bind.Spec.Trigger.Service) - if err != nil { - if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("EventSource %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace)) + } else { + if condition, err = c.bindTrigger(namespace, bind); err != nil { + return err } - return err } - et, err := c.eventTypesLister.EventTypes(namespace).Get(bind.Spec.Trigger.EventType) - if err != nil { - if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("EventType %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace)) - } - return err - } - - trigger, err := resolveTrigger(c.kubeclientset, namespace, bind.Spec.Trigger) - if err != nil { - glog.Warningf("Failed to process parameters: %s", err) - return err - } - - if bind.Status.Conditions != nil { + if condition == nil { glog.Infof("Already has status, skipping") return nil } - glog.Infof("Creating a subscription to %q : %q with Trigger %+v", es.Name, et.Name, trigger) - if val, ok := c.sources[es.Name]; ok { - r, err := val.Bind(trigger, functionDNS) - if err != nil { - glog.Warningf("BIND failed: %s", err) - msg := fmt.Sprintf("Bind failed with : %s", r) - bind.Status.SetCondition(&v1alpha1.BindCondition{ - Type: v1alpha1.BindFailed, - Status: corev1.ConditionTrue, - Reason: "BindFailed", - Message: msg, - }) - } else { - bind.Status.SetCondition(&v1alpha1.BindCondition{ - Type: v1alpha1.BindComplete, - Status: corev1.ConditionTrue, - Reason: fmt.Sprintf("BindSuccess: Hook: %s", r["ID"].(string)), - Message: "Bind successful", - }) - } - } - _, err = c.updateStatus(bind) - if err != nil { + bind.Status.SetCondition(condition) + if _, err = c.updateStatus(bind); err != nil { glog.Warningf("Failed to update status: %s", err) return err } @@ -381,19 +346,69 @@ func (c *Controller) updateStatus(u *v1alpha1.Bind) (*v1alpha1.Bind, error) { return bindClient.Update(newu) } -func resolveTrigger(kubeClient kubernetes.Interface, namespace string, trigger v1alpha1.EventTrigger) (sources.EventTrigger, error) { - r := sources.EventTrigger{ - Resource: trigger.Resource, - EventType: trigger.EventType, - Parameters: make(map[string]interface{}), +func (c *Controller) bindTrigger(namespace string, bind *v1alpha1.Bind) (*v1alpha1.BindCondition, error) { + es, err := c.eventSourcesLister.EventSources(namespace).Get(bind.Spec.Trigger.Service) + if err != nil { + if errors.IsNotFound(err) { + runtime.HandleError(fmt.Errorf("EventSource %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace)) + } + return nil, err + } + + et, err := c.eventTypesLister.EventTypes(namespace).Get(bind.Spec.Trigger.EventType) + if err != nil { + if errors.IsNotFound(err) { + runtime.HandleError(fmt.Errorf("EventType %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace)) + } + return nil, err + } + + parameters, err := resolveParameters(c.kubeclientset, namespace, bind.Spec.Trigger) + if err != nil { + glog.Warningf("Failed to process parameters: %s", err) + return nil, err + } + + if bind.Status.Conditions != nil { + return nil, nil + } + + glog.Infof("Creating a subscription to %q : %q with Trigger %+v", es.Name, et.Name, bind.Spec.Trigger) + eventSource, ok := c.sources[es.Name] + if !ok { + return nil, fmt.Errorf("Do not know how to deploy EventTrigger to source with name %s", es.Name) + } + + r, err := eventSource.Bind(bind, parameters) + if err != nil { + glog.Warningf("BIND failed: %s", err) + msg := fmt.Sprintf("Bind failed with : %s", r) + return &v1alpha1.BindCondition{ + Type: v1alpha1.BindFailed, + Status: corev1.ConditionTrue, + Reason: "BindFailed", + Message: msg, + }, nil } + return &v1alpha1.BindCondition{ + Type: v1alpha1.BindComplete, + Status: corev1.ConditionTrue, + // BUG(43) ID is a feature of the GitHub event source. If it is to become standardized, + // we should not be using a map[string]interface{} result. + Reason: fmt.Sprintf("BindSuccess: Hook: %s", r["ID"].(string)), + Message: "Bind successful", + }, nil +} + +func resolveParameters(kubeClient kubernetes.Interface, namespace string, trigger v1alpha1.EventTrigger) (map[string]interface{}, error) { + r := make(map[string]interface{}) if trigger.Parameters != nil && trigger.Parameters.Raw != nil && len(trigger.Parameters.Raw) > 0 { p := make(map[string]interface{}) if err := yaml.Unmarshal(trigger.Parameters.Raw, &p); err != nil { return r, err } for k, v := range p { - r.Parameters[k] = v + r[k] = v } } if trigger.ParametersFrom != nil { @@ -404,7 +419,7 @@ func resolveTrigger(kubeClient kubernetes.Interface, namespace string, trigger v return r, err } for k, v := range pfs { - r.Parameters[k] = v + r[k] = v } } } @@ -446,16 +461,16 @@ func unmarshalJSON(in []byte) (map[string]interface{}, error) { return parameters, nil } -func getDomainSuffixFromElaConfig(cl kubernetes.Interface) (string, error) { - const name = "ela-config" - const namespace = "ela-system" - c, err := cl.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return "", err - } - domainSuffix, ok := c.Data["domainSuffix"] - if !ok { - return "", fmt.Errorf("cannot find domainSuffix in %v: ConfigMap.Data is %#v", name, c.Data) +func (c *Controller) validateAction(namespace string, name string, actionType string) error { + switch actionType { + case action.ElafrosActionType: + _, err := c.routesLister.Routes(namespace).Get(name) + return err + case action.LoggingActionType: + return nil + default: + // Should this be an errors.NewNotFound? How does the action struct + // fit into the schema.GroupResource idea? + return fmt.Errorf("Unknown Action.Processor %q", actionType) } - return domainSuffix, nil } diff --git a/pkg/delivery/sender.go b/pkg/delivery/sender.go index 1487767b22c..3aa906f4ab5 100644 --- a/pkg/delivery/sender.go +++ b/pkg/delivery/sender.go @@ -62,7 +62,13 @@ func (s *Sender) RunOnce(stopCh <-chan struct{}) error { glog.V(4).Info("Sending Event %s to %s Action %s", event.Context.EventID, event.Action.Processor, event.Action.Name) action, ok := s.actions[event.Action.Processor] if !ok { - return fmt.Errorf("Event %s is routed to unknown Processor '%s'", event.Context.EventID, event.Action.Processor) + names := make([]string, 0, len(s.actions)) + for name := range s.actions { + names = append(names, name) + } + runtime.HandleError(fmt.Errorf("Event %q is routed to unknown Processor %q. Valid names are %q", + event.Context.EventID, event.Action.Processor, names)) + return true } // TODO(inlined): retry strategies for errors and continuations for success. diff --git a/pkg/sources/BUILD b/pkg/sources/BUILD index 02922d4c903..bf00542ba60 100644 --- a/pkg/sources/BUILD +++ b/pkg/sources/BUILD @@ -4,14 +4,17 @@ go_library( name = "go_default_library", srcs = [ "event_source.go", - "event_trigger.go", "github.go", ], importpath = "github.com/elafros/eventing/pkg/sources", visibility = ["//visibility:public"], deps = [ + "//pkg/apis/bind/v1alpha1:go_default_library", + "//pkg/delivery/action:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/google/go-github/github:go_default_library", "//vendor/golang.org/x/oauth2:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", ], ) diff --git a/pkg/sources/event_source.go b/pkg/sources/event_source.go index c807653c54a..b661a6fd8f4 100644 --- a/pkg/sources/event_source.go +++ b/pkg/sources/event_source.go @@ -16,6 +16,10 @@ limitations under the License. package sources +import "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" + +// EventSource manages the subscription of which events a developer is interested in. type EventSource interface { - Bind(trigger EventTrigger, route string) (map[string]interface{}, error) + // Bind creates a new subscription of events. + Bind(bind *v1alpha1.Bind, parameters map[string]interface{}) (map[string]interface{}, error) } diff --git a/pkg/sources/event_trigger.go b/pkg/sources/event_trigger.go deleted file mode 100644 index a36ff723607..00000000000 --- a/pkg/sources/event_trigger.go +++ /dev/null @@ -1,17 +0,0 @@ -package sources - -// EventTrigger is a version of v1alpha1.EventTrigger where -// parameters have been resolved. -type EventTrigger struct { - // EventType is the type of event to be observed - EventType string `json:"eventType"` - - // Resource is the resource(s) from which to observe events. - Resource string `json:"resource"` - - // Service is the hostname of the service that should be observed. - Service string `json:"service"` - - // Parameters is an opaque map of configuration needed by the EventSource. - Parameters map[string]interface{} `json:"params"` -} diff --git a/pkg/sources/github.go b/pkg/sources/github.go index a6b100a0d1c..6431f217215 100644 --- a/pkg/sources/github.go +++ b/pkg/sources/github.go @@ -18,14 +18,18 @@ package sources import ( "context" + "errors" "fmt" "strings" "github.com/golang/glog" - + ghclient "github.com/google/go-github/github" "golang.org/x/oauth2" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" - ghclient "github.com/google/go-github/github" + "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" + "github.com/elafros/eventing/pkg/delivery/action" ) const ( @@ -37,39 +41,52 @@ const ( MessageResourceSynced = "Bind synced successfully" ) +// GithubEventSource configures Event delivery from GitHub. type GithubEventSource struct { + clientset kubernetes.Interface } -func NewGithubEventSource() EventSource { - return &GithubEventSource{} +// NewGithubEventSource configures a new GithubEventSource. +func NewGithubEventSource(clientset kubernetes.Interface) EventSource { + return &GithubEventSource{clientset: clientset} } -func (t *GithubEventSource) Bind(trigger EventTrigger, route string) (map[string]interface{}, error) { - glog.Infof("CREATING GITHUB WEBHOOK") +// Bind implements EventSource.Bind +func (s *GithubEventSource) Bind(bind *v1alpha1.Bind, parameters map[string]interface{}) (map[string]interface{}, error) { + glog.Infof("BINDING GITHUB EVENT") + // BUG(#43) The GitHub event source should be responsible for its own gateway rathan relying + // on elafros routes to be externally visible. + if bind.Spec.Action.Processor != action.ElafrosActionType { + return nil, errors.New("[elafros/eventing#43] GitHub event source currently only supports Elafros Routes as their action type") + } + address, err := s.getFunctionAddress(bind) + if err != nil { + return nil, err + } ctx := context.Background() ts := oauth2.StaticTokenSource( - &oauth2.Token{AccessToken: trigger.Parameters["accessToken"].(string)}, + &oauth2.Token{AccessToken: parameters["accessToken"].(string)}, ) tc := oauth2.NewClient(ctx, ts) - glog.Infof("CREATING GITHUB WEBHOOK with access token: %s", trigger.Parameters["accessToken"].(string)) + glog.Infof("CREATING GITHUB WEBHOOK with access token: %s", parameters["accessToken"].(string)) client := ghclient.NewClient(tc) active := true name := "web" config := make(map[string]interface{}) - config["url"] = fmt.Sprintf("http://%s", route) + config["url"] = fmt.Sprintf("http://%s", address) config["content_type"] = "json" - config["secret"] = trigger.Parameters["secretToken"].(string) + config["secret"] = parameters["secretToken"].(string) hook := ghclient.Hook{ Name: &name, - URL: &route, + URL: &address, Events: []string{"pull_request"}, Active: &active, Config: config, } - components := strings.Split(trigger.Resource, "/") + components := strings.Split(bind.Spec.Trigger.Resource, "/") h, r, err := client.Repositories.CreateHook(ctx, components[0] /* owner */, components[1] /* repo */, &hook) if err != nil { glog.Warningf("Failed to create the webhook: %s", err) @@ -81,3 +98,32 @@ func (t *GithubEventSource) Bind(trigger EventTrigger, route string) (map[string ret["ID"] = fmt.Sprintf("%d", h.ID) return ret, nil } + +func (s *GithubEventSource) getFunctionAddress(bind *v1alpha1.Bind) (string, error) { + suffix, err := s.getDomainSuffix() + if err != nil { + return "", err + } + + functionDNS := fmt.Sprintf( + "%s.%s.%s", + /* function name */ bind.Spec.Action.Name, + /* function namespace */ bind.ObjectMeta.Namespace, + /* cluster root domain */ suffix) + glog.Infof("Found route DNS as '%q'", functionDNS) + return functionDNS, nil +} + +func (s *GithubEventSource) getDomainSuffix() (string, error) { + const name = "ela-config" + const namespace = "ela-system" + c, err := s.clientset.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return "", err + } + domainSuffix, ok := c.Data["domainSuffix"] + if !ok { + return "", fmt.Errorf("cannot find domainSuffix in %v: ConfigMap.Data is %#v", name, c.Data) + } + return domainSuffix, nil +} diff --git a/sample/github/pullrequest.yaml b/sample/github/pullrequest.yaml index e5cb476b702..0caa4f230aa 100644 --- a/sample/github/pullrequest.yaml +++ b/sample/github/pullrequest.yaml @@ -30,4 +30,5 @@ spec: name: githubsecret key: githubCredentials action: - routeName: git-webhook + processor: elafros.dev/Route + name: git-webhook From 5dd2f7afc521c2c44af5515a1da3aae1b1ef6fbe Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Tue, 17 Apr 2018 13:47:12 -0700 Subject: [PATCH 10/14] BUILD.bazel fixup --- pkg/delivery/queue/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/delivery/queue/BUILD.bazel b/pkg/delivery/queue/BUILD.bazel index b095c781832..a47d3cf440b 100644 --- a/pkg/delivery/queue/BUILD.bazel +++ b/pkg/delivery/queue/BUILD.bazel @@ -11,8 +11,8 @@ go_library( ], importpath = "github.com/elafros/eventing/pkg/delivery/queue", deps = [ - "//pkg/event:go_default_library", "//pkg/apis/bind/v1alpha1:go_default_library", + "//pkg/event:go_default_library", "//vendor/github.com/golang/glog:go_default_library", ], ) From 04c59dbce79298e050625bd6be9397f60c919b71 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Mon, 23 Apr 2018 13:40:47 -0700 Subject: [PATCH 11/14] checkpoint --- pkg/controller/bind/controller.go | 5 +++ pkg/delivery/action/elafros.go | 2 ++ sample/actions/BUILD.bazel | 52 +++++++++++++++++++++++++++++++ sample/actions/action.go | 51 ++++++++++++++++++++++++++++++ sample/actions/configuration.yaml | 13 ++++++++ sample/actions/route.yaml | 23 ++++++++++++++ 6 files changed, 146 insertions(+) create mode 100644 sample/actions/BUILD.bazel create mode 100644 sample/actions/action.go create mode 100644 sample/actions/configuration.yaml create mode 100644 sample/actions/route.yaml diff --git a/pkg/controller/bind/controller.go b/pkg/controller/bind/controller.go index 44184c7cd40..d4c72dfd7de 100644 --- a/pkg/controller/bind/controller.go +++ b/pkg/controller/bind/controller.go @@ -19,6 +19,7 @@ package bind import ( "encoding/json" "fmt" + "strings" "time" "github.com/ghodss/yaml" @@ -464,6 +465,10 @@ func unmarshalJSON(in []byte) (map[string]interface{}, error) { func (c *Controller) validateAction(namespace string, name string, actionType string) error { switch actionType { case action.ElafrosActionType: + parts := strings.Split(name, "/") + if len(parts) == 2 { + namespace, name = parts[0], parts[1] + } _, err := c.routesLister.Routes(namespace).Get(name) return err case action.LoggingActionType: diff --git a/pkg/delivery/action/elafros.go b/pkg/delivery/action/elafros.go index 5224759f375..64ae966107c 100644 --- a/pkg/delivery/action/elafros.go +++ b/pkg/delivery/action/elafros.go @@ -31,6 +31,7 @@ func NewElafrosAction(kubeclientset kubernetes.Interface, httpclient *http.Clien // SendEvent will POST the event spec to the root URI of the elafros route. func (a *ElafrosAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { + glog.V(4).Info("Sending event", context.EventID, "to ELA route") parts := strings.Split(name, "/") if len(parts) != 2 { return nil, fmt.Errorf("Expected elafros route '%s' to be in the form '/'", name) @@ -44,6 +45,7 @@ func (a *ElafrosAction) SendEvent(name string, data interface{}, context *event. } addr := fmt.Sprintf("http://%s.%s.%s", route, namespace, domain) + glog.V(4).Info("Sending event", context.EventID, "to ELA route at", addr) req, err := event.NewRequest(addr, data, *context) if err != nil { return nil, err diff --git a/sample/actions/BUILD.bazel b/sample/actions/BUILD.bazel new file mode 100644 index 00000000000..0089f3e3e69 --- /dev/null +++ b/sample/actions/BUILD.bazel @@ -0,0 +1,52 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library") + +go_library( + name = "go_default_library", + srcs = ["action.go"], + importpath = "github.com/elafros/eventing/sample/actions", + visibility = ["//visibility:private"], + deps = [ + "//pkg/event:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + ], +) + +go_binary( + name = "action-demo", + embed = [":go_default_library"], + importpath = "github.com/elafros/elafros/sample/actions", + pure = "on", + visibility = ["//visibility:public"], +) + +load("@io_bazel_rules_docker//go:image.bzl", "go_image") + +go_image( + name = "image", + binary = ":action-demo", +) + +load("@k8s_object//:defaults.bzl", "k8s_object") + +k8s_object( + name = "configuration", + images = { + "action-demo:latest": ":image", + }, + template = ":configuration.yaml", +) + +k8s_object( + name = "route", + template = ":route.yaml", +) + +load("@io_bazel_rules_k8s//k8s:objects.bzl", "k8s_objects") + +k8s_objects( + name = "everything", + objects = [ + ":route", + ":configuration", + ], +) \ No newline at end of file diff --git a/sample/actions/action.go b/sample/actions/action.go new file mode 100644 index 00000000000..1e4b0b22913 --- /dev/null +++ b/sample/actions/action.go @@ -0,0 +1,51 @@ +package main + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "net/http" + "net/url" + + "github.com/elafros/eventing/pkg/event" + "github.com/golang/glog" +) + +const ( + databaseURL = "https://inlined-junkdrawer.firebaseio.com/" + address = ":8080" +) + +// SendEventToDB... +func SendEventToDB(w http.ResponseWriter, r *http.Request) { + var data map[string]interface{} + context, err := event.FromRequest(&data, r) + if err != nil { + glog.Errorf("Failed to parse event: %s", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + url, err := url.Parse(databaseURL + "seenEvents/" + context.EventID) + if err != nil { + glog.Errorf("Failed to parse url %s", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + // Can not fail + body, _ := json.Marshal(data) + + write := &http.Request{ + Method: http.MethodPut, + URL: url, + Body: ioutil.NopCloser(bytes.NewReader(body)), + } + if _, err := http.DefaultClient.Do(write); err != nil { + glog.Errorf("Failed to write to RTDB: %s", err) + } + w.WriteHeader(http.StatusOK) +} + +func main() { + http.ListenAndServe(address, http.HandlerFunc(SendEventToDB)) +} diff --git a/sample/actions/configuration.yaml b/sample/actions/configuration.yaml new file mode 100644 index 00000000000..b34ce3726fd --- /dev/null +++ b/sample/actions/configuration.yaml @@ -0,0 +1,13 @@ +apiVersion: elafros.dev/v1alpha1 +kind: Configuration +metadata: + name: action-demo + namespace: default +spec: + revisionTemplate: + metadata: + labels: + elafros.dev/type: container + spec: + container: + image: action-demo:latest \ No newline at end of file diff --git a/sample/actions/route.yaml b/sample/actions/route.yaml new file mode 100644 index 00000000000..d7bc21ea9e2 --- /dev/null +++ b/sample/actions/route.yaml @@ -0,0 +1,23 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: elafros.dev/v1alpha1 +kind: Route +metadata: + name: action-demo + namespace: default +spec: + traffic: + - configurationName: action-demo + percent: 100 From 05444b8c5f5220cc2aab3adbb3296f5d7462e6e1 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Tue, 24 Apr 2018 08:46:34 -0700 Subject: [PATCH 12/14] Add sendevent command utility --- cmd/sendevent/main.go | 104 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 cmd/sendevent/main.go diff --git a/cmd/sendevent/main.go b/cmd/sendevent/main.go new file mode 100644 index 00000000000..0ca29cbb8de --- /dev/null +++ b/cmd/sendevent/main.go @@ -0,0 +1,104 @@ +/* +Copyright 2018 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Implements a simple utility for sending a JSON-encoded sample event. +// Not wired to bazel because this utility is meant to be run directly rather +// than deployed to K8S +package main + +import ( + "crypto/rand" + "encoding/json" + "flag" + "fmt" + "io/ioutil" + "net/http" + "os" + "time" + + "github.com/elafros/eventing/pkg/event" +) + +var ( + context event.Context + webhook string + data string +) + +func init() { + flag.StringVar(&context.EventID, "event-id", "", "Event ID to use. Defaults to a UUID") + flag.StringVar(&context.EventType, "event-type", "google.events.action.demo", "The Event Type to use.") + flag.StringVar(&context.Source, "source", "", "Source URI to use. Defaults to the current machine's hostname") + flag.StringVar(&data, "data", `{"hello": "world!"}`, "Event data") +} + +func main() { + flag.Parse() + + if flag.NArg() != 1 { + fmt.Println("Missing webhook address") + os.Exit(1) + } + + webhook := flag.Args()[0] + + var untyped map[string]interface{} + if err := json.Unmarshal([]byte(data), &untyped); err != nil { + fmt.Println("Currenlty sendevent only supports JSON event data") + os.Exit(1) + } + + fillEventContext(&context) + req, err := event.NewRequest(webhook, untyped, context) + if err != nil { + fmt.Printf("Failed to create request: %s", err) + os.Exit(1) + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + fmt.Printf("Failed to send event to %s: %s\n", webhook, err) + os.Exit(1) + } + fmt.Printf("Got response from %s\n%s\n", webhook, res.Status) + if res.Header.Get("Content-Length") != "" { + bytes, _ := ioutil.ReadAll(res.Body) + fmt.Println(string(bytes)) + } +} + +func fillEventContext(ctx *event.Context) { + ctx.CloudEventsVersion = "0.1" + ctx.EventTime = time.Now().UTC() + + if ctx.EventID == "" { + // A "UUID". Not technically spec complaint + b := make([]byte, 16) + if n, err := rand.Read(b); n != 16 || err != nil { + fmt.Println("Could not create event-id UUID") + os.Exit(1) + } + ctx.EventID = fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:]) + } + + if ctx.Source == "" { + var err error + ctx.Source, err = os.Hostname() + if err != nil { + ctx.Source = "localhost" + } + } +} From 317a5f7adab7d30b06ae42c6ff1dc3d084be6daf Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Tue, 24 Apr 2018 08:46:48 -0700 Subject: [PATCH 13/14] Add actions sample script --- pkg/delivery/action/elafros.go | 25 +++++-- pkg/delivery/receiver.go | 19 ++---- pkg/delivery/sender.go | 12 ++-- sample/actions/BUILD.bazel | 13 ++++ sample/actions/README.md | 116 +++++++++++++++++++++++++++++++++ sample/actions/action.go | 34 +++++++++- sample/actions/bind.yaml | 11 ++++ 7 files changed, 200 insertions(+), 30 deletions(-) create mode 100644 sample/actions/README.md create mode 100644 sample/actions/bind.yaml diff --git a/pkg/delivery/action/elafros.go b/pkg/delivery/action/elafros.go index 64ae966107c..333715967b5 100644 --- a/pkg/delivery/action/elafros.go +++ b/pkg/delivery/action/elafros.go @@ -31,12 +31,16 @@ func NewElafrosAction(kubeclientset kubernetes.Interface, httpclient *http.Clien // SendEvent will POST the event spec to the root URI of the elafros route. func (a *ElafrosAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { - glog.V(4).Info("Sending event", context.EventID, "to ELA route") + glog.Infof("Sending event %s to ELA route %s", context.EventID, name) + var namespace, route string parts := strings.Split(name, "/") if len(parts) != 2 { - return nil, fmt.Errorf("Expected elafros route '%s' to be in the form '/'", name) + // TODO(inlined): for compatibility with the github demo we implicitly pick up the namespace + // of the Bind resource. Should ensure that we'll always get routes in this form. + namespace, route = "default", name + } else { + namespace, route = parts[0], parts[1] } - route, namespace := parts[1], parts[0] domain, err := getDomainSuffixFromElaConfig(a.kubeclientset) if err != nil { @@ -44,18 +48,25 @@ func (a *ElafrosAction) SendEvent(name string, data interface{}, context *event. return nil, err } - addr := fmt.Sprintf("http://%s.%s.%s", route, namespace, domain) - glog.V(4).Info("Sending event", context.EventID, "to ELA route at", addr) + addr := fmt.Sprintf("http://%s.%s.%s/", route, namespace, domain) + glog.Info("Sending event", context.EventID, "to ELA route at", addr) req, err := event.NewRequest(addr, data, *context) if err != nil { return nil, err } - if _, err := a.httpclient.Do(req); err != nil { + res, err := a.httpclient.Do(req) + if err != nil { + glog.Errorf("Failed to send event to webhook %s; err=%s", addr, err) return nil, err } + // TODO: Standard handling of non-200 responses as errors. + if res.StatusCode/100 != 2 { + glog.Errorf("Got unsuccessful response code %d from %s", res.StatusCode, addr) + } - // TODO: non-200 responses as errors. Standard decoding of responses to be forwarded. + // TODO: Return response data so that it may be forwarded in chained Binds. + glog.Infof("Sent event to %s; got response %s", addr, res.Status) return nil, nil } diff --git a/pkg/delivery/receiver.go b/pkg/delivery/receiver.go index 545176e5f5b..07defca33e6 100644 --- a/pkg/delivery/receiver.go +++ b/pkg/delivery/receiver.go @@ -34,20 +34,6 @@ import ( // will do late-time filtering of Events. var directSendEventRegExp = regexp.MustCompile(`^.*/namespaces/([^/]*)/flows/([^/]*):sendEvent$`) -<<<<<<< HEAD -// TODO(vaikas): Remove this once Bind's Action has been migrated -// to be generic. -const hardCodedProcessor = "eventing.elafros.dev/EventLogger" - -func actionFromBind(bind *v1alpha1.Bind) queue.ActionType { - return queue.ActionType{ - Name: bind.Spec.Action.RouteName, - Processor: hardCodedProcessor, - } -} - -======= ->>>>>>> Remove Action shim // Receiver manages the HTTP endpoints for receiving events as well as // stats about the queue for processing events. type Receiver struct { @@ -71,6 +57,7 @@ func (r *Receiver) SendEvent(action v1alpha1.BindAction, data interface{}, conte // ServeHTTP implements the external REST API that other services will use to send events. func (r *Receiver) ServeHTTP(w http.ResponseWriter, req *http.Request) { + glog.Infof("Serving %s", req.URL.Path) if req.Method != http.MethodPost { glog.V(3).Info("Cannot handle method", req.Method) w.WriteHeader(http.StatusMethodNotAllowed) @@ -110,4 +97,8 @@ func (r *Receiver) ServeHTTP(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusInternalServerError) return } + + glog.Infof("Enqueued event %s for delivery to %s action %s", + context.EventID, bind.Spec.Action.Processor, bind.Spec.Action.Name) + w.WriteHeader(http.StatusOK) } diff --git a/pkg/delivery/sender.go b/pkg/delivery/sender.go index 3aa906f4ab5..14dd1e66c4a 100644 --- a/pkg/delivery/sender.go +++ b/pkg/delivery/sender.go @@ -62,13 +62,12 @@ func (s *Sender) RunOnce(stopCh <-chan struct{}) error { glog.V(4).Info("Sending Event %s to %s Action %s", event.Context.EventID, event.Action.Processor, event.Action.Name) action, ok := s.actions[event.Action.Processor] if !ok { - names := make([]string, 0, len(s.actions)) - for name := range s.actions { - names = append(names, name) + keys := make([]string, 0, len(s.actions)) + for key := range s.actions { + keys = append(keys, key) } - runtime.HandleError(fmt.Errorf("Event %q is routed to unknown Processor %q. Valid names are %q", - event.Context.EventID, event.Action.Processor, names)) - return true + return fmt.Errorf("Event %s is routed to unknown Processor '%s'; known processors are %q", + event.Context.EventID, event.Action.Processor, keys) } // TODO(inlined): retry strategies for errors and continuations for success. @@ -87,6 +86,7 @@ func (s *Sender) run(stopCh <-chan struct{}) { if err == io.EOF { return } + glog.Warning(err) runtime.HandleError(err) } } diff --git a/sample/actions/BUILD.bazel b/sample/actions/BUILD.bazel index 0089f3e3e69..5f3f08f1340 100644 --- a/sample/actions/BUILD.bazel +++ b/sample/actions/BUILD.bazel @@ -28,6 +28,18 @@ go_image( load("@k8s_object//:defaults.bzl", "k8s_object") +# Step 1: Support for action as a debug statement +k8s_object( + name = "bind", + template = ":bind.yaml", +) + +# Step 2: Support for action as an ELA route +# To enable: update the action-sample Bind to have the following +# Spec.Action: +# Name: action-demo +# Processor: elafros.dev/Route + k8s_object( name = "configuration", images = { @@ -46,6 +58,7 @@ load("@io_bazel_rules_k8s//k8s:objects.bzl", "k8s_objects") k8s_objects( name = "everything", objects = [ + ":bind", ":route", ":configuration", ], diff --git a/sample/actions/README.md b/sample/actions/README.md new file mode 100644 index 00000000000..3844d6e574a --- /dev/null +++ b/sample/actions/README.md @@ -0,0 +1,116 @@ +# Actions + +Demo to show support for dynamic actions, disjoint from the event source's configuration. + +## Prerequisites + +1. [Setup your development environment](../../DEVELOPMENT.md#getting-started) +2. [Start Elafros](../../README.md#start-elafros) +3. Decide on the DNS name that git can then call. Update elafros/elafros/elaconfig.yaml domainSuffix. +For example I used ela.inlined.me as my hostname, so my elaconfig.yaml looks like so: + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: ela-config + namespace: ela-system +data: + domainSuffix: ela.inlined.me +``` + +If you were already running the elafros controllers, you will need to kill the ela-controller in the ela-system namespace for it to pick up the new domain suffix. + +4. Since we will be mimicing an internal service, we need access to the event-delivery service. + Expose it with: + + ``` + kubectl expose deployment event-delivery --port=80 --target-port=9090 --type=LoadBalancer --namespace bind-system + ``` + +5. Install the `sendevent` utility + +```bash +go install github.com/elafros/eventing/cmd/sendevent +``` + +6. Save the address of your exposed `event-delivery` service. + +```bash +export EVENT_DELIVERY_IP=$(kubectl get services/event-delivery --namespace bind-system -o go-template='{{ (index .status.loadBalancer.ingress 0).ip }}') +``` + +## Demo 1: Install Bind with a debug-logging action + +### Deploying assets + +You can deploy this to Elafros from the root directory via: + +```shell +bazel run sample/actions:everything.apply +``` + +This created a handful of resources. The one most immediately useful is the `action-demo` Bind. +An event source would normally deliver events to this Bind by posting to + +``` +${EVENT_DELIVERY_SERVICE}/v1alpha1/namespaces/default/flows/action-demo:sendEvent +``` + +### Sending events + +We'll do the same with the `sendevent` utility: + +```bash +sendevent http://${EVENT_DELIVERY_IP}/v1alpha1/namespaces/default/flows/action-demo:sendEvent +``` + +(You can optionally configure the sent event by choosing any of `--event-id`, `--event-type`, +`--source`, or `--data`) + +### Observing side-effects + +The event processor deployed in configuration.yaml is "eventing.elafros.dev/EventLogger". This +simply emits a structured event to the logs for the "event-delivery" app. View them with: + +``` +kubectl logs --namespace bind-system -lapp=event-delivery +``` + +## Demo 2: Reroute Bind to an Elafros action. + +### Deploying assets + +In the previous step, you already deployed the elafros route (also named 'action-demo') +that we'll use in this step. To make the demo more exciting, edit "action.go" and change +the `databaseURL` constant to a Firebase Realtime Database URL which you are an owner of. + +To make the `action-demo` Bind point to your new action, edit `bind.yaml` and change the spec.action to be the following: + +``` + Name: action-demo + Processor: elafros.dev/Route +``` + +Apply these changes with `blaze run sample/actions:everything.apply` + +### Sending events + +Because we did not change anything about the (mock) event source, the same command will +now have a different effect. + +```bash +sendevent http://${EVENT_DELIVERY_IP}/v1alpha1/namespaces/default/flows/action-demo:sendEvent +``` + +### Observing side-effects + +If you load your Firebase Realtime Database, you will see new event-ids appended to the "seenEvents" node every time you run `sendevent` + +## Cleaning up + +To clean up the sample service: + +```shell +bazel run sample/github:everything.delete +``` diff --git a/sample/actions/action.go b/sample/actions/action.go index 1e4b0b22913..39ca03e35a8 100644 --- a/sample/actions/action.go +++ b/sample/actions/action.go @@ -1,3 +1,19 @@ +/* +Copyright 2018 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package main import ( @@ -18,6 +34,7 @@ const ( // SendEventToDB... func SendEventToDB(w http.ResponseWriter, r *http.Request) { + glog.Info("Sending event to RTDB") var data map[string]interface{} context, err := event.FromRequest(&data, r) if err != nil { @@ -26,7 +43,7 @@ func SendEventToDB(w http.ResponseWriter, r *http.Request) { return } - url, err := url.Parse(databaseURL + "seenEvents/" + context.EventID) + url, err := url.Parse(databaseURL + "seenEvents/" + context.EventID + "/.json") if err != nil { glog.Errorf("Failed to parse url %s", err) w.WriteHeader(http.StatusInternalServerError) @@ -40,12 +57,23 @@ func SendEventToDB(w http.ResponseWriter, r *http.Request) { URL: url, Body: ioutil.NopCloser(bytes.NewReader(body)), } - if _, err := http.DefaultClient.Do(write); err != nil { + res, err := http.DefaultClient.Do(write) + if err != nil { glog.Errorf("Failed to write to RTDB: %s", err) } + if res.StatusCode/100 != 2 { + glog.Errorf("Got non-success response from RTDB: %d", res.StatusCode) + } + glog.Info("Success!") + w.WriteHeader(http.StatusOK) } func main() { - http.ListenAndServe(address, http.HandlerFunc(SendEventToDB)) + http.HandleFunc("/healthz", func(w http.ResponseWriter, req *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.Write([]byte(`{"status": "ok"}`)) + }) + http.HandleFunc("/", SendEventToDB) + http.ListenAndServe(address, nil) } diff --git a/sample/actions/bind.yaml b/sample/actions/bind.yaml new file mode 100644 index 00000000000..f49e09716e8 --- /dev/null +++ b/sample/actions/bind.yaml @@ -0,0 +1,11 @@ +apiVersion: eventing.elafros.dev/v1alpha1 +kind: Bind +metadata: + name: action-demo +spec: + trigger: + # Note: trigger is not used right now. + # Demos must currently publish directly to the endpoint for this Bind + action: + name: DemoLoggingAction + processor: eventing.elafros.dev/EventLogger \ No newline at end of file From cae24814a19b0379f3e03eaa86c0e08c8252cb24 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Wed, 25 Apr 2018 11:55:08 -0700 Subject: [PATCH 14/14] Services now work too! --- cmd/delivery/main.go | 1 + controller.yaml | 1 - pkg/delivery/action/BUILD.bazel | 1 + pkg/delivery/action/elafros.go | 10 +++--- pkg/delivery/action/logging.go | 8 ++--- pkg/delivery/action/service.go | 62 +++++++++++++++++++++++++++++++++ sample/actions/BUILD.bazel | 19 +++++++++- sample/actions/README.md | 32 +++++++++++++---- sample/actions/action.go | 7 ++-- sample/actions/bind.yaml | 2 +- sample/actions/deployment.yaml | 33 ++++++++++++++++++ sample/actions/service.yaml | 26 ++++++++++++++ 12 files changed, 182 insertions(+), 20 deletions(-) create mode 100644 pkg/delivery/action/service.go create mode 100644 sample/actions/deployment.yaml create mode 100644 sample/actions/service.yaml diff --git a/cmd/delivery/main.go b/cmd/delivery/main.go index 877cf8f7d91..810103e5f48 100644 --- a/cmd/delivery/main.go +++ b/cmd/delivery/main.go @@ -87,6 +87,7 @@ func main() { processors := map[string]action.Action{ action.LoggingActionType: action.NewLoggingAction(), action.ElafrosActionType: action.NewElafrosAction(kubeClient, http.DefaultClient), + action.ServiceActionType: action.NewServiceAction(http.DefaultClient), } sender := delivery.NewSender(q, processors) diff --git a/controller.yaml b/controller.yaml index 82020a8b654..a81b092990e 100644 --- a/controller.yaml +++ b/controller.yaml @@ -23,7 +23,6 @@ spec: labels: app: bind-controller spec: - serviceAccountName: bind-controller containers: - name: bind-controller image: bind-controller:latest diff --git a/pkg/delivery/action/BUILD.bazel b/pkg/delivery/action/BUILD.bazel index 0cde53d03de..5daf4ccf451 100644 --- a/pkg/delivery/action/BUILD.bazel +++ b/pkg/delivery/action/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "doc.go", "elafros.go", "logging.go", + "service.go", ], importpath = "github.com/elafros/eventing/pkg/delivery/action", deps = [ diff --git a/pkg/delivery/action/elafros.go b/pkg/delivery/action/elafros.go index 333715967b5..73fb799752b 100644 --- a/pkg/delivery/action/elafros.go +++ b/pkg/delivery/action/elafros.go @@ -18,19 +18,19 @@ const ( ElafrosActionType = "elafros.dev/Route" ) -// ElafrosAction sends Events to an Elafros Route. -type ElafrosAction struct { +// elafrosAction sends Events to an Elafros Route. +type elafrosAction struct { kubeclientset kubernetes.Interface httpclient *http.Client } // NewElafrosAction creates an Action object that sends Events to Elafros Routes. -func NewElafrosAction(kubeclientset kubernetes.Interface, httpclient *http.Client) *ElafrosAction { - return &ElafrosAction{kubeclientset: kubeclientset, httpclient: httpclient} +func NewElafrosAction(kubeclientset kubernetes.Interface, httpclient *http.Client) Action { + return &elafrosAction{kubeclientset: kubeclientset, httpclient: httpclient} } // SendEvent will POST the event spec to the root URI of the elafros route. -func (a *ElafrosAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { +func (a *elafrosAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { glog.Infof("Sending event %s to ELA route %s", context.EventID, name) var namespace, route string parts := strings.Split(name, "/") diff --git a/pkg/delivery/action/logging.go b/pkg/delivery/action/logging.go index 05f98c7c959..298c53ae631 100644 --- a/pkg/delivery/action/logging.go +++ b/pkg/delivery/action/logging.go @@ -30,10 +30,10 @@ const ( ) // A LoggingAction will log and drop Events. -type LoggingAction struct{} +type loggingAction struct{} // SendEvent implements Action.SendEvent -func (a LoggingAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { +func (a loggingAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { event := map[string]interface{}{ "data": data, "context": context, @@ -47,6 +47,6 @@ func (a LoggingAction) SendEvent(name string, data interface{}, context *event.C } // NewLoggingAction creates an Action that will log and drop Events. -func NewLoggingAction() LoggingAction { - return LoggingAction{} +func NewLoggingAction() Action { + return loggingAction{} } diff --git a/pkg/delivery/action/service.go b/pkg/delivery/action/service.go new file mode 100644 index 00000000000..6278d0e1ba2 --- /dev/null +++ b/pkg/delivery/action/service.go @@ -0,0 +1,62 @@ +package action + +import ( + "fmt" + "net/http" + "strings" + + "github.com/elafros/eventing/pkg/event" + "github.com/golang/glog" +) + +const ( + // ServiceActionType is the expected Bind Processor name to + // cause Events to be sent to a K8S service. + ServiceActionType = "Service" +) + +// serviceAction sends Events to a K8S service. +type serviceAction struct { + httpclient *http.Client +} + +// NewServiceAction creates an Action object that sends Events to K8S services. +func NewServiceAction(httpclient *http.Client) Action { + return &serviceAction{httpclient: httpclient} +} + +// SendEvent will POST the event spec to the root URI of the elafros route. +func (a *serviceAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { + glog.Infof("Sending event %s to K8S service %s", context.EventID, name) + var namespace, service string + parts := strings.Split(name, "/") + if len(parts) != 2 { + // TODO(inlined): for compatibility with the github demo we implicitly pick up the namespace + // of the Bind resource. Should ensure that we'll always get routes in this form. + namespace, service = "default", name + } else { + namespace, service = parts[0], parts[1] + } + + addr := fmt.Sprintf("http://%s.%s.svc.cluster.local", service, namespace) + glog.Info("Sending event", context.EventID, "to K8S service at", addr) + req, err := event.NewRequest(addr, data, *context) + if err != nil { + glog.Errorf("Failed to marshal event: %s", err) + return nil, err + } + + res, err := a.httpclient.Do(req) + if err != nil { + glog.Errorf("Failed to send event to webhook %s; err=%s", addr, err) + return nil, err + } + // TODO: Standard handling of non-200 responses as errors. + if res.StatusCode/100 != 2 { + glog.Errorf("Got unsuccessful response code %d from %s", res.StatusCode, addr) + } + + // TODO: Return response data so that it may be forwarded in chained Binds. + glog.Infof("Sent event to %s; got response %s", addr, res.Status) + return nil, nil +} diff --git a/sample/actions/BUILD.bazel b/sample/actions/BUILD.bazel index 5f3f08f1340..71fc7cbb3c1 100644 --- a/sample/actions/BUILD.bazel +++ b/sample/actions/BUILD.bazel @@ -53,6 +53,21 @@ k8s_object( template = ":route.yaml", ) +# Step 3: Support for action as Service + +k8s_object( + name = "deployment", + images = { + "action-demo:latest": ":image", + }, + template = ":deployment.yaml", +) + +k8s_object( + name = "service", + template = ":service.yaml", +) + load("@io_bazel_rules_k8s//k8s:objects.bzl", "k8s_objects") k8s_objects( @@ -61,5 +76,7 @@ k8s_objects( ":bind", ":route", ":configuration", + ":deployment", + ":service", ], -) \ No newline at end of file +) diff --git a/sample/actions/README.md b/sample/actions/README.md index 3844d6e574a..237a258072f 100644 --- a/sample/actions/README.md +++ b/sample/actions/README.md @@ -85,12 +85,7 @@ In the previous step, you already deployed the elafros route (also named 'action that we'll use in this step. To make the demo more exciting, edit "action.go" and change the `databaseURL` constant to a Firebase Realtime Database URL which you are an owner of. -To make the `action-demo` Bind point to your new action, edit `bind.yaml` and change the spec.action to be the following: - -``` - Name: action-demo - Processor: elafros.dev/Route -``` +To make the `action-demo` Bind point to your new action, edit `bind.yaml` and change the spec.action.processor to `elafros.dev/Route` Apply these changes with `blaze run sample/actions:everything.apply` @@ -114,3 +109,28 @@ To clean up the sample service: ```shell bazel run sample/github:everything.delete ``` + +## Demo 3: Reroute Bind to a K8S Service + +### Deploying assets + +This step will reuse the same Docker image as step 2, but will host `action.go` in +a Deployment + Service. To verify that we are targeting a new backend, the Deployment +sets an environment variable to change where in the Firebase Realtime Database events +are published. + +To mkae the `action-demo` bind point to the Service vrsion of our demo, edit `bind.yaml` and change the spec.action.processor to `Service`. + +Apply these changes with `bazel run sample/actions:everything.apply` + +### Sending events + +Again, the same command will now have new effects: + +``` +sendevent http://${EVENT_DELIVERY_IP}/v1alpha1/namespaces/default/flows/action-demo:sendEvent +``` + +### Observing side-effects + +If you load your Firebase Realtime Database, you will see new event-ids appended to the "seenEventsInService" node every time you run `sendevent` diff --git a/sample/actions/action.go b/sample/actions/action.go index 39ca03e35a8..8fb8a7de522 100644 --- a/sample/actions/action.go +++ b/sample/actions/action.go @@ -19,16 +19,18 @@ package main import ( "bytes" "encoding/json" + "fmt" "io/ioutil" "net/http" "net/url" + "os" "github.com/elafros/eventing/pkg/event" "github.com/golang/glog" ) const ( - databaseURL = "https://inlined-junkdrawer.firebaseio.com/" + databaseURL = "https://inlined-junkdrawer.firebaseio.com" address = ":8080" ) @@ -43,7 +45,8 @@ func SendEventToDB(w http.ResponseWriter, r *http.Request) { return } - url, err := url.Parse(databaseURL + "seenEvents/" + context.EventID + "/.json") + addr := fmt.Sprintf("%s/seenEvents%s/%s/.json", databaseURL, os.Getenv("KEY_SUFFIX"), context.EventID) + url, err := url.Parse(addr) if err != nil { glog.Errorf("Failed to parse url %s", err) w.WriteHeader(http.StatusInternalServerError) diff --git a/sample/actions/bind.yaml b/sample/actions/bind.yaml index f49e09716e8..9934c35bf00 100644 --- a/sample/actions/bind.yaml +++ b/sample/actions/bind.yaml @@ -7,5 +7,5 @@ spec: # Note: trigger is not used right now. # Demos must currently publish directly to the endpoint for this Bind action: - name: DemoLoggingAction + name: action-demo processor: eventing.elafros.dev/EventLogger \ No newline at end of file diff --git a/sample/actions/deployment.yaml b/sample/actions/deployment.yaml new file mode 100644 index 00000000000..a00a964c839 --- /dev/null +++ b/sample/actions/deployment.yaml @@ -0,0 +1,33 @@ +# Copyright 2018 Google, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apps/v1beta1 +kind: Deployment +metadata: + name: action-demo +spec: + replicas: 1 + template: + metadata: + labels: + app: action-demo-service + spec: + containers: + - name: action-demo + image: action-demo:latest + ports: + - containerPort: 8080 + env: + - name: KEY_SUFFIX + value: "InService" \ No newline at end of file diff --git a/sample/actions/service.yaml b/sample/actions/service.yaml new file mode 100644 index 00000000000..c0e14d0100f --- /dev/null +++ b/sample/actions/service.yaml @@ -0,0 +1,26 @@ +# Copyright 2018 Google, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: Service +metadata: + name: action-demo +spec: + selector: + app: action-demo-service + ports: + - protocol: TCP + port: 80 + targetPort: 8080 +