From 78b5d8414d715f8e7175a443c881d24c234b3fd4 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Tue, 27 Mar 2018 16:19:26 -0700 Subject: [PATCH 1/6] Refactor Bind to contain an EventTrigger field. * Changes to "triggers": * Renamed the "triggers" pacakge to "sources" to avoid confusion. * EventTriggers (FKA Sources) now take EventTriggers instead of just params. * source.EventTrigger is different from v1alpha1.EventTrigger because the latter has embedded Raw types that would be hard to use. * Updated GitHub sample: * Renamed the EventSource to "github.com" rather than just "github" to better align with GCF. I think this revealed something about the control plane model that I want to discuss. * Updated pullrequest.yaml to match new format. * As part of this I made a minor change to update-deps to work on mac (gnu-sed still required). --- pkg/apis/bind/v1alpha1/bind_types.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/apis/bind/v1alpha1/bind_types.go b/pkg/apis/bind/v1alpha1/bind_types.go index 9b9899f5e65..6267e02d954 100644 --- a/pkg/apis/bind/v1alpha1/bind_types.go +++ b/pkg/apis/bind/v1alpha1/bind_types.go @@ -40,6 +40,7 @@ type BindAction struct { RouteName string `json:"routeName,omitempty"` } +// EventTrigger describes when an Event should be delivered. type EventTrigger struct { // Required. The type of event to observe. For example: // `google.storage.object.finalize` and From b7b8ff90224075419312b27735cd77c125da3a32 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Mon, 16 Apr 2018 15:28:31 -0700 Subject: [PATCH 2/6] First pass at delivery service --- .gitignore | 2 + BUILD | 9 ++ cmd/delivery/BUILD.bazel | 41 +++++++ cmd/delivery/main.go | 139 ++++++++++++++++++++++++ delivery.yaml | 34 ++++++ pkg/delivery/BUILD.bazel | 41 +++++++ pkg/delivery/action/BUILD.bazel | 19 ++++ pkg/delivery/action/action.go | 41 +++++++ pkg/delivery/action/elafros.go | 72 +++++++++++++ pkg/delivery/action/logging.go | 52 +++++++++ pkg/delivery/queue/BUILD.bazel | 19 ++++ pkg/delivery/queue/diagnostics.go | 59 ++++++++++ pkg/delivery/queue/memory_queue.go | 60 +++++++++++ pkg/delivery/queue/queue.go | 42 ++++++++ pkg/delivery/receiver.go | 112 +++++++++++++++++++ pkg/delivery/receiver_test.go | 166 +++++++++++++++++++++++++++++ pkg/delivery/sender.go | 83 +++++++++++++++ pkg/delivery/sender_test.go | 100 +++++++++++++++++ 18 files changed, 1091 insertions(+) create mode 100644 cmd/delivery/BUILD.bazel create mode 100644 cmd/delivery/main.go create mode 100644 delivery.yaml create mode 100644 pkg/delivery/BUILD.bazel create mode 100644 pkg/delivery/action/BUILD.bazel create mode 100644 pkg/delivery/action/action.go create mode 100644 pkg/delivery/action/elafros.go create mode 100644 pkg/delivery/action/logging.go create mode 100644 pkg/delivery/queue/BUILD.bazel create mode 100644 pkg/delivery/queue/diagnostics.go create mode 100644 pkg/delivery/queue/memory_queue.go create mode 100644 pkg/delivery/queue/queue.go create mode 100644 pkg/delivery/receiver.go create mode 100644 pkg/delivery/receiver_test.go create mode 100644 pkg/delivery/sender.go create mode 100644 pkg/delivery/sender_test.go diff --git a/.gitignore b/.gitignore index ac51a054d2d..7f5afbb95a4 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ bazel-* +.idea +.vscode diff --git a/BUILD b/BUILD index 2f429daadff..8dfa98d9d0e 100644 --- a/BUILD +++ b/BUILD @@ -17,6 +17,14 @@ k8s_object( template = "controller.yaml", ) +k8s_object( + name = "delivery", + images = { + "event-delivery:latest": "//cmd/delivery:image", + }, + template = "delivery.yaml", +) + k8s_object( name = "namespace", template = "namespace.yaml", @@ -66,5 +74,6 @@ k8s_objects( ":eventtype", ":eventsource", ":controller", + ":delivery", ], ) diff --git a/cmd/delivery/BUILD.bazel b/cmd/delivery/BUILD.bazel new file mode 100644 index 00000000000..2f9cdf00a76 --- /dev/null +++ b/cmd/delivery/BUILD.bazel @@ -0,0 +1,41 @@ +package(default_visibility = ["//visibility:public"]) + +load("@io_bazel_rules_go//go:def.bzl", "gazelle", "go_binary", "go_library") + +go_library( + name = "go_default_library", + srcs = ["main.go"], + importpath = "github.com/elafros/eventing/cmd/delivery", + visibility = ["//visibility:private"], + deps = [ + "//pkg/client/clientset/versioned:go_default_library", + "//pkg/client/informers/externalversions:go_default_library", + "//pkg/controller:go_default_library", + "//pkg/controller/bind:go_default_library", + "//pkg/delivery:go_default_library", + "//pkg/delivery/action:go_default_library", + "//pkg/delivery/queue:go_default_library", + "//pkg/signals:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/github.com/prometheus/client_golang/prometheus/promhttp:go_default_library", + "//vendor/k8s.io/client-go/informers:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/plugin/pkg/client/auth/gcp:go_default_library", + "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library", + ], +) + +go_binary( + name = "delivery", + embed = [":go_default_library"], + importpath = "github.com/elafros/eventing/cmd/delivery", + pure = "on", + visibility = ["//visibility:public"], +) + +load("@io_bazel_rules_docker//go:image.bzl", "go_image") + +go_image( + name = "image", + binary = ":delivery", +) diff --git a/cmd/delivery/main.go b/cmd/delivery/main.go new file mode 100644 index 00000000000..e9f16a9f101 --- /dev/null +++ b/cmd/delivery/main.go @@ -0,0 +1,139 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "flag" + "net/http" + "time" + + "github.com/golang/glog" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + + clientset "github.com/elafros/eventing/pkg/client/clientset/versioned" + informers "github.com/elafros/eventing/pkg/client/informers/externalversions" + "github.com/elafros/eventing/pkg/delivery" + "github.com/elafros/eventing/pkg/delivery/action" + "github.com/elafros/eventing/pkg/delivery/queue" + "github.com/elafros/eventing/pkg/signals" + + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +const ( + eventBufferSize = 100 + senderThreads = 10 + serverAddress = ":9090" + metricsScrapePath = "/metrics" + debugQueuePath = "/queue/" + sendEventPrefix = "/v1alpha1/" +) + +var ( + masterURL string + kubeconfig string +) + +func main() { + flag.Parse() + + // set up signals so we handle the first shutdown signal gracefully + stopCh := signals.SetupSignalHandler() + + cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) + if err != nil { + glog.Fatalf("Error building kubeconfig: %s", err.Error()) + } + + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + glog.Fatalf("Error building kubernetes clientset: %s", err.Error()) + } + + client, err := clientset.NewForConfig(cfg) + if err != nil { + glog.Fatalf("Error building clientset: %s", err.Error()) + } + + kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) + informerFactory := informers.NewSharedInformerFactory(client, time.Second*30) + bindLister := informerFactory.Eventing().V1alpha1().Binds().Lister() + + q := queue.NewInMemoryQueue(eventBufferSize) + receiver := delivery.NewReceiver(bindLister, q) + + processors := map[string]action.Action{ + action.LoggingActionType: action.NewLoggingAction(), + action.ElafrosActionType: action.NewElafrosAction(kubeClient, http.DefaultClient), + } + sender := delivery.NewSender(q, processors) + + go kubeInformerFactory.Start(stopCh) + go informerFactory.Start(stopCh) + + // Start sender: + go func() { + // We don't expect this to return until stop is called, + // but if it does, propagate it back. + glog.Info("Staring event sender") + if err := sender.Run(senderThreads, stopCh); err != nil { + glog.Fatalf("Error running controller: %s", err.Error()) + } + }() + + // Set up HTTP endpoints + glog.Infof("Set up metrics scrape path %s", metricsScrapePath) + glog.Infof("Set up debug queue path %s", debugQueuePath) + glog.Infof("Hosting sendEvent API at prefix %s", sendEventPrefix) + + mux := http.NewServeMux() + mux.Handle(metricsScrapePath, promhttp.Handler()) + mux.Handle(debugQueuePath, queue.NewDiagnosticHandler(q)) + mux.Handle(sendEventPrefix, receiver) + mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) { + glog.Infof("Unhandled %s to route %q", req.Method, req.URL.Path) + http.NotFound(w, req) + }) + + // Start the API server + srv := &http.Server{Addr: serverAddress, Handler: mux} + go func() { + glog.Infof("Starting API server at %s", serverAddress) + if err := srv.ListenAndServe(); err != nil { + glog.Infof("Httpserver: ListenAndServe() finished with error: %s", err) + } + }() + + <-stopCh + + // Close the http server gracefully + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + srv.Shutdown(ctx) + + glog.Flush() +} + +func init() { + flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") + flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") +} diff --git a/delivery.yaml b/delivery.yaml new file mode 100644 index 00000000000..6003ecd9a3f --- /dev/null +++ b/delivery.yaml @@ -0,0 +1,34 @@ +# Copyright 2018 Google, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: apps/v1beta1 +kind: Deployment +metadata: + name: event-delivery + namespace: bind-system +spec: + replicas: 1 + template: + metadata: + labels: + app: event-delivery + spec: + # TODO(inlined): create new narrower role + serviceAccountName: bind-controller + containers: + - name: event-delivery + image: event-delivery:latest + args: [ + "-logtostderr", + "-stderrthreshold", "INFO", + ] diff --git a/pkg/delivery/BUILD.bazel b/pkg/delivery/BUILD.bazel new file mode 100644 index 00000000000..b4f9e2c04e0 --- /dev/null +++ b/pkg/delivery/BUILD.bazel @@ -0,0 +1,41 @@ +package(default_visibility = ["//visibility:public"]) + +load("@io_bazel_rules_go//go:def.bzl", "gazelle", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "receiver.go", + "sender.go", + ], + importpath = "github.com/elafros/eventing/pkg/delivery", + deps = [ + "//pkg/apis/bind/v1alpha1:go_default_library", + "//pkg/client/listers/bind/v1alpha1:go_default_library", + "//pkg/delivery/action:go_default_library", + "//pkg/delivery/queue:go_default_library", + "//pkg/event:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = [ + "receiver_test.go", + "sender_test.go", + ], + deps = [ + ":go_default_library", + "//pkg/apis/bind/v1alpha1:go_default_library", + "//pkg/client/clientset/versioned/fake:go_default_library", + "//pkg/client/informers/externalversions:go_default_library", + "//pkg/delivery/action:go_default_library", + "//pkg/delivery/queue:go_default_library", + "//pkg/event:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + ], +) diff --git a/pkg/delivery/action/BUILD.bazel b/pkg/delivery/action/BUILD.bazel new file mode 100644 index 00000000000..fb707651fdc --- /dev/null +++ b/pkg/delivery/action/BUILD.bazel @@ -0,0 +1,19 @@ +package(default_visibility = ["//visibility:public"]) + +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "action.go", + "elafros.go", + "logging.go", + ], + importpath = "github.com/elafros/eventing/pkg/delivery/action", + deps = [ + "//pkg/event:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + ], +) diff --git a/pkg/delivery/action/action.go b/pkg/delivery/action/action.go new file mode 100644 index 00000000000..177664fd814 --- /dev/null +++ b/pkg/delivery/action/action.go @@ -0,0 +1,41 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package action + +import ( + "github.com/elafros/eventing/pkg/event" +) + +// The Action interface implements the capability to send events to an action of +// a particular category (e.g. Elafros routes, WebHooks, ETLs) +// Random thought: optional additional interface for SendEventAsync. We've wanted +// this in the past to allow infrastructure optimizations. +type Action interface { + // SendEvent delivers an event to the named Action synchronously. + // Should return the response from the action (for possible continuation) + // or an error + // TODO: May need interfaces to indicate whether errors are fatal or retryable. + SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) +} + +// ActionFunc allows simple Action types to be implemented as a standalone function. +type ActionFunc func(name string, data interface{}, context *event.Context) (interface{}, error) + +// SendEvent implements the Action interface for a function with the same signature as Action.SendEvent. +func (a ActionFunc) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { + return a(name, data, context) +} diff --git a/pkg/delivery/action/elafros.go b/pkg/delivery/action/elafros.go new file mode 100644 index 00000000000..5224759f375 --- /dev/null +++ b/pkg/delivery/action/elafros.go @@ -0,0 +1,72 @@ +package action + +import ( + "fmt" + "net/http" + "strings" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/elafros/eventing/pkg/event" + "github.com/golang/glog" + "k8s.io/client-go/kubernetes" +) + +const ( + // ElafrosActionType is the expected Bind Processor name to + // cause Events to be sent to an Elafros Route. + ElafrosActionType = "elafros.dev/Route" +) + +// ElafrosAction sends Events to an Elafros Route. +type ElafrosAction struct { + kubeclientset kubernetes.Interface + httpclient *http.Client +} + +// NewElafrosAction creates an Action object that sends Events to Elafros Routes. +func NewElafrosAction(kubeclientset kubernetes.Interface, httpclient *http.Client) *ElafrosAction { + return &ElafrosAction{kubeclientset: kubeclientset, httpclient: httpclient} +} + +// SendEvent will POST the event spec to the root URI of the elafros route. +func (a *ElafrosAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { + parts := strings.Split(name, "/") + if len(parts) != 2 { + return nil, fmt.Errorf("Expected elafros route '%s' to be in the form '/'", name) + } + route, namespace := parts[1], parts[0] + + domain, err := getDomainSuffixFromElaConfig(a.kubeclientset) + if err != nil { + glog.Error("Could not look up Elafros domain") + return nil, err + } + + addr := fmt.Sprintf("http://%s.%s.%s", route, namespace, domain) + req, err := event.NewRequest(addr, data, *context) + if err != nil { + return nil, err + } + + if _, err := a.httpclient.Do(req); err != nil { + return nil, err + } + + // TODO: non-200 responses as errors. Standard decoding of responses to be forwarded. + return nil, nil +} + +func getDomainSuffixFromElaConfig(cl kubernetes.Interface) (string, error) { + const name = "ela-config" + const namespace = "ela-system" + c, err := cl.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return "", err + } + domainSuffix, ok := c.Data["domainSuffix"] + if !ok { + return "", fmt.Errorf("cannot find domainSuffix in %v: ConfigMap.Data is %#v", name, c.Data) + } + return domainSuffix, nil +} diff --git a/pkg/delivery/action/logging.go b/pkg/delivery/action/logging.go new file mode 100644 index 00000000000..05f98c7c959 --- /dev/null +++ b/pkg/delivery/action/logging.go @@ -0,0 +1,52 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package action + +import ( + "encoding/json" + + "github.com/elafros/eventing/pkg/event" + "github.com/golang/glog" +) + +const ( + // LoggingActionType is the expected Bind Processor type + // that will cause events to be logged and dropped. + LoggingActionType = "eventing.elafros.dev/EventLogger" +) + +// A LoggingAction will log and drop Events. +type LoggingAction struct{} + +// SendEvent implements Action.SendEvent +func (a LoggingAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { + event := map[string]interface{}{ + "data": data, + "context": context, + } + b, err := json.Marshal(event) + if err != nil { + return nil, err + } + glog.Infof("[%s] sendEvent %s", name, string(b)) + return nil, nil +} + +// NewLoggingAction creates an Action that will log and drop Events. +func NewLoggingAction() LoggingAction { + return LoggingAction{} +} diff --git a/pkg/delivery/queue/BUILD.bazel b/pkg/delivery/queue/BUILD.bazel new file mode 100644 index 00000000000..1336348d5e6 --- /dev/null +++ b/pkg/delivery/queue/BUILD.bazel @@ -0,0 +1,19 @@ +package(default_visibility = ["//visibility:public"]) + +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "diagnostics.go", + "memory_queue.go", + "queue.go", + ], + importpath = "github.com/elafros/eventing/pkg/delivery/queue", + deps = [ + "//pkg/event:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + ], +) diff --git a/pkg/delivery/queue/diagnostics.go b/pkg/delivery/queue/diagnostics.go new file mode 100644 index 00000000000..af3b0517ca5 --- /dev/null +++ b/pkg/delivery/queue/diagnostics.go @@ -0,0 +1,59 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "encoding/json" + "net/http" + + "github.com/golang/glog" +) + +type diagnosticsHandler struct { + Queue + *http.ServeMux +} + +// NewDiagnosticHandler provides endpoints for inspecting +// and diagnsing queued events. +// Eventually we'll need to decide more about how we will shard our queues +// and what visibility we'll give to each. E.g. I suspect we'll support looking +// at the lenght of queues for a single Flow or Action. +func NewDiagnosticHandler(q Queue) http.Handler { + h := &diagnosticsHandler{Queue: q, ServeMux: http.NewServeMux()} + h.HandleFunc("/queue/length", h.GetLength) + h.HandleFunc("/queue/", func(w http.ResponseWriter, r *http.Request) { + glog.Infof("Do not know how to handle queue path", r.URL.Path) + http.NotFound(w, r) + }) + return h +} + +func (h *diagnosticsHandler) GetLength(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + glog.Warningf("Unauthorized method on /queue/length") + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + glog.V(4).Info("Reporting queue length") + b, err := json.Marshal(map[string]int{"length": h.Length()}) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + } + w.Write(b) +} diff --git a/pkg/delivery/queue/memory_queue.go b/pkg/delivery/queue/memory_queue.go new file mode 100644 index 00000000000..f74f7d1e048 --- /dev/null +++ b/pkg/delivery/queue/memory_queue.go @@ -0,0 +1,60 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import "errors" + +// InMemoryQueue implements the queue interface with a memory buffer. +// Note: this isn't just a simple typedef for a queue because we will soon start +// experimenting with other features, such as fetching an event separate from acking, +// transactional ack + enqueue, cursors, etc. +type InMemoryQueue struct { + ch chan QueuedEvent +} + +// NewInMemoryQueue creates a new InMemoryQueue +func NewInMemoryQueue(length int) *InMemoryQueue { + ch := make(chan QueuedEvent, length) + return &InMemoryQueue{ + ch: ch, + } +} + +// Push implements Queue.Push +func (q *InMemoryQueue) Push(event QueuedEvent) error { + select { + case q.ch <- event: + return nil + default: + return errors.New("Event queue is out of memory") + } +} + +// Pull implements Queue.Pull +func (q *InMemoryQueue) Pull(stopCh <-chan struct{}) (QueuedEvent, bool) { + select { + case event, ok := <-q.ch: + return event, ok + case <-stopCh: + return QueuedEvent{}, false + } +} + +// Length returns Queue.Length +func (q InMemoryQueue) Length() int { + return len(q.ch) +} diff --git a/pkg/delivery/queue/queue.go b/pkg/delivery/queue/queue.go new file mode 100644 index 00000000000..d8db4d1142c --- /dev/null +++ b/pkg/delivery/queue/queue.go @@ -0,0 +1,42 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "github.com/elafros/eventing/pkg/event" +) + +// QueuedEvents are saved to an EventQueue before delivery +type QueuedEvent struct { + Action ActionType `json:"action"` + Data interface{} `json:"data"` + Context *event.Context `json:"context"` +} + +// TODO(vaikas): Remove this once Bind's Action has been migrated +// to be generic. +type ActionType struct { + Name string `json:"name"` + Processor string `json:"processor"` +} + +// Queue implements basic features to allow asynchronous buffering of events. +type Queue interface { + Push(event QueuedEvent) error + Pull(stopCh <-chan struct{}) (event QueuedEvent, ok bool) + Length() int +} diff --git a/pkg/delivery/receiver.go b/pkg/delivery/receiver.go new file mode 100644 index 00000000000..f9ad8fd50a0 --- /dev/null +++ b/pkg/delivery/receiver.go @@ -0,0 +1,112 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package delivery + +import ( + "fmt" + "net/http" + "regexp" + + "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" + + listers "github.com/elafros/eventing/pkg/client/listers/bind/v1alpha1" + "github.com/elafros/eventing/pkg/delivery/queue" + "github.com/elafros/eventing/pkg/event" + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/api/errors" +) + +// If an event source knows the exact flow it is targeting it can bypass the work involved with +// processing event triggers. +var directSendEventRegExp = regexp.MustCompile(`^.*/namespaces/([^/]*)/flows/([^/]*):sendEvent$`) + +// TODO(vaikas): Remove this once Bind's Action has been migrated +// to be generic. +const alwaysUseProcessor = "eventing.elafros.dev/EventLogger" + +func actionFromBind(bind *v1alpha1.Bind) queue.ActionType { + return queue.ActionType{ + Name: bind.Spec.Action.RouteName, + Processor: alwaysUseProcessor, + } +} + +// Receiver manages the HTTP endpoints for receiving events as well as +// stats about the queue for processing events. +type Receiver struct { + eventQueue queue.Queue + bindsLister listers.BindLister +} + +// NewReceiver creates a new Reciever object to enqueue events. +func NewReceiver(bindsLister listers.BindLister, eventQueue queue.Queue) *Receiver { + return &Receiver{bindsLister: bindsLister, eventQueue: eventQueue} +} + +// SendEvent enqueues an event data and Context for delivery to a particular action. +func (r *Receiver) SendEvent(action queue.ActionType, data interface{}, context *event.Context) error { + return r.eventQueue.Push(queue.QueuedEvent{ + Action: action, + Data: data, + Context: context, + }) +} + +// ServeHTTP implements the external REST API that other services will use to send events. +func (r *Receiver) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodPost { + glog.V(3).Info("Cannot handle method", req.Method) + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + matches := directSendEventRegExp.FindStringSubmatch(req.URL.Path) + if len(matches) != 3 { + glog.V(3).Info("Cannot parse route", req.URL.Path) + w.WriteHeader(http.StatusNotFound) + return + } + + bind, err := r.bindsLister.Binds(string(matches[1])).Get(string(matches[2])) + if err != nil { + if errors.IsNotFound(err) { + fmt.Printf("Could not find Bind %s in namespace %s\n", matches[2], matches[1]) + glog.V(3).Infof("Event sent to non-existant Bind %s in namespace %s", matches[2], matches[1]) + w.WriteHeader(http.StatusNotFound) + return + } + glog.Errorf("Unknown error fetching bind %s in namespace %s: %s", matches[2], matches[1], err) + w.WriteHeader(http.StatusInternalServerError) + return + } + var data interface{} + context, err := event.FromRequest(&data, req) + if err != nil { + glog.Error("Failed to unmarshal event ", err) + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + + if err := r.SendEvent(actionFromBind(bind), data, context); err != nil { + glog.Error("Failed to enqueue event", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} diff --git a/pkg/delivery/receiver_test.go b/pkg/delivery/receiver_test.go new file mode 100644 index 00000000000..c8280ca2f7b --- /dev/null +++ b/pkg/delivery/receiver_test.go @@ -0,0 +1,166 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package delivery_test + +import ( + "net/http" + "net/http/httptest" + "net/url" + "reflect" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" + "github.com/elafros/eventing/pkg/client/clientset/versioned/fake" + informers "github.com/elafros/eventing/pkg/client/informers/externalversions" + "github.com/elafros/eventing/pkg/delivery" + "github.com/elafros/eventing/pkg/delivery/queue" + "github.com/elafros/eventing/pkg/event" +) + +var ( + demoBind = &v1alpha1.Bind{ + TypeMeta: metav1.TypeMeta{ + Kind: "Bind", + APIVersion: "eventing.elafros.dev/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "demo", + Namespace: "default", + }, + Spec: v1alpha1.BindSpec{ + Trigger: v1alpha1.EventTrigger{ + EventType: "org.example.object.create", + Service: "example.org", + Resource: "{abc}/123", + }, + Action: v1alpha1.BindAction{ + RouteName: "vaikas.fix.this", + }, + }, + } +) + +func TestReceiverHttpErrors(t *testing.T) { + tests := []struct { + description string + method string + path string + headers http.Header + body interface{} + expectedStatus int + }{ + { + description: "bad method", + method: http.MethodGet, + expectedStatus: http.StatusMethodNotAllowed, + }, + { + description: "bad path", + method: http.MethodPost, + path: "/abc/123", + expectedStatus: http.StatusNotFound, + }, + { + description: "bad event", + method: http.MethodPost, + path: "/v1alpha1/namespaces/default/flows/demo:sendEvent", + // no headers, so we're missing all default fields + expectedStatus: http.StatusBadRequest, + }, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + client := fake.NewSimpleClientset(demoBind) + factory := informers.NewSharedInformerFactory(client, 0).Eventing().V1alpha1().Binds() + factory.Informer().GetIndexer().Add(demoBind) + lister := factory.Lister() + + q := queue.NewInMemoryQueue(1) + r := delivery.NewReceiver(lister, q) + s := httptest.NewServer(r) + defer s.Close() + + url, err := url.Parse(s.URL + test.path) + if err != nil { + t.Fatalf("Unexpected error crafting test URL: %s", err) + } + res, err := s.Client().Do(&http.Request{ + Method: test.method, + URL: url, + }) + if err != nil { + t.Fatalf("Failed to reach test server: %s", err) + } + if res.StatusCode != test.expectedStatus { + t.Fatalf("Wrong status; expected=%d got=%d", test.expectedStatus, res.StatusCode) + } + }) + } +} + +func TestEnqueueEvent(t *testing.T) { + client := fake.NewSimpleClientset(demoBind) + factory := informers.NewSharedInformerFactory(client, 0).Eventing().V1alpha1().Binds() + factory.Informer().GetIndexer().Add(demoBind) + lister := factory.Lister() + + q := queue.NewInMemoryQueue(1) + r := delivery.NewReceiver(lister, q) + s := httptest.NewServer(r) + defer s.Close() + + data := map[string]interface{}{"hello": "world"} + context := &event.Context{ + CloudEventsVersion: "0.0.1-rc1", + EventID: "123abc", + EventTime: time.Now().UTC(), + EventType: "org.example.object.create", + Source: "https://example.org/examples/example1", + } + + path := s.URL + "/v1alpha1/namespaces/default/flows/demo:sendEvent" + req, err := event.NewRequest(path, data, *context) + if err != nil { + t.Fatalf("Failed to create request: %s", err) + } + + res, err := s.Client().Do(req) + if err != nil { + t.Fatalf("Unexpected error calling sendEvent: %s", err) + } + if res.StatusCode != http.StatusOK { + t.Fatalf("SendEvent request %+v failed with response %+v", req, res) + } + + if q.Length() != 1 { + t.Fatalf("Expected one queued event; got %d", q.Length()) + } + event, ok := q.Pull(nil) + if !ok { + t.Fatal("Failed to pull queued event") + } + if !reflect.DeepEqual(context, event.Context) { + t.Fatalf("Event context was not marshalled correctly; expected=%+v got=%+v", context, event.Context) + } + if !reflect.DeepEqual(data, event.Data) { + t.Fatalf("Event data was not marshalled correctly; expected=%+v got=%+v", data, event.Data) + } +} diff --git a/pkg/delivery/sender.go b/pkg/delivery/sender.go new file mode 100644 index 00000000000..b4c330e4e7d --- /dev/null +++ b/pkg/delivery/sender.go @@ -0,0 +1,83 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package delivery + +import ( + "fmt" + "time" + + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/elafros/eventing/pkg/delivery/action" + "github.com/elafros/eventing/pkg/delivery/queue" +) + +// Sender delivers queued events to their target Actions. +type Sender struct { + eventQueue queue.Queue + actions map[string]action.Action +} + +// NewSender creates a new Sender object to deliver Events which were +// enqueued by a Receiver. +func NewSender( + eventQueue queue.Queue, + actions map[string]action.Action) *Sender { + return &Sender{ + eventQueue: eventQueue, + actions: actions, + } +} + +// RunOnce processes a single event from the queue. +// TODO: If the Queue was redesigned so that events are pulled with a lease, then we could restructure RunOnce to not +// need the stopCh (though does it really matter now anyway if the in-memory queue goes away?) +func (s *Sender) RunOnce(stopCh <-chan struct{}) bool { + // TODO(inlined): Pull should only take a lease on the queued event. The receiver should need + // to acknowledge the event and possibly transactionally enqueue subsequent events. + event, ok := s.eventQueue.Pull(stopCh) + if !ok { + glog.V(4).Info("RunOnce shutting down") + return false + } + + glog.V(4).Info("Sending Event %s to %s Action %s", event.Context.EventID, event.Action.Processor, event.Action.Name) + action, ok := s.actions[event.Action.Processor] + if !ok { + runtime.HandleError(fmt.Errorf("Event %s is routed to unknown Processor '%s'", event.Context.EventID, event.Action.Processor)) + return true + } + + // TODO(inlined): retry strategies for errors and continuations for success. + res, err := action.SendEvent(event.Action.Name, event.Data, event.Context) + if err != nil { + runtime.HandleError(fmt.Errorf("Action for event %s returned error: %s", event.Context.EventID, err)) + } else { + glog.V(4).Info("Event %s handled with response %+v", event.Context.EventID, res) + } + return true +} + +// Run runs Sender until stopCh is closed. +func (s *Sender) Run(threadiness int, stopCh <-chan struct{}) error { + for i := 0; i < threadiness; i++ { + go wait.Until(func() { s.RunOnce(stopCh) }, time.Second, stopCh) + } + return nil +} diff --git a/pkg/delivery/sender_test.go b/pkg/delivery/sender_test.go new file mode 100644 index 00000000000..f0ad361b8ee --- /dev/null +++ b/pkg/delivery/sender_test.go @@ -0,0 +1,100 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package delivery_test + +import ( + "reflect" + "sync" + "testing" + + "github.com/elafros/eventing/pkg/delivery" + "github.com/elafros/eventing/pkg/delivery/action" + "github.com/elafros/eventing/pkg/delivery/queue" + "github.com/elafros/eventing/pkg/event" +) + +func TestSendEvent(t *testing.T) { + actionInvoked := false + expectedData := map[string]interface{}{ + "hello": "world", + } + expectedContext := &event.Context{ + EventID: "123", + } + actionName := "projects/demo/region/us-central1/function/test" + actionType := "functions.googleapis.com" + callback := func(name string, data interface{}, context *event.Context) (interface{}, error) { + if actionInvoked { + t.Fatal("Action invoked twice") + } + actionInvoked = true + if name != actionName { + t.Fatalf("Action invoked with wrong name. expected=%s got=%s", actionName, name) + } + if !reflect.DeepEqual(expectedData, data) { + t.Fatalf("Action invoked with wrong data. expected=%v got=%v", expectedData, data) + } + if !reflect.DeepEqual(expectedContext, context) { + t.Fatalf("Action invoked with wrong context. expected=%v got=%v", expectedContext, context) + } + + return nil, nil + } + + q := queue.NewInMemoryQueue(1) + sender := delivery.NewSender(q, map[string]action.Action{ + actionType: action.ActionFunc(callback), + }) + + q.Push(queue.QueuedEvent{ + Context: expectedContext, + Data: expectedData, + Action: queue.ActionType{ + Processor: actionType, + Name: actionName, + }, + }) + + if ok := sender.RunOnce(nil); !ok { + t.Fatal("expected RunOnce() to return true") + } + if !actionInvoked { + t.Fatal("Did not invoke action") + } +} + +func TestShutdown(t *testing.T) { + q := queue.NewInMemoryQueue(0) + sender := delivery.NewSender(q, nil) + stopCh := make(chan struct{}) + var wait sync.WaitGroup + done := false + wait.Add(1) + go func() { + sender.Run(20, stopCh) + done = true + wait.Done() + }() + if done { + t.Fatalf("Sender stops automatically") + } + close(stopCh) + wait.Wait() + if !done { + t.Fatalf("Should be done") + } +} From 806c479e4a6c17f9cf7bc4d2d40169f67ebcece8 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Mon, 23 Apr 2018 14:28:41 -0700 Subject: [PATCH 3/6] review feedback --- cmd/delivery/BUILD.bazel | 1 - cmd/delivery/main.go | 14 +++++++------- pkg/delivery/action/BUILD.bazel | 1 + pkg/delivery/action/action.go | 4 +++- pkg/delivery/queue/diagnostics.go | 4 ++-- pkg/delivery/queue/memory_queue.go | 12 ++++++------ pkg/delivery/queue/queue.go | 3 ++- pkg/delivery/receiver.go | 22 ++++++++++------------ pkg/delivery/sender.go | 5 +++++ pkg/event/event_test.go | 2 +- 10 files changed, 37 insertions(+), 31 deletions(-) diff --git a/cmd/delivery/BUILD.bazel b/cmd/delivery/BUILD.bazel index 2f9cdf00a76..a9abbb1d42f 100644 --- a/cmd/delivery/BUILD.bazel +++ b/cmd/delivery/BUILD.bazel @@ -30,7 +30,6 @@ go_binary( embed = [":go_default_library"], importpath = "github.com/elafros/eventing/cmd/delivery", pure = "on", - visibility = ["//visibility:public"], ) load("@io_bazel_rules_docker//go:image.bzl", "go_image") diff --git a/cmd/delivery/main.go b/cmd/delivery/main.go index e9f16a9f101..877cf8f7d91 100644 --- a/cmd/delivery/main.go +++ b/cmd/delivery/main.go @@ -1,5 +1,5 @@ /* -Copyright 2017 The Kubernetes Authors. +Copyright 2018 Google, Inc. All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,6 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. */ +// Implements a process that wraps an event-receiving webhook, an event +// queue, and a sender which delivers events to their eventual destination. package main import ( @@ -54,6 +56,7 @@ var ( ) func main() { + defer glog.Flush() flag.Parse() // set up signals so we handle the first shutdown signal gracefully @@ -101,13 +104,12 @@ func main() { }() // Set up HTTP endpoints - glog.Infof("Set up metrics scrape path %s", metricsScrapePath) - glog.Infof("Set up debug queue path %s", debugQueuePath) - glog.Infof("Hosting sendEvent API at prefix %s", sendEventPrefix) - mux := http.NewServeMux() + glog.Infof("Set up metrics scrape path %s", metricsScrapePath) mux.Handle(metricsScrapePath, promhttp.Handler()) + glog.Infof("Set up debug queue path %s", debugQueuePath) mux.Handle(debugQueuePath, queue.NewDiagnosticHandler(q)) + glog.Infof("Hosting sendEvent API at prefix %s", sendEventPrefix) mux.Handle(sendEventPrefix, receiver) mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) { glog.Infof("Unhandled %s to route %q", req.Method, req.URL.Path) @@ -129,8 +131,6 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() srv.Shutdown(ctx) - - glog.Flush() } func init() { diff --git a/pkg/delivery/action/BUILD.bazel b/pkg/delivery/action/BUILD.bazel index fb707651fdc..5864071b545 100644 --- a/pkg/delivery/action/BUILD.bazel +++ b/pkg/delivery/action/BUILD.bazel @@ -6,6 +6,7 @@ go_library( name = "go_default_library", srcs = [ "action.go", + "doc.go", "elafros.go", "logging.go", ], diff --git a/pkg/delivery/action/action.go b/pkg/delivery/action/action.go index 177664fd814..f4bce8172ae 100644 --- a/pkg/delivery/action/action.go +++ b/pkg/delivery/action/action.go @@ -27,7 +27,9 @@ import ( type Action interface { // SendEvent delivers an event to the named Action synchronously. // Should return the response from the action (for possible continuation) - // or an error + // or an error. + // Returns an interface{} result which is currently unused, but will be used to + // allow continuations in the future. // TODO: May need interfaces to indicate whether errors are fatal or retryable. SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) } diff --git a/pkg/delivery/queue/diagnostics.go b/pkg/delivery/queue/diagnostics.go index af3b0517ca5..e4f87dd03e2 100644 --- a/pkg/delivery/queue/diagnostics.go +++ b/pkg/delivery/queue/diagnostics.go @@ -29,10 +29,10 @@ type diagnosticsHandler struct { } // NewDiagnosticHandler provides endpoints for inspecting -// and diagnsing queued events. +// and diagnosing queued events. // Eventually we'll need to decide more about how we will shard our queues // and what visibility we'll give to each. E.g. I suspect we'll support looking -// at the lenght of queues for a single Flow or Action. +// at the length of queues for a single Flow or Action. func NewDiagnosticHandler(q Queue) http.Handler { h := &diagnosticsHandler{Queue: q, ServeMux: http.NewServeMux()} h.HandleFunc("/queue/length", h.GetLength) diff --git a/pkg/delivery/queue/memory_queue.go b/pkg/delivery/queue/memory_queue.go index f74f7d1e048..819f45442a0 100644 --- a/pkg/delivery/queue/memory_queue.go +++ b/pkg/delivery/queue/memory_queue.go @@ -22,20 +22,20 @@ import "errors" // Note: this isn't just a simple typedef for a queue because we will soon start // experimenting with other features, such as fetching an event separate from acking, // transactional ack + enqueue, cursors, etc. -type InMemoryQueue struct { +type inMemoryQueue struct { ch chan QueuedEvent } // NewInMemoryQueue creates a new InMemoryQueue -func NewInMemoryQueue(length int) *InMemoryQueue { +func NewInMemoryQueue(length int) Queue { ch := make(chan QueuedEvent, length) - return &InMemoryQueue{ + return &inMemoryQueue{ ch: ch, } } // Push implements Queue.Push -func (q *InMemoryQueue) Push(event QueuedEvent) error { +func (q *inMemoryQueue) Push(event QueuedEvent) error { select { case q.ch <- event: return nil @@ -45,7 +45,7 @@ func (q *InMemoryQueue) Push(event QueuedEvent) error { } // Pull implements Queue.Pull -func (q *InMemoryQueue) Pull(stopCh <-chan struct{}) (QueuedEvent, bool) { +func (q *inMemoryQueue) Pull(stopCh <-chan struct{}) (QueuedEvent, bool) { select { case event, ok := <-q.ch: return event, ok @@ -55,6 +55,6 @@ func (q *InMemoryQueue) Pull(stopCh <-chan struct{}) (QueuedEvent, bool) { } // Length returns Queue.Length -func (q InMemoryQueue) Length() int { +func (q inMemoryQueue) Length() int { return len(q.ch) } diff --git a/pkg/delivery/queue/queue.go b/pkg/delivery/queue/queue.go index d8db4d1142c..07495e2e6c2 100644 --- a/pkg/delivery/queue/queue.go +++ b/pkg/delivery/queue/queue.go @@ -20,13 +20,14 @@ import ( "github.com/elafros/eventing/pkg/event" ) -// QueuedEvents are saved to an EventQueue before delivery +// QueuedEvent is the element saved to an EventQueue before delivery type QueuedEvent struct { Action ActionType `json:"action"` Data interface{} `json:"data"` Context *event.Context `json:"context"` } +// ActionType is a shim while Bind's Action is not generic. // TODO(vaikas): Remove this once Bind's Action has been migrated // to be generic. type ActionType struct { diff --git a/pkg/delivery/receiver.go b/pkg/delivery/receiver.go index f9ad8fd50a0..860189ea0d3 100644 --- a/pkg/delivery/receiver.go +++ b/pkg/delivery/receiver.go @@ -17,7 +17,6 @@ limitations under the License. package delivery import ( - "fmt" "net/http" "regexp" @@ -30,18 +29,19 @@ import ( "k8s.io/apimachinery/pkg/api/errors" ) -// If an event source knows the exact flow it is targeting it can bypass the work involved with -// processing event triggers. +// Currently the delivery service requires that Events are sent directly to an endpoint named +// after a Bind which the Event matches. Future versions may offer a "firehose" endpoint that +// will do late-time filtering of Events. var directSendEventRegExp = regexp.MustCompile(`^.*/namespaces/([^/]*)/flows/([^/]*):sendEvent$`) // TODO(vaikas): Remove this once Bind's Action has been migrated // to be generic. -const alwaysUseProcessor = "eventing.elafros.dev/EventLogger" +const hardCodedProcessor = "eventing.elafros.dev/EventLogger" func actionFromBind(bind *v1alpha1.Bind) queue.ActionType { return queue.ActionType{ Name: bind.Spec.Action.RouteName, - Processor: alwaysUseProcessor, + Processor: hardCodedProcessor, } } @@ -52,7 +52,7 @@ type Receiver struct { bindsLister listers.BindLister } -// NewReceiver creates a new Reciever object to enqueue events. +// NewReceiver creates a new Receiver object to enqueue events. func NewReceiver(bindsLister listers.BindLister, eventQueue queue.Queue) *Receiver { return &Receiver{bindsLister: bindsLister, eventQueue: eventQueue} } @@ -81,15 +81,15 @@ func (r *Receiver) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - bind, err := r.bindsLister.Binds(string(matches[1])).Get(string(matches[2])) + namespace, flowName := string(matches[1]), string(matches[2]) + bind, err := r.bindsLister.Binds(namespace).Get(flowName) if err != nil { if errors.IsNotFound(err) { - fmt.Printf("Could not find Bind %s in namespace %s\n", matches[2], matches[1]) - glog.V(3).Infof("Event sent to non-existant Bind %s in namespace %s", matches[2], matches[1]) + glog.V(3).Infof("Event sent to non-existant Bind %s in namespace %s", flowName, namespace) w.WriteHeader(http.StatusNotFound) return } - glog.Errorf("Unknown error fetching bind %s in namespace %s: %s", matches[2], matches[1], err) + glog.Errorf("Unknown error fetching bind %s in namespace %s: %s", flowName, namespace, err) w.WriteHeader(http.StatusInternalServerError) return } @@ -107,6 +107,4 @@ func (r *Receiver) ServeHTTP(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusInternalServerError) return } - - w.WriteHeader(http.StatusOK) } diff --git a/pkg/delivery/sender.go b/pkg/delivery/sender.go index b4c330e4e7d..77bfdb9f30b 100644 --- a/pkg/delivery/sender.go +++ b/pkg/delivery/sender.go @@ -79,5 +79,10 @@ func (s *Sender) Run(threadiness int, stopCh <-chan struct{}) error { for i := 0; i < threadiness; i++ { go wait.Until(func() { s.RunOnce(stopCh) }, time.Second, stopCh) } + + glog.Info("Started workers") + <-stopCh + glog.Info("Shutting down workers") + return nil } diff --git a/pkg/event/event_test.go b/pkg/event/event_test.go index b6b7e79127b..9ce07cbf9ce 100644 --- a/pkg/event/event_test.go +++ b/pkg/event/event_test.go @@ -27,7 +27,7 @@ import ( var ( context = &event.Context{ CloudEventsVersion: "0.0.1-preview-1", - EventId: "eventid-123", + EventID: "eventid-123", EventType: "google.firestore.document.create", EventTime: time.Now().UTC(), Source: "//firestore.googleapis.com/projects/demo/databases/default/documents/users/inlined", From 2d2690c9f8e45ea755d6321113e8fa7a57b00441 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Mon, 23 Apr 2018 14:32:08 -0700 Subject: [PATCH 4/6] Automatic changes by build tools --- cmd/delivery/BUILD.bazel | 2 -- pkg/delivery/BUILD.bazel | 4 +++- pkg/delivery/action/BUILD.bazel | 2 +- pkg/delivery/queue/BUILD.bazel | 4 +--- pkg/event/BUILD.bazel | 5 +++-- pkg/sources/BUILD | 2 +- 6 files changed, 9 insertions(+), 10 deletions(-) diff --git a/cmd/delivery/BUILD.bazel b/cmd/delivery/BUILD.bazel index a9abbb1d42f..a8510e80ce9 100644 --- a/cmd/delivery/BUILD.bazel +++ b/cmd/delivery/BUILD.bazel @@ -10,8 +10,6 @@ go_library( deps = [ "//pkg/client/clientset/versioned:go_default_library", "//pkg/client/informers/externalversions:go_default_library", - "//pkg/controller:go_default_library", - "//pkg/controller/bind:go_default_library", "//pkg/delivery:go_default_library", "//pkg/delivery/action:go_default_library", "//pkg/delivery/queue:go_default_library", diff --git a/pkg/delivery/BUILD.bazel b/pkg/delivery/BUILD.bazel index b4f9e2c04e0..4085df3259f 100644 --- a/pkg/delivery/BUILD.bazel +++ b/pkg/delivery/BUILD.bazel @@ -5,6 +5,7 @@ load("@io_bazel_rules_go//go:def.bzl", "gazelle", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ + "doc.go", "receiver.go", "sender.go", ], @@ -23,11 +24,12 @@ go_library( ) go_test( - name = "go_default_test", + name = "go_default_xtest", srcs = [ "receiver_test.go", "sender_test.go", ], + importpath = "github.com/elafros/eventing/pkg/delivery_test", deps = [ ":go_default_library", "//pkg/apis/bind/v1alpha1:go_default_library", diff --git a/pkg/delivery/action/BUILD.bazel b/pkg/delivery/action/BUILD.bazel index 5864071b545..0cde53d03de 100644 --- a/pkg/delivery/action/BUILD.bazel +++ b/pkg/delivery/action/BUILD.bazel @@ -1,6 +1,6 @@ package(default_visibility = ["//visibility:public"]) -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") +load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", diff --git a/pkg/delivery/queue/BUILD.bazel b/pkg/delivery/queue/BUILD.bazel index 1336348d5e6..df8567bec89 100644 --- a/pkg/delivery/queue/BUILD.bazel +++ b/pkg/delivery/queue/BUILD.bazel @@ -1,6 +1,6 @@ package(default_visibility = ["//visibility:public"]) -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") +load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", @@ -13,7 +13,5 @@ go_library( deps = [ "//pkg/event:go_default_library", "//vendor/github.com/golang/glog:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/client-go/kubernetes:go_default_library", ], ) diff --git a/pkg/event/BUILD.bazel b/pkg/event/BUILD.bazel index cbfddebb2a0..2848ed19047 100644 --- a/pkg/event/BUILD.bazel +++ b/pkg/event/BUILD.bazel @@ -8,7 +8,8 @@ go_library( ) go_test( - name = "go_default_test", + name = "go_default_xtest", srcs = ["event_test.go"], + importpath = "github.com/elafros/eventing/pkg/event_test", deps = [":go_default_library"], - ) +) diff --git a/pkg/sources/BUILD b/pkg/sources/BUILD index 76e3a29f124..02922d4c903 100644 --- a/pkg/sources/BUILD +++ b/pkg/sources/BUILD @@ -3,9 +3,9 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", srcs = [ - "github.go", "event_source.go", "event_trigger.go", + "github.go", ], importpath = "github.com/elafros/eventing/pkg/sources", visibility = ["//visibility:public"], From d940e90821a3e5dc7440d72788eb7ea9876ccbe0 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Mon, 23 Apr 2018 16:33:55 -0700 Subject: [PATCH 5/6] Whoops. Forgot doc.gos --- pkg/delivery/action/doc.go | 19 +++++++++++++++++++ pkg/delivery/doc.go | 22 ++++++++++++++++++++++ 2 files changed, 41 insertions(+) create mode 100644 pkg/delivery/action/doc.go create mode 100644 pkg/delivery/doc.go diff --git a/pkg/delivery/action/doc.go b/pkg/delivery/action/doc.go new file mode 100644 index 00000000000..cc1b59caf42 --- /dev/null +++ b/pkg/delivery/action/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package action implements the various Actions that can be invoked +// in response to an Event. +package action diff --git a/pkg/delivery/doc.go b/pkg/delivery/doc.go new file mode 100644 index 00000000000..65f5ad64b5e --- /dev/null +++ b/pkg/delivery/doc.go @@ -0,0 +1,22 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package delivery implements an event delivery service. The Receiver +// interface exposes a WebHook where compatible Events can be sent. +// The Receiver enqueues the Event and then acknowledges the webhook. +// A Sender polls the events Queue and delivers the event to the +// appropriate Action according to the Bind that matched the Event. +package delivery From 4387a135f779dfb3c1bf2c76dbbcda1d285c2ab9 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Wed, 25 Apr 2018 09:19:03 -0700 Subject: [PATCH 6/6] PR feedback; change interface in Sender --- pkg/delivery/queue/queue.go | 3 +++ pkg/delivery/sender.go | 32 ++++++++++++++++++++++---------- pkg/delivery/sender_test.go | 4 ++-- 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/pkg/delivery/queue/queue.go b/pkg/delivery/queue/queue.go index 07495e2e6c2..286bd5b7826 100644 --- a/pkg/delivery/queue/queue.go +++ b/pkg/delivery/queue/queue.go @@ -21,6 +21,9 @@ import ( ) // QueuedEvent is the element saved to an EventQueue before delivery +// TODO(inlined): This should probably be made package protected; +// It may be reasonable to just return a tuple of these values from +// the Queue pull operations instead of defining the struct. type QueuedEvent struct { Action ActionType `json:"action"` Data interface{} `json:"data"` diff --git a/pkg/delivery/sender.go b/pkg/delivery/sender.go index 77bfdb9f30b..1487767b22c 100644 --- a/pkg/delivery/sender.go +++ b/pkg/delivery/sender.go @@ -18,6 +18,7 @@ package delivery import ( "fmt" + "io" "time" "github.com/golang/glog" @@ -46,38 +47,49 @@ func NewSender( } // RunOnce processes a single event from the queue. +// Errors are unactionable and reported to the runtime rather than returned. +// Returns true if more processing is possible. // TODO: If the Queue was redesigned so that events are pulled with a lease, then we could restructure RunOnce to not // need the stopCh (though does it really matter now anyway if the in-memory queue goes away?) -func (s *Sender) RunOnce(stopCh <-chan struct{}) bool { +func (s *Sender) RunOnce(stopCh <-chan struct{}) error { // TODO(inlined): Pull should only take a lease on the queued event. The receiver should need // to acknowledge the event and possibly transactionally enqueue subsequent events. event, ok := s.eventQueue.Pull(stopCh) if !ok { - glog.V(4).Info("RunOnce shutting down") - return false + return io.EOF } glog.V(4).Info("Sending Event %s to %s Action %s", event.Context.EventID, event.Action.Processor, event.Action.Name) action, ok := s.actions[event.Action.Processor] if !ok { - runtime.HandleError(fmt.Errorf("Event %s is routed to unknown Processor '%s'", event.Context.EventID, event.Action.Processor)) - return true + return fmt.Errorf("Event %s is routed to unknown Processor '%s'", event.Context.EventID, event.Action.Processor) } // TODO(inlined): retry strategies for errors and continuations for success. res, err := action.SendEvent(event.Action.Name, event.Data, event.Context) if err != nil { - runtime.HandleError(fmt.Errorf("Action for event %s returned error: %s", event.Context.EventID, err)) - } else { - glog.V(4).Info("Event %s handled with response %+v", event.Context.EventID, res) + return fmt.Errorf("Action for event %s returned error: %s", event.Context.EventID, err) + } + glog.V(4).Info("Event %s handled with response %+v", event.Context.EventID, res) + return nil +} + +func (s *Sender) run(stopCh <-chan struct{}) { + for { + err := s.RunOnce(stopCh) + if err != nil { + if err == io.EOF { + return + } + runtime.HandleError(err) + } } - return true } // Run runs Sender until stopCh is closed. func (s *Sender) Run(threadiness int, stopCh <-chan struct{}) error { for i := 0; i < threadiness; i++ { - go wait.Until(func() { s.RunOnce(stopCh) }, time.Second, stopCh) + go wait.Until(func() { s.run(stopCh) }, time.Second, stopCh) } glog.Info("Started workers") diff --git a/pkg/delivery/sender_test.go b/pkg/delivery/sender_test.go index f0ad361b8ee..73b9505bcff 100644 --- a/pkg/delivery/sender_test.go +++ b/pkg/delivery/sender_test.go @@ -69,8 +69,8 @@ func TestSendEvent(t *testing.T) { }, }) - if ok := sender.RunOnce(nil); !ok { - t.Fatal("expected RunOnce() to return true") + if err := sender.RunOnce(nil); err != nil { + t.Fatalf("RunOnce() failed with err: %s", err) } if !actionInvoked { t.Fatal("Did not invoke action")