diff --git a/.gitignore b/.gitignore index ac51a054d2d..7f5afbb95a4 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ bazel-* +.idea +.vscode diff --git a/BUILD b/BUILD index 2f429daadff..8dfa98d9d0e 100644 --- a/BUILD +++ b/BUILD @@ -17,6 +17,14 @@ k8s_object( template = "controller.yaml", ) +k8s_object( + name = "delivery", + images = { + "event-delivery:latest": "//cmd/delivery:image", + }, + template = "delivery.yaml", +) + k8s_object( name = "namespace", template = "namespace.yaml", @@ -66,5 +74,6 @@ k8s_objects( ":eventtype", ":eventsource", ":controller", + ":delivery", ], ) diff --git a/cmd/delivery/BUILD.bazel b/cmd/delivery/BUILD.bazel new file mode 100644 index 00000000000..a8510e80ce9 --- /dev/null +++ b/cmd/delivery/BUILD.bazel @@ -0,0 +1,38 @@ +package(default_visibility = ["//visibility:public"]) + +load("@io_bazel_rules_go//go:def.bzl", "gazelle", "go_binary", "go_library") + +go_library( + name = "go_default_library", + srcs = ["main.go"], + importpath = "github.com/elafros/eventing/cmd/delivery", + visibility = ["//visibility:private"], + deps = [ + "//pkg/client/clientset/versioned:go_default_library", + "//pkg/client/informers/externalversions:go_default_library", + "//pkg/delivery:go_default_library", + "//pkg/delivery/action:go_default_library", + "//pkg/delivery/queue:go_default_library", + "//pkg/signals:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/github.com/prometheus/client_golang/prometheus/promhttp:go_default_library", + "//vendor/k8s.io/client-go/informers:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/plugin/pkg/client/auth/gcp:go_default_library", + "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library", + ], +) + +go_binary( + name = "delivery", + embed = [":go_default_library"], + importpath = "github.com/elafros/eventing/cmd/delivery", + pure = "on", +) + +load("@io_bazel_rules_docker//go:image.bzl", "go_image") + +go_image( + name = "image", + binary = ":delivery", +) diff --git a/cmd/delivery/main.go b/cmd/delivery/main.go new file mode 100644 index 00000000000..810103e5f48 --- /dev/null +++ b/cmd/delivery/main.go @@ -0,0 +1,140 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Implements a process that wraps an event-receiving webhook, an event +// queue, and a sender which delivers events to their eventual destination. +package main + +import ( + "context" + "flag" + "net/http" + "time" + + "github.com/golang/glog" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + + clientset "github.com/elafros/eventing/pkg/client/clientset/versioned" + informers "github.com/elafros/eventing/pkg/client/informers/externalversions" + "github.com/elafros/eventing/pkg/delivery" + "github.com/elafros/eventing/pkg/delivery/action" + "github.com/elafros/eventing/pkg/delivery/queue" + "github.com/elafros/eventing/pkg/signals" + + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +const ( + eventBufferSize = 100 + senderThreads = 10 + serverAddress = ":9090" + metricsScrapePath = "/metrics" + debugQueuePath = "/queue/" + sendEventPrefix = "/v1alpha1/" +) + +var ( + masterURL string + kubeconfig string +) + +func main() { + defer glog.Flush() + flag.Parse() + + // set up signals so we handle the first shutdown signal gracefully + stopCh := signals.SetupSignalHandler() + + cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) + if err != nil { + glog.Fatalf("Error building kubeconfig: %s", err.Error()) + } + + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + glog.Fatalf("Error building kubernetes clientset: %s", err.Error()) + } + + client, err := clientset.NewForConfig(cfg) + if err != nil { + glog.Fatalf("Error building clientset: %s", err.Error()) + } + + kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) + informerFactory := informers.NewSharedInformerFactory(client, time.Second*30) + bindLister := informerFactory.Eventing().V1alpha1().Binds().Lister() + + q := queue.NewInMemoryQueue(eventBufferSize) + receiver := delivery.NewReceiver(bindLister, q) + + processors := map[string]action.Action{ + action.LoggingActionType: action.NewLoggingAction(), + action.ElafrosActionType: action.NewElafrosAction(kubeClient, http.DefaultClient), + action.ServiceActionType: action.NewServiceAction(http.DefaultClient), + } + sender := delivery.NewSender(q, processors) + + go kubeInformerFactory.Start(stopCh) + go informerFactory.Start(stopCh) + + // Start sender: + go func() { + // We don't expect this to return until stop is called, + // but if it does, propagate it back. + glog.Info("Staring event sender") + if err := sender.Run(senderThreads, stopCh); err != nil { + glog.Fatalf("Error running controller: %s", err.Error()) + } + }() + + // Set up HTTP endpoints + mux := http.NewServeMux() + glog.Infof("Set up metrics scrape path %s", metricsScrapePath) + mux.Handle(metricsScrapePath, promhttp.Handler()) + glog.Infof("Set up debug queue path %s", debugQueuePath) + mux.Handle(debugQueuePath, queue.NewDiagnosticHandler(q)) + glog.Infof("Hosting sendEvent API at prefix %s", sendEventPrefix) + mux.Handle(sendEventPrefix, receiver) + mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) { + glog.Infof("Unhandled %s to route %q", req.Method, req.URL.Path) + http.NotFound(w, req) + }) + + // Start the API server + srv := &http.Server{Addr: serverAddress, Handler: mux} + go func() { + glog.Infof("Starting API server at %s", serverAddress) + if err := srv.ListenAndServe(); err != nil { + glog.Infof("Httpserver: ListenAndServe() finished with error: %s", err) + } + }() + + <-stopCh + + // Close the http server gracefully + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + srv.Shutdown(ctx) +} + +func init() { + flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") + flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") +} diff --git a/cmd/sendevent/main.go b/cmd/sendevent/main.go new file mode 100644 index 00000000000..0ca29cbb8de --- /dev/null +++ b/cmd/sendevent/main.go @@ -0,0 +1,104 @@ +/* +Copyright 2018 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Implements a simple utility for sending a JSON-encoded sample event. +// Not wired to bazel because this utility is meant to be run directly rather +// than deployed to K8S +package main + +import ( + "crypto/rand" + "encoding/json" + "flag" + "fmt" + "io/ioutil" + "net/http" + "os" + "time" + + "github.com/elafros/eventing/pkg/event" +) + +var ( + context event.Context + webhook string + data string +) + +func init() { + flag.StringVar(&context.EventID, "event-id", "", "Event ID to use. Defaults to a UUID") + flag.StringVar(&context.EventType, "event-type", "google.events.action.demo", "The Event Type to use.") + flag.StringVar(&context.Source, "source", "", "Source URI to use. Defaults to the current machine's hostname") + flag.StringVar(&data, "data", `{"hello": "world!"}`, "Event data") +} + +func main() { + flag.Parse() + + if flag.NArg() != 1 { + fmt.Println("Missing webhook address") + os.Exit(1) + } + + webhook := flag.Args()[0] + + var untyped map[string]interface{} + if err := json.Unmarshal([]byte(data), &untyped); err != nil { + fmt.Println("Currenlty sendevent only supports JSON event data") + os.Exit(1) + } + + fillEventContext(&context) + req, err := event.NewRequest(webhook, untyped, context) + if err != nil { + fmt.Printf("Failed to create request: %s", err) + os.Exit(1) + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + fmt.Printf("Failed to send event to %s: %s\n", webhook, err) + os.Exit(1) + } + fmt.Printf("Got response from %s\n%s\n", webhook, res.Status) + if res.Header.Get("Content-Length") != "" { + bytes, _ := ioutil.ReadAll(res.Body) + fmt.Println(string(bytes)) + } +} + +func fillEventContext(ctx *event.Context) { + ctx.CloudEventsVersion = "0.1" + ctx.EventTime = time.Now().UTC() + + if ctx.EventID == "" { + // A "UUID". Not technically spec complaint + b := make([]byte, 16) + if n, err := rand.Read(b); n != 16 || err != nil { + fmt.Println("Could not create event-id UUID") + os.Exit(1) + } + ctx.EventID = fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:]) + } + + if ctx.Source == "" { + var err error + ctx.Source, err = os.Hostname() + if err != nil { + ctx.Source = "localhost" + } + } +} diff --git a/controller.yaml b/controller.yaml index 82020a8b654..a81b092990e 100644 --- a/controller.yaml +++ b/controller.yaml @@ -23,7 +23,6 @@ spec: labels: app: bind-controller spec: - serviceAccountName: bind-controller containers: - name: bind-controller image: bind-controller:latest diff --git a/delivery.yaml b/delivery.yaml new file mode 100644 index 00000000000..6003ecd9a3f --- /dev/null +++ b/delivery.yaml @@ -0,0 +1,34 @@ +# Copyright 2018 Google, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: apps/v1beta1 +kind: Deployment +metadata: + name: event-delivery + namespace: bind-system +spec: + replicas: 1 + template: + metadata: + labels: + app: event-delivery + spec: + # TODO(inlined): create new narrower role + serviceAccountName: bind-controller + containers: + - name: event-delivery + image: event-delivery:latest + args: [ + "-logtostderr", + "-stderrthreshold", "INFO", + ] diff --git a/pkg/apis/bind/v1alpha1/bind_types.go b/pkg/apis/bind/v1alpha1/bind_types.go index 9b9899f5e65..704837e89ab 100644 --- a/pkg/apis/bind/v1alpha1/bind_types.go +++ b/pkg/apis/bind/v1alpha1/bind_types.go @@ -35,11 +35,18 @@ type Bind struct { Status BindStatus `json:"status"` } +// BindAction describes where events should be delivered. type BindAction struct { - // RouteName specifies Elafros route as a target. - RouteName string `json:"routeName,omitempty"` + // Processor dictates the kind of service that will handle the Event. + // For example "elafros.dev/Route" + Processor string `json:"processor"` + + // Name dicates the resource exposed by Processor that will handle the event. + // The semantics of Name is determined by Processor. + Name string `json:"name"` } +// EventTrigger describes when an Event should be delivered. type EventTrigger struct { // Required. The type of event to observe. For example: // `google.storage.object.finalize` and @@ -118,13 +125,22 @@ type EventTrigger struct { ParametersFrom []ParametersFromSource `json:"parametersFrom,omitempty"` } +// Zero determines whether an EventTrigger is fully empty. If a Bind is set up +// without an EventTrigger, it is assumed that the real event source is incompatible +// with our event framework's control plane and will be set up manually through +// side channels. +func (t *EventTrigger) Zero() bool { + return t.EventType == "" && t.Resource == "" && t.Service == "" && + t.Parameters == nil && t.ParametersFrom == nil +} + // BindSpec is the spec for a Bind resource type BindSpec struct { // Action specifies the target handler for the events Action BindAction `json:"action"` // Trigger specifies the trigger we're binding to - Trigger EventTrigger `json:trigger"` + Trigger EventTrigger `json:"trigger"` } // ParametersFromSource represents the source of a set of Parameters diff --git a/pkg/controller/bind/BUILD b/pkg/controller/bind/BUILD index a128b5dba39..b2b51a1d341 100644 --- a/pkg/controller/bind/BUILD +++ b/pkg/controller/bind/BUILD @@ -12,6 +12,7 @@ go_library( "//pkg/client/informers/externalversions:go_default_library", "//pkg/client/listers/bind/v1alpha1:go_default_library", "//pkg/controller:go_default_library", + "//pkg/delivery/action:go_default_library", "//pkg/sources:go_default_library", "//vendor/github.com/elafros/elafros/pkg/client/informers/externalversions:go_default_library", "//vendor/github.com/elafros/elafros/pkg/client/listers/ela/v1alpha1:go_default_library", diff --git a/pkg/controller/bind/controller.go b/pkg/controller/bind/controller.go index 50aa1ce223e..d4c72dfd7de 100644 --- a/pkg/controller/bind/controller.go +++ b/pkg/controller/bind/controller.go @@ -19,6 +19,7 @@ package bind import ( "encoding/json" "fmt" + "strings" "time" "github.com/ghodss/yaml" @@ -48,6 +49,7 @@ import ( bindscheme "github.com/elafros/eventing/pkg/client/clientset/versioned/scheme" informers "github.com/elafros/eventing/pkg/client/informers/externalversions" listers "github.com/elafros/eventing/pkg/client/listers/bind/v1alpha1" + "github.com/elafros/eventing/pkg/delivery/action" ) const controllerAgentName = "bind-controller" @@ -133,7 +135,7 @@ func NewController( } controller.sources = make(map[string]sources.EventSource) - controller.sources["github.com"] = sources.NewGithubEventSource() + controller.sources["github.com"] = sources.NewGithubEventSource(kubeclientset) glog.Info("Setting up event handlers") // Set up an event handler for when Bind resources change bindInformer.Binds().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -288,76 +290,40 @@ func (c *Controller) syncHandler(key string) error { bind = bind.DeepCopy() // Find the Route that they want. - routeName := bind.Spec.Action.RouteName - route, err := c.routesLister.Routes(namespace).Get(routeName) - if err != nil { + actionName := bind.Spec.Action.Name + actionType := bind.Spec.Action.Processor + if err := c.validateAction(namespace, actionName, actionType); err != nil { if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("Route %q in namespace %q does not exist", routeName, namespace)) + runtime.HandleError(fmt.Errorf("Action %q of type %q in namespace %q does not exist", actionName, actionType, namespace)) } + glog.Warningf("Bind %q rejected for invalid action: %s", name, err) return err } - domainSuffix, err := getDomainSuffixFromElaConfig(c.kubeclientset) - if err != nil { - if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("Can't find ela ConfigMap")) + var condition *v1alpha1.BindCondition + if bind.Spec.Trigger.Zero() { + glog.Warningf("Bind %q has no Spec.Trigger", name) + if bind.Status.Conditions != nil { + condition = &v1alpha1.BindCondition{ + Type: v1alpha1.BindComplete, + Status: corev1.ConditionTrue, + Reason: fmt.Sprintf("BindSuccess; no Trigger to manage"), + Message: "Bind successful", + } } - return err - } - - functionDNS := fmt.Sprintf("%s.%s.%s", route.Name, route.Namespace, domainSuffix) - glog.Infof("Found route DNS as '%q'", functionDNS) - - es, err := c.eventSourcesLister.EventSources(namespace).Get(bind.Spec.Trigger.Service) - if err != nil { - if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("EventSource %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace)) + } else { + if condition, err = c.bindTrigger(namespace, bind); err != nil { + return err } - return err } - et, err := c.eventTypesLister.EventTypes(namespace).Get(bind.Spec.Trigger.EventType) - if err != nil { - if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("EventType %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace)) - } - return err - } - - trigger, err := resolveTrigger(c.kubeclientset, namespace, bind.Spec.Trigger) - if err != nil { - glog.Warningf("Failed to process parameters: %s", err) - return err - } - - if bind.Status.Conditions != nil { + if condition == nil { glog.Infof("Already has status, skipping") return nil } - glog.Infof("Creating a subscription to %q : %q with Trigger %+v", es.Name, et.Name, trigger) - if val, ok := c.sources[es.Name]; ok { - r, err := val.Bind(trigger, functionDNS) - if err != nil { - glog.Warningf("BIND failed: %s", err) - msg := fmt.Sprintf("Bind failed with : %s", r) - bind.Status.SetCondition(&v1alpha1.BindCondition{ - Type: v1alpha1.BindFailed, - Status: corev1.ConditionTrue, - Reason: "BindFailed", - Message: msg, - }) - } else { - bind.Status.SetCondition(&v1alpha1.BindCondition{ - Type: v1alpha1.BindComplete, - Status: corev1.ConditionTrue, - Reason: fmt.Sprintf("BindSuccess: Hook: %s", r["ID"].(string)), - Message: "Bind successful", - }) - } - } - _, err = c.updateStatus(bind) - if err != nil { + bind.Status.SetCondition(condition) + if _, err = c.updateStatus(bind); err != nil { glog.Warningf("Failed to update status: %s", err) return err } @@ -381,19 +347,69 @@ func (c *Controller) updateStatus(u *v1alpha1.Bind) (*v1alpha1.Bind, error) { return bindClient.Update(newu) } -func resolveTrigger(kubeClient kubernetes.Interface, namespace string, trigger v1alpha1.EventTrigger) (sources.EventTrigger, error) { - r := sources.EventTrigger{ - Resource: trigger.Resource, - EventType: trigger.EventType, - Parameters: make(map[string]interface{}), +func (c *Controller) bindTrigger(namespace string, bind *v1alpha1.Bind) (*v1alpha1.BindCondition, error) { + es, err := c.eventSourcesLister.EventSources(namespace).Get(bind.Spec.Trigger.Service) + if err != nil { + if errors.IsNotFound(err) { + runtime.HandleError(fmt.Errorf("EventSource %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace)) + } + return nil, err } + + et, err := c.eventTypesLister.EventTypes(namespace).Get(bind.Spec.Trigger.EventType) + if err != nil { + if errors.IsNotFound(err) { + runtime.HandleError(fmt.Errorf("EventType %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace)) + } + return nil, err + } + + parameters, err := resolveParameters(c.kubeclientset, namespace, bind.Spec.Trigger) + if err != nil { + glog.Warningf("Failed to process parameters: %s", err) + return nil, err + } + + if bind.Status.Conditions != nil { + return nil, nil + } + + glog.Infof("Creating a subscription to %q : %q with Trigger %+v", es.Name, et.Name, bind.Spec.Trigger) + eventSource, ok := c.sources[es.Name] + if !ok { + return nil, fmt.Errorf("Do not know how to deploy EventTrigger to source with name %s", es.Name) + } + + r, err := eventSource.Bind(bind, parameters) + if err != nil { + glog.Warningf("BIND failed: %s", err) + msg := fmt.Sprintf("Bind failed with : %s", r) + return &v1alpha1.BindCondition{ + Type: v1alpha1.BindFailed, + Status: corev1.ConditionTrue, + Reason: "BindFailed", + Message: msg, + }, nil + } + return &v1alpha1.BindCondition{ + Type: v1alpha1.BindComplete, + Status: corev1.ConditionTrue, + // BUG(43) ID is a feature of the GitHub event source. If it is to become standardized, + // we should not be using a map[string]interface{} result. + Reason: fmt.Sprintf("BindSuccess: Hook: %s", r["ID"].(string)), + Message: "Bind successful", + }, nil +} + +func resolveParameters(kubeClient kubernetes.Interface, namespace string, trigger v1alpha1.EventTrigger) (map[string]interface{}, error) { + r := make(map[string]interface{}) if trigger.Parameters != nil && trigger.Parameters.Raw != nil && len(trigger.Parameters.Raw) > 0 { p := make(map[string]interface{}) if err := yaml.Unmarshal(trigger.Parameters.Raw, &p); err != nil { return r, err } for k, v := range p { - r.Parameters[k] = v + r[k] = v } } if trigger.ParametersFrom != nil { @@ -404,7 +420,7 @@ func resolveTrigger(kubeClient kubernetes.Interface, namespace string, trigger v return r, err } for k, v := range pfs { - r.Parameters[k] = v + r[k] = v } } } @@ -446,16 +462,20 @@ func unmarshalJSON(in []byte) (map[string]interface{}, error) { return parameters, nil } -func getDomainSuffixFromElaConfig(cl kubernetes.Interface) (string, error) { - const name = "ela-config" - const namespace = "ela-system" - c, err := cl.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return "", err - } - domainSuffix, ok := c.Data["domainSuffix"] - if !ok { - return "", fmt.Errorf("cannot find domainSuffix in %v: ConfigMap.Data is %#v", name, c.Data) +func (c *Controller) validateAction(namespace string, name string, actionType string) error { + switch actionType { + case action.ElafrosActionType: + parts := strings.Split(name, "/") + if len(parts) == 2 { + namespace, name = parts[0], parts[1] + } + _, err := c.routesLister.Routes(namespace).Get(name) + return err + case action.LoggingActionType: + return nil + default: + // Should this be an errors.NewNotFound? How does the action struct + // fit into the schema.GroupResource idea? + return fmt.Errorf("Unknown Action.Processor %q", actionType) } - return domainSuffix, nil } diff --git a/pkg/delivery/BUILD.bazel b/pkg/delivery/BUILD.bazel new file mode 100644 index 00000000000..4085df3259f --- /dev/null +++ b/pkg/delivery/BUILD.bazel @@ -0,0 +1,43 @@ +package(default_visibility = ["//visibility:public"]) + +load("@io_bazel_rules_go//go:def.bzl", "gazelle", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "doc.go", + "receiver.go", + "sender.go", + ], + importpath = "github.com/elafros/eventing/pkg/delivery", + deps = [ + "//pkg/apis/bind/v1alpha1:go_default_library", + "//pkg/client/listers/bind/v1alpha1:go_default_library", + "//pkg/delivery/action:go_default_library", + "//pkg/delivery/queue:go_default_library", + "//pkg/event:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + ], +) + +go_test( + name = "go_default_xtest", + srcs = [ + "receiver_test.go", + "sender_test.go", + ], + importpath = "github.com/elafros/eventing/pkg/delivery_test", + deps = [ + ":go_default_library", + "//pkg/apis/bind/v1alpha1:go_default_library", + "//pkg/client/clientset/versioned/fake:go_default_library", + "//pkg/client/informers/externalversions:go_default_library", + "//pkg/delivery/action:go_default_library", + "//pkg/delivery/queue:go_default_library", + "//pkg/event:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + ], +) diff --git a/pkg/delivery/action/BUILD.bazel b/pkg/delivery/action/BUILD.bazel new file mode 100644 index 00000000000..5daf4ccf451 --- /dev/null +++ b/pkg/delivery/action/BUILD.bazel @@ -0,0 +1,21 @@ +package(default_visibility = ["//visibility:public"]) + +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = [ + "action.go", + "doc.go", + "elafros.go", + "logging.go", + "service.go", + ], + importpath = "github.com/elafros/eventing/pkg/delivery/action", + deps = [ + "//pkg/event:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + ], +) diff --git a/pkg/delivery/action/action.go b/pkg/delivery/action/action.go new file mode 100644 index 00000000000..f4bce8172ae --- /dev/null +++ b/pkg/delivery/action/action.go @@ -0,0 +1,43 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package action + +import ( + "github.com/elafros/eventing/pkg/event" +) + +// The Action interface implements the capability to send events to an action of +// a particular category (e.g. Elafros routes, WebHooks, ETLs) +// Random thought: optional additional interface for SendEventAsync. We've wanted +// this in the past to allow infrastructure optimizations. +type Action interface { + // SendEvent delivers an event to the named Action synchronously. + // Should return the response from the action (for possible continuation) + // or an error. + // Returns an interface{} result which is currently unused, but will be used to + // allow continuations in the future. + // TODO: May need interfaces to indicate whether errors are fatal or retryable. + SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) +} + +// ActionFunc allows simple Action types to be implemented as a standalone function. +type ActionFunc func(name string, data interface{}, context *event.Context) (interface{}, error) + +// SendEvent implements the Action interface for a function with the same signature as Action.SendEvent. +func (a ActionFunc) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { + return a(name, data, context) +} diff --git a/pkg/delivery/action/doc.go b/pkg/delivery/action/doc.go new file mode 100644 index 00000000000..cc1b59caf42 --- /dev/null +++ b/pkg/delivery/action/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package action implements the various Actions that can be invoked +// in response to an Event. +package action diff --git a/pkg/delivery/action/elafros.go b/pkg/delivery/action/elafros.go new file mode 100644 index 00000000000..73fb799752b --- /dev/null +++ b/pkg/delivery/action/elafros.go @@ -0,0 +1,85 @@ +package action + +import ( + "fmt" + "net/http" + "strings" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/elafros/eventing/pkg/event" + "github.com/golang/glog" + "k8s.io/client-go/kubernetes" +) + +const ( + // ElafrosActionType is the expected Bind Processor name to + // cause Events to be sent to an Elafros Route. + ElafrosActionType = "elafros.dev/Route" +) + +// elafrosAction sends Events to an Elafros Route. +type elafrosAction struct { + kubeclientset kubernetes.Interface + httpclient *http.Client +} + +// NewElafrosAction creates an Action object that sends Events to Elafros Routes. +func NewElafrosAction(kubeclientset kubernetes.Interface, httpclient *http.Client) Action { + return &elafrosAction{kubeclientset: kubeclientset, httpclient: httpclient} +} + +// SendEvent will POST the event spec to the root URI of the elafros route. +func (a *elafrosAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { + glog.Infof("Sending event %s to ELA route %s", context.EventID, name) + var namespace, route string + parts := strings.Split(name, "/") + if len(parts) != 2 { + // TODO(inlined): for compatibility with the github demo we implicitly pick up the namespace + // of the Bind resource. Should ensure that we'll always get routes in this form. + namespace, route = "default", name + } else { + namespace, route = parts[0], parts[1] + } + + domain, err := getDomainSuffixFromElaConfig(a.kubeclientset) + if err != nil { + glog.Error("Could not look up Elafros domain") + return nil, err + } + + addr := fmt.Sprintf("http://%s.%s.%s/", route, namespace, domain) + glog.Info("Sending event", context.EventID, "to ELA route at", addr) + req, err := event.NewRequest(addr, data, *context) + if err != nil { + return nil, err + } + + res, err := a.httpclient.Do(req) + if err != nil { + glog.Errorf("Failed to send event to webhook %s; err=%s", addr, err) + return nil, err + } + // TODO: Standard handling of non-200 responses as errors. + if res.StatusCode/100 != 2 { + glog.Errorf("Got unsuccessful response code %d from %s", res.StatusCode, addr) + } + + // TODO: Return response data so that it may be forwarded in chained Binds. + glog.Infof("Sent event to %s; got response %s", addr, res.Status) + return nil, nil +} + +func getDomainSuffixFromElaConfig(cl kubernetes.Interface) (string, error) { + const name = "ela-config" + const namespace = "ela-system" + c, err := cl.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return "", err + } + domainSuffix, ok := c.Data["domainSuffix"] + if !ok { + return "", fmt.Errorf("cannot find domainSuffix in %v: ConfigMap.Data is %#v", name, c.Data) + } + return domainSuffix, nil +} diff --git a/pkg/delivery/action/logging.go b/pkg/delivery/action/logging.go new file mode 100644 index 00000000000..298c53ae631 --- /dev/null +++ b/pkg/delivery/action/logging.go @@ -0,0 +1,52 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package action + +import ( + "encoding/json" + + "github.com/elafros/eventing/pkg/event" + "github.com/golang/glog" +) + +const ( + // LoggingActionType is the expected Bind Processor type + // that will cause events to be logged and dropped. + LoggingActionType = "eventing.elafros.dev/EventLogger" +) + +// A LoggingAction will log and drop Events. +type loggingAction struct{} + +// SendEvent implements Action.SendEvent +func (a loggingAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { + event := map[string]interface{}{ + "data": data, + "context": context, + } + b, err := json.Marshal(event) + if err != nil { + return nil, err + } + glog.Infof("[%s] sendEvent %s", name, string(b)) + return nil, nil +} + +// NewLoggingAction creates an Action that will log and drop Events. +func NewLoggingAction() Action { + return loggingAction{} +} diff --git a/pkg/delivery/action/service.go b/pkg/delivery/action/service.go new file mode 100644 index 00000000000..6278d0e1ba2 --- /dev/null +++ b/pkg/delivery/action/service.go @@ -0,0 +1,62 @@ +package action + +import ( + "fmt" + "net/http" + "strings" + + "github.com/elafros/eventing/pkg/event" + "github.com/golang/glog" +) + +const ( + // ServiceActionType is the expected Bind Processor name to + // cause Events to be sent to a K8S service. + ServiceActionType = "Service" +) + +// serviceAction sends Events to a K8S service. +type serviceAction struct { + httpclient *http.Client +} + +// NewServiceAction creates an Action object that sends Events to K8S services. +func NewServiceAction(httpclient *http.Client) Action { + return &serviceAction{httpclient: httpclient} +} + +// SendEvent will POST the event spec to the root URI of the elafros route. +func (a *serviceAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { + glog.Infof("Sending event %s to K8S service %s", context.EventID, name) + var namespace, service string + parts := strings.Split(name, "/") + if len(parts) != 2 { + // TODO(inlined): for compatibility with the github demo we implicitly pick up the namespace + // of the Bind resource. Should ensure that we'll always get routes in this form. + namespace, service = "default", name + } else { + namespace, service = parts[0], parts[1] + } + + addr := fmt.Sprintf("http://%s.%s.svc.cluster.local", service, namespace) + glog.Info("Sending event", context.EventID, "to K8S service at", addr) + req, err := event.NewRequest(addr, data, *context) + if err != nil { + glog.Errorf("Failed to marshal event: %s", err) + return nil, err + } + + res, err := a.httpclient.Do(req) + if err != nil { + glog.Errorf("Failed to send event to webhook %s; err=%s", addr, err) + return nil, err + } + // TODO: Standard handling of non-200 responses as errors. + if res.StatusCode/100 != 2 { + glog.Errorf("Got unsuccessful response code %d from %s", res.StatusCode, addr) + } + + // TODO: Return response data so that it may be forwarded in chained Binds. + glog.Infof("Sent event to %s; got response %s", addr, res.Status) + return nil, nil +} diff --git a/pkg/delivery/doc.go b/pkg/delivery/doc.go new file mode 100644 index 00000000000..65f5ad64b5e --- /dev/null +++ b/pkg/delivery/doc.go @@ -0,0 +1,22 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package delivery implements an event delivery service. The Receiver +// interface exposes a WebHook where compatible Events can be sent. +// The Receiver enqueues the Event and then acknowledges the webhook. +// A Sender polls the events Queue and delivers the event to the +// appropriate Action according to the Bind that matched the Event. +package delivery diff --git a/pkg/delivery/queue/BUILD.bazel b/pkg/delivery/queue/BUILD.bazel new file mode 100644 index 00000000000..a47d3cf440b --- /dev/null +++ b/pkg/delivery/queue/BUILD.bazel @@ -0,0 +1,18 @@ +package(default_visibility = ["//visibility:public"]) + +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = [ + "diagnostics.go", + "memory_queue.go", + "queue.go", + ], + importpath = "github.com/elafros/eventing/pkg/delivery/queue", + deps = [ + "//pkg/apis/bind/v1alpha1:go_default_library", + "//pkg/event:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + ], +) diff --git a/pkg/delivery/queue/diagnostics.go b/pkg/delivery/queue/diagnostics.go new file mode 100644 index 00000000000..e4f87dd03e2 --- /dev/null +++ b/pkg/delivery/queue/diagnostics.go @@ -0,0 +1,59 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "encoding/json" + "net/http" + + "github.com/golang/glog" +) + +type diagnosticsHandler struct { + Queue + *http.ServeMux +} + +// NewDiagnosticHandler provides endpoints for inspecting +// and diagnosing queued events. +// Eventually we'll need to decide more about how we will shard our queues +// and what visibility we'll give to each. E.g. I suspect we'll support looking +// at the length of queues for a single Flow or Action. +func NewDiagnosticHandler(q Queue) http.Handler { + h := &diagnosticsHandler{Queue: q, ServeMux: http.NewServeMux()} + h.HandleFunc("/queue/length", h.GetLength) + h.HandleFunc("/queue/", func(w http.ResponseWriter, r *http.Request) { + glog.Infof("Do not know how to handle queue path", r.URL.Path) + http.NotFound(w, r) + }) + return h +} + +func (h *diagnosticsHandler) GetLength(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + glog.Warningf("Unauthorized method on /queue/length") + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + glog.V(4).Info("Reporting queue length") + b, err := json.Marshal(map[string]int{"length": h.Length()}) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + } + w.Write(b) +} diff --git a/pkg/delivery/queue/memory_queue.go b/pkg/delivery/queue/memory_queue.go new file mode 100644 index 00000000000..819f45442a0 --- /dev/null +++ b/pkg/delivery/queue/memory_queue.go @@ -0,0 +1,60 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import "errors" + +// InMemoryQueue implements the queue interface with a memory buffer. +// Note: this isn't just a simple typedef for a queue because we will soon start +// experimenting with other features, such as fetching an event separate from acking, +// transactional ack + enqueue, cursors, etc. +type inMemoryQueue struct { + ch chan QueuedEvent +} + +// NewInMemoryQueue creates a new InMemoryQueue +func NewInMemoryQueue(length int) Queue { + ch := make(chan QueuedEvent, length) + return &inMemoryQueue{ + ch: ch, + } +} + +// Push implements Queue.Push +func (q *inMemoryQueue) Push(event QueuedEvent) error { + select { + case q.ch <- event: + return nil + default: + return errors.New("Event queue is out of memory") + } +} + +// Pull implements Queue.Pull +func (q *inMemoryQueue) Pull(stopCh <-chan struct{}) (QueuedEvent, bool) { + select { + case event, ok := <-q.ch: + return event, ok + case <-stopCh: + return QueuedEvent{}, false + } +} + +// Length returns Queue.Length +func (q inMemoryQueue) Length() int { + return len(q.ch) +} diff --git a/pkg/delivery/queue/queue.go b/pkg/delivery/queue/queue.go new file mode 100644 index 00000000000..e91e3bb72c8 --- /dev/null +++ b/pkg/delivery/queue/queue.go @@ -0,0 +1,39 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" + "github.com/elafros/eventing/pkg/event" +) + +// QueuedEvent is the element saved to an EventQueue before delivery +// TODO(inlined): This should probably be made package protected; +// It may be reasonable to just return a tuple of these values from +// the Queue pull operations instead of defining the struct. +type QueuedEvent struct { + Action v1alpha1.BindAction `json:"action"` + Data interface{} `json:"data"` + Context *event.Context `json:"context"` +} + +// Queue implements basic features to allow asynchronous buffering of events. +type Queue interface { + Push(event QueuedEvent) error + Pull(stopCh <-chan struct{}) (event QueuedEvent, ok bool) + Length() int +} diff --git a/pkg/delivery/receiver.go b/pkg/delivery/receiver.go new file mode 100644 index 00000000000..07defca33e6 --- /dev/null +++ b/pkg/delivery/receiver.go @@ -0,0 +1,104 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package delivery + +import ( + "net/http" + "regexp" + + "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" + + listers "github.com/elafros/eventing/pkg/client/listers/bind/v1alpha1" + "github.com/elafros/eventing/pkg/delivery/queue" + "github.com/elafros/eventing/pkg/event" + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/api/errors" +) + +// Currently the delivery service requires that Events are sent directly to an endpoint named +// after a Bind which the Event matches. Future versions may offer a "firehose" endpoint that +// will do late-time filtering of Events. +var directSendEventRegExp = regexp.MustCompile(`^.*/namespaces/([^/]*)/flows/([^/]*):sendEvent$`) + +// Receiver manages the HTTP endpoints for receiving events as well as +// stats about the queue for processing events. +type Receiver struct { + eventQueue queue.Queue + bindsLister listers.BindLister +} + +// NewReceiver creates a new Receiver object to enqueue events. +func NewReceiver(bindsLister listers.BindLister, eventQueue queue.Queue) *Receiver { + return &Receiver{bindsLister: bindsLister, eventQueue: eventQueue} +} + +// SendEvent enqueues an event data and Context for delivery to a particular action. +func (r *Receiver) SendEvent(action v1alpha1.BindAction, data interface{}, context *event.Context) error { + return r.eventQueue.Push(queue.QueuedEvent{ + Action: action, + Data: data, + Context: context, + }) +} + +// ServeHTTP implements the external REST API that other services will use to send events. +func (r *Receiver) ServeHTTP(w http.ResponseWriter, req *http.Request) { + glog.Infof("Serving %s", req.URL.Path) + if req.Method != http.MethodPost { + glog.V(3).Info("Cannot handle method", req.Method) + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + matches := directSendEventRegExp.FindStringSubmatch(req.URL.Path) + if len(matches) != 3 { + glog.V(3).Info("Cannot parse route", req.URL.Path) + w.WriteHeader(http.StatusNotFound) + return + } + + namespace, flowName := string(matches[1]), string(matches[2]) + bind, err := r.bindsLister.Binds(namespace).Get(flowName) + if err != nil { + if errors.IsNotFound(err) { + glog.V(3).Infof("Event sent to non-existant Bind %s in namespace %s", flowName, namespace) + w.WriteHeader(http.StatusNotFound) + return + } + glog.Errorf("Unknown error fetching bind %s in namespace %s: %s", flowName, namespace, err) + w.WriteHeader(http.StatusInternalServerError) + return + } + var data interface{} + context, err := event.FromRequest(&data, req) + if err != nil { + glog.Error("Failed to unmarshal event ", err) + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + + if err := r.SendEvent(bind.Spec.Action, data, context); err != nil { + glog.Error("Failed to enqueue event", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + glog.Infof("Enqueued event %s for delivery to %s action %s", + context.EventID, bind.Spec.Action.Processor, bind.Spec.Action.Name) + w.WriteHeader(http.StatusOK) +} diff --git a/pkg/delivery/receiver_test.go b/pkg/delivery/receiver_test.go new file mode 100644 index 00000000000..c22977a20ec --- /dev/null +++ b/pkg/delivery/receiver_test.go @@ -0,0 +1,170 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package delivery_test + +import ( + "net/http" + "net/http/httptest" + "net/url" + "reflect" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" + "github.com/elafros/eventing/pkg/client/clientset/versioned/fake" + informers "github.com/elafros/eventing/pkg/client/informers/externalversions" + "github.com/elafros/eventing/pkg/delivery" + "github.com/elafros/eventing/pkg/delivery/queue" + "github.com/elafros/eventing/pkg/event" +) + +var ( + demoBind = &v1alpha1.Bind{ + TypeMeta: metav1.TypeMeta{ + Kind: "Bind", + APIVersion: "eventing.elafros.dev/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "demo", + Namespace: "default", + }, + Spec: v1alpha1.BindSpec{ + Trigger: v1alpha1.EventTrigger{ + EventType: "org.example.object.create", + Service: "example.org", + Resource: "{abc}/123", + }, + Action: v1alpha1.BindAction{ + Processor: "eventing.elafros.dev/WebHook", + Name: "https://demo.google.com/notreal", + }, + }, + } +) + +func TestReceiverHttpErrors(t *testing.T) { + tests := []struct { + description string + method string + path string + headers http.Header + body interface{} + expectedStatus int + }{ + { + description: "bad method", + method: http.MethodGet, + expectedStatus: http.StatusMethodNotAllowed, + }, + { + description: "bad path", + method: http.MethodPost, + path: "/abc/123", + expectedStatus: http.StatusNotFound, + }, + { + description: "bad event", + method: http.MethodPost, + path: "/v1alpha1/namespaces/default/flows/demo:sendEvent", + // no headers, so we're missing all default fields + expectedStatus: http.StatusBadRequest, + }, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + client := fake.NewSimpleClientset(demoBind) + factory := informers.NewSharedInformerFactory(client, 0).Eventing().V1alpha1().Binds() + factory.Informer().GetIndexer().Add(demoBind) + lister := factory.Lister() + + q := queue.NewInMemoryQueue(1) + r := delivery.NewReceiver(lister, q) + s := httptest.NewServer(r) + defer s.Close() + + url, err := url.Parse(s.URL + test.path) + if err != nil { + t.Fatalf("Unexpected error crafting test URL: %s", err) + } + res, err := s.Client().Do(&http.Request{ + Method: test.method, + URL: url, + }) + if err != nil { + t.Fatalf("Failed to reach test server: %s", err) + } + if res.StatusCode != test.expectedStatus { + t.Fatalf("Wrong status; expected=%d got=%d", test.expectedStatus, res.StatusCode) + } + }) + } +} + +func TestEnqueueEvent(t *testing.T) { + client := fake.NewSimpleClientset(demoBind) + factory := informers.NewSharedInformerFactory(client, 0).Eventing().V1alpha1().Binds() + factory.Informer().GetIndexer().Add(demoBind) + lister := factory.Lister() + + q := queue.NewInMemoryQueue(1) + r := delivery.NewReceiver(lister, q) + s := httptest.NewServer(r) + defer s.Close() + + data := map[string]interface{}{"hello": "world"} + context := &event.Context{ + CloudEventsVersion: "0.0.1-rc1", + EventID: "123abc", + EventTime: time.Now().UTC(), + EventType: "org.example.object.create", + Source: "https://example.org/examples/example1", + } + + path := s.URL + "/v1alpha1/namespaces/default/flows/demo:sendEvent" + req, err := event.NewRequest(path, data, *context) + if err != nil { + t.Fatalf("Failed to create request: %s", err) + } + + res, err := s.Client().Do(req) + if err != nil { + t.Fatalf("Unexpected error calling sendEvent: %s", err) + } + if res.StatusCode != http.StatusOK { + t.Fatalf("SendEvent request %+v failed with response %+v", req, res) + } + + if q.Length() != 1 { + t.Fatalf("Expected one queued event; got %d", q.Length()) + } + event, ok := q.Pull(nil) + if !ok { + t.Fatal("Failed to pull queued event") + } + if !reflect.DeepEqual(context, event.Context) { + t.Fatalf("Event context was not marshalled correctly; expected=%+v got=%+v", context, event.Context) + } + if !reflect.DeepEqual(data, event.Data) { + t.Fatalf("Event data was not marshalled correctly; expected=%+v got=%+v", data, event.Data) + } + if !reflect.DeepEqual(demoBind.Spec.Action, event.Action) { + t.Fatalf("Event action was not marshalled correctly; expected=%+v got=%+v", demoBind.Spec.Action, event.Action) + } +} diff --git a/pkg/delivery/sender.go b/pkg/delivery/sender.go new file mode 100644 index 00000000000..14dd1e66c4a --- /dev/null +++ b/pkg/delivery/sender.go @@ -0,0 +1,106 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package delivery + +import ( + "fmt" + "io" + "time" + + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/elafros/eventing/pkg/delivery/action" + "github.com/elafros/eventing/pkg/delivery/queue" +) + +// Sender delivers queued events to their target Actions. +type Sender struct { + eventQueue queue.Queue + actions map[string]action.Action +} + +// NewSender creates a new Sender object to deliver Events which were +// enqueued by a Receiver. +func NewSender( + eventQueue queue.Queue, + actions map[string]action.Action) *Sender { + return &Sender{ + eventQueue: eventQueue, + actions: actions, + } +} + +// RunOnce processes a single event from the queue. +// Errors are unactionable and reported to the runtime rather than returned. +// Returns true if more processing is possible. +// TODO: If the Queue was redesigned so that events are pulled with a lease, then we could restructure RunOnce to not +// need the stopCh (though does it really matter now anyway if the in-memory queue goes away?) +func (s *Sender) RunOnce(stopCh <-chan struct{}) error { + // TODO(inlined): Pull should only take a lease on the queued event. The receiver should need + // to acknowledge the event and possibly transactionally enqueue subsequent events. + event, ok := s.eventQueue.Pull(stopCh) + if !ok { + return io.EOF + } + + glog.V(4).Info("Sending Event %s to %s Action %s", event.Context.EventID, event.Action.Processor, event.Action.Name) + action, ok := s.actions[event.Action.Processor] + if !ok { + keys := make([]string, 0, len(s.actions)) + for key := range s.actions { + keys = append(keys, key) + } + return fmt.Errorf("Event %s is routed to unknown Processor '%s'; known processors are %q", + event.Context.EventID, event.Action.Processor, keys) + } + + // TODO(inlined): retry strategies for errors and continuations for success. + res, err := action.SendEvent(event.Action.Name, event.Data, event.Context) + if err != nil { + return fmt.Errorf("Action for event %s returned error: %s", event.Context.EventID, err) + } + glog.V(4).Info("Event %s handled with response %+v", event.Context.EventID, res) + return nil +} + +func (s *Sender) run(stopCh <-chan struct{}) { + for { + err := s.RunOnce(stopCh) + if err != nil { + if err == io.EOF { + return + } + glog.Warning(err) + runtime.HandleError(err) + } + } +} + +// Run runs Sender until stopCh is closed. +func (s *Sender) Run(threadiness int, stopCh <-chan struct{}) error { + for i := 0; i < threadiness; i++ { + go wait.Until(func() { s.run(stopCh) }, time.Second, stopCh) + } + + glog.Info("Started workers") + <-stopCh + glog.Info("Shutting down workers") + + return nil +} diff --git a/pkg/delivery/sender_test.go b/pkg/delivery/sender_test.go new file mode 100644 index 00000000000..91980386318 --- /dev/null +++ b/pkg/delivery/sender_test.go @@ -0,0 +1,101 @@ +/* +Copyright 2018 Google, Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package delivery_test + +import ( + "reflect" + "sync" + "testing" + + "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" + "github.com/elafros/eventing/pkg/delivery" + "github.com/elafros/eventing/pkg/delivery/action" + "github.com/elafros/eventing/pkg/delivery/queue" + "github.com/elafros/eventing/pkg/event" +) + +func TestSendEvent(t *testing.T) { + actionInvoked := false + expectedData := map[string]interface{}{ + "hello": "world", + } + expectedContext := &event.Context{ + EventID: "123", + } + actionName := "projects/demo/region/us-central1/function/test" + actionType := "functions.googleapis.com" + callback := func(name string, data interface{}, context *event.Context) (interface{}, error) { + if actionInvoked { + t.Fatal("Action invoked twice") + } + actionInvoked = true + if name != actionName { + t.Fatalf("Action invoked with wrong name. expected=%s got=%s", actionName, name) + } + if !reflect.DeepEqual(expectedData, data) { + t.Fatalf("Action invoked with wrong data. expected=%v got=%v", expectedData, data) + } + if !reflect.DeepEqual(expectedContext, context) { + t.Fatalf("Action invoked with wrong context. expected=%v got=%v", expectedContext, context) + } + + return nil, nil + } + + q := queue.NewInMemoryQueue(1) + sender := delivery.NewSender(q, map[string]action.Action{ + actionType: action.ActionFunc(callback), + }) + + q.Push(queue.QueuedEvent{ + Context: expectedContext, + Data: expectedData, + Action: v1alpha1.BindAction{ + Processor: actionType, + Name: actionName, + }, + }) + + if err := sender.RunOnce(nil); err != nil { + t.Fatalf("RunOnce() failed with err: %s", err) + } + if !actionInvoked { + t.Fatal("Did not invoke action") + } +} + +func TestShutdown(t *testing.T) { + q := queue.NewInMemoryQueue(0) + sender := delivery.NewSender(q, nil) + stopCh := make(chan struct{}) + var wait sync.WaitGroup + done := false + wait.Add(1) + go func() { + sender.Run(20, stopCh) + done = true + wait.Done() + }() + if done { + t.Fatalf("Sender stops automatically") + } + close(stopCh) + wait.Wait() + if !done { + t.Fatalf("Should be done") + } +} diff --git a/pkg/event/BUILD.bazel b/pkg/event/BUILD.bazel index cbfddebb2a0..2848ed19047 100644 --- a/pkg/event/BUILD.bazel +++ b/pkg/event/BUILD.bazel @@ -8,7 +8,8 @@ go_library( ) go_test( - name = "go_default_test", + name = "go_default_xtest", srcs = ["event_test.go"], + importpath = "github.com/elafros/eventing/pkg/event_test", deps = [":go_default_library"], - ) +) diff --git a/pkg/event/event_test.go b/pkg/event/event_test.go index b6b7e79127b..9ce07cbf9ce 100644 --- a/pkg/event/event_test.go +++ b/pkg/event/event_test.go @@ -27,7 +27,7 @@ import ( var ( context = &event.Context{ CloudEventsVersion: "0.0.1-preview-1", - EventId: "eventid-123", + EventID: "eventid-123", EventType: "google.firestore.document.create", EventTime: time.Now().UTC(), Source: "//firestore.googleapis.com/projects/demo/databases/default/documents/users/inlined", diff --git a/pkg/sources/BUILD b/pkg/sources/BUILD index 76e3a29f124..bf00542ba60 100644 --- a/pkg/sources/BUILD +++ b/pkg/sources/BUILD @@ -3,15 +3,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", srcs = [ - "github.go", "event_source.go", - "event_trigger.go", + "github.go", ], importpath = "github.com/elafros/eventing/pkg/sources", visibility = ["//visibility:public"], deps = [ + "//pkg/apis/bind/v1alpha1:go_default_library", + "//pkg/delivery/action:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/google/go-github/github:go_default_library", "//vendor/golang.org/x/oauth2:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", ], ) diff --git a/pkg/sources/event_source.go b/pkg/sources/event_source.go index c807653c54a..b661a6fd8f4 100644 --- a/pkg/sources/event_source.go +++ b/pkg/sources/event_source.go @@ -16,6 +16,10 @@ limitations under the License. package sources +import "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" + +// EventSource manages the subscription of which events a developer is interested in. type EventSource interface { - Bind(trigger EventTrigger, route string) (map[string]interface{}, error) + // Bind creates a new subscription of events. + Bind(bind *v1alpha1.Bind, parameters map[string]interface{}) (map[string]interface{}, error) } diff --git a/pkg/sources/event_trigger.go b/pkg/sources/event_trigger.go deleted file mode 100644 index a36ff723607..00000000000 --- a/pkg/sources/event_trigger.go +++ /dev/null @@ -1,17 +0,0 @@ -package sources - -// EventTrigger is a version of v1alpha1.EventTrigger where -// parameters have been resolved. -type EventTrigger struct { - // EventType is the type of event to be observed - EventType string `json:"eventType"` - - // Resource is the resource(s) from which to observe events. - Resource string `json:"resource"` - - // Service is the hostname of the service that should be observed. - Service string `json:"service"` - - // Parameters is an opaque map of configuration needed by the EventSource. - Parameters map[string]interface{} `json:"params"` -} diff --git a/pkg/sources/github.go b/pkg/sources/github.go index a6b100a0d1c..6431f217215 100644 --- a/pkg/sources/github.go +++ b/pkg/sources/github.go @@ -18,14 +18,18 @@ package sources import ( "context" + "errors" "fmt" "strings" "github.com/golang/glog" - + ghclient "github.com/google/go-github/github" "golang.org/x/oauth2" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" - ghclient "github.com/google/go-github/github" + "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" + "github.com/elafros/eventing/pkg/delivery/action" ) const ( @@ -37,39 +41,52 @@ const ( MessageResourceSynced = "Bind synced successfully" ) +// GithubEventSource configures Event delivery from GitHub. type GithubEventSource struct { + clientset kubernetes.Interface } -func NewGithubEventSource() EventSource { - return &GithubEventSource{} +// NewGithubEventSource configures a new GithubEventSource. +func NewGithubEventSource(clientset kubernetes.Interface) EventSource { + return &GithubEventSource{clientset: clientset} } -func (t *GithubEventSource) Bind(trigger EventTrigger, route string) (map[string]interface{}, error) { - glog.Infof("CREATING GITHUB WEBHOOK") +// Bind implements EventSource.Bind +func (s *GithubEventSource) Bind(bind *v1alpha1.Bind, parameters map[string]interface{}) (map[string]interface{}, error) { + glog.Infof("BINDING GITHUB EVENT") + // BUG(#43) The GitHub event source should be responsible for its own gateway rathan relying + // on elafros routes to be externally visible. + if bind.Spec.Action.Processor != action.ElafrosActionType { + return nil, errors.New("[elafros/eventing#43] GitHub event source currently only supports Elafros Routes as their action type") + } + address, err := s.getFunctionAddress(bind) + if err != nil { + return nil, err + } ctx := context.Background() ts := oauth2.StaticTokenSource( - &oauth2.Token{AccessToken: trigger.Parameters["accessToken"].(string)}, + &oauth2.Token{AccessToken: parameters["accessToken"].(string)}, ) tc := oauth2.NewClient(ctx, ts) - glog.Infof("CREATING GITHUB WEBHOOK with access token: %s", trigger.Parameters["accessToken"].(string)) + glog.Infof("CREATING GITHUB WEBHOOK with access token: %s", parameters["accessToken"].(string)) client := ghclient.NewClient(tc) active := true name := "web" config := make(map[string]interface{}) - config["url"] = fmt.Sprintf("http://%s", route) + config["url"] = fmt.Sprintf("http://%s", address) config["content_type"] = "json" - config["secret"] = trigger.Parameters["secretToken"].(string) + config["secret"] = parameters["secretToken"].(string) hook := ghclient.Hook{ Name: &name, - URL: &route, + URL: &address, Events: []string{"pull_request"}, Active: &active, Config: config, } - components := strings.Split(trigger.Resource, "/") + components := strings.Split(bind.Spec.Trigger.Resource, "/") h, r, err := client.Repositories.CreateHook(ctx, components[0] /* owner */, components[1] /* repo */, &hook) if err != nil { glog.Warningf("Failed to create the webhook: %s", err) @@ -81,3 +98,32 @@ func (t *GithubEventSource) Bind(trigger EventTrigger, route string) (map[string ret["ID"] = fmt.Sprintf("%d", h.ID) return ret, nil } + +func (s *GithubEventSource) getFunctionAddress(bind *v1alpha1.Bind) (string, error) { + suffix, err := s.getDomainSuffix() + if err != nil { + return "", err + } + + functionDNS := fmt.Sprintf( + "%s.%s.%s", + /* function name */ bind.Spec.Action.Name, + /* function namespace */ bind.ObjectMeta.Namespace, + /* cluster root domain */ suffix) + glog.Infof("Found route DNS as '%q'", functionDNS) + return functionDNS, nil +} + +func (s *GithubEventSource) getDomainSuffix() (string, error) { + const name = "ela-config" + const namespace = "ela-system" + c, err := s.clientset.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return "", err + } + domainSuffix, ok := c.Data["domainSuffix"] + if !ok { + return "", fmt.Errorf("cannot find domainSuffix in %v: ConfigMap.Data is %#v", name, c.Data) + } + return domainSuffix, nil +} diff --git a/sample/actions/BUILD.bazel b/sample/actions/BUILD.bazel new file mode 100644 index 00000000000..71fc7cbb3c1 --- /dev/null +++ b/sample/actions/BUILD.bazel @@ -0,0 +1,82 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library") + +go_library( + name = "go_default_library", + srcs = ["action.go"], + importpath = "github.com/elafros/eventing/sample/actions", + visibility = ["//visibility:private"], + deps = [ + "//pkg/event:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + ], +) + +go_binary( + name = "action-demo", + embed = [":go_default_library"], + importpath = "github.com/elafros/elafros/sample/actions", + pure = "on", + visibility = ["//visibility:public"], +) + +load("@io_bazel_rules_docker//go:image.bzl", "go_image") + +go_image( + name = "image", + binary = ":action-demo", +) + +load("@k8s_object//:defaults.bzl", "k8s_object") + +# Step 1: Support for action as a debug statement +k8s_object( + name = "bind", + template = ":bind.yaml", +) + +# Step 2: Support for action as an ELA route +# To enable: update the action-sample Bind to have the following +# Spec.Action: +# Name: action-demo +# Processor: elafros.dev/Route + +k8s_object( + name = "configuration", + images = { + "action-demo:latest": ":image", + }, + template = ":configuration.yaml", +) + +k8s_object( + name = "route", + template = ":route.yaml", +) + +# Step 3: Support for action as Service + +k8s_object( + name = "deployment", + images = { + "action-demo:latest": ":image", + }, + template = ":deployment.yaml", +) + +k8s_object( + name = "service", + template = ":service.yaml", +) + +load("@io_bazel_rules_k8s//k8s:objects.bzl", "k8s_objects") + +k8s_objects( + name = "everything", + objects = [ + ":bind", + ":route", + ":configuration", + ":deployment", + ":service", + ], +) diff --git a/sample/actions/README.md b/sample/actions/README.md new file mode 100644 index 00000000000..237a258072f --- /dev/null +++ b/sample/actions/README.md @@ -0,0 +1,136 @@ +# Actions + +Demo to show support for dynamic actions, disjoint from the event source's configuration. + +## Prerequisites + +1. [Setup your development environment](../../DEVELOPMENT.md#getting-started) +2. [Start Elafros](../../README.md#start-elafros) +3. Decide on the DNS name that git can then call. Update elafros/elafros/elaconfig.yaml domainSuffix. +For example I used ela.inlined.me as my hostname, so my elaconfig.yaml looks like so: + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: ela-config + namespace: ela-system +data: + domainSuffix: ela.inlined.me +``` + +If you were already running the elafros controllers, you will need to kill the ela-controller in the ela-system namespace for it to pick up the new domain suffix. + +4. Since we will be mimicing an internal service, we need access to the event-delivery service. + Expose it with: + + ``` + kubectl expose deployment event-delivery --port=80 --target-port=9090 --type=LoadBalancer --namespace bind-system + ``` + +5. Install the `sendevent` utility + +```bash +go install github.com/elafros/eventing/cmd/sendevent +``` + +6. Save the address of your exposed `event-delivery` service. + +```bash +export EVENT_DELIVERY_IP=$(kubectl get services/event-delivery --namespace bind-system -o go-template='{{ (index .status.loadBalancer.ingress 0).ip }}') +``` + +## Demo 1: Install Bind with a debug-logging action + +### Deploying assets + +You can deploy this to Elafros from the root directory via: + +```shell +bazel run sample/actions:everything.apply +``` + +This created a handful of resources. The one most immediately useful is the `action-demo` Bind. +An event source would normally deliver events to this Bind by posting to + +``` +${EVENT_DELIVERY_SERVICE}/v1alpha1/namespaces/default/flows/action-demo:sendEvent +``` + +### Sending events + +We'll do the same with the `sendevent` utility: + +```bash +sendevent http://${EVENT_DELIVERY_IP}/v1alpha1/namespaces/default/flows/action-demo:sendEvent +``` + +(You can optionally configure the sent event by choosing any of `--event-id`, `--event-type`, +`--source`, or `--data`) + +### Observing side-effects + +The event processor deployed in configuration.yaml is "eventing.elafros.dev/EventLogger". This +simply emits a structured event to the logs for the "event-delivery" app. View them with: + +``` +kubectl logs --namespace bind-system -lapp=event-delivery +``` + +## Demo 2: Reroute Bind to an Elafros action. + +### Deploying assets + +In the previous step, you already deployed the elafros route (also named 'action-demo') +that we'll use in this step. To make the demo more exciting, edit "action.go" and change +the `databaseURL` constant to a Firebase Realtime Database URL which you are an owner of. + +To make the `action-demo` Bind point to your new action, edit `bind.yaml` and change the spec.action.processor to `elafros.dev/Route` + +Apply these changes with `blaze run sample/actions:everything.apply` + +### Sending events + +Because we did not change anything about the (mock) event source, the same command will +now have a different effect. + +```bash +sendevent http://${EVENT_DELIVERY_IP}/v1alpha1/namespaces/default/flows/action-demo:sendEvent +``` + +### Observing side-effects + +If you load your Firebase Realtime Database, you will see new event-ids appended to the "seenEvents" node every time you run `sendevent` + +## Cleaning up + +To clean up the sample service: + +```shell +bazel run sample/github:everything.delete +``` + +## Demo 3: Reroute Bind to a K8S Service + +### Deploying assets + +This step will reuse the same Docker image as step 2, but will host `action.go` in +a Deployment + Service. To verify that we are targeting a new backend, the Deployment +sets an environment variable to change where in the Firebase Realtime Database events +are published. + +To mkae the `action-demo` bind point to the Service vrsion of our demo, edit `bind.yaml` and change the spec.action.processor to `Service`. + +Apply these changes with `bazel run sample/actions:everything.apply` + +### Sending events + +Again, the same command will now have new effects: + +``` +sendevent http://${EVENT_DELIVERY_IP}/v1alpha1/namespaces/default/flows/action-demo:sendEvent +``` + +### Observing side-effects + +If you load your Firebase Realtime Database, you will see new event-ids appended to the "seenEventsInService" node every time you run `sendevent` diff --git a/sample/actions/action.go b/sample/actions/action.go new file mode 100644 index 00000000000..8fb8a7de522 --- /dev/null +++ b/sample/actions/action.go @@ -0,0 +1,82 @@ +/* +Copyright 2018 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "os" + + "github.com/elafros/eventing/pkg/event" + "github.com/golang/glog" +) + +const ( + databaseURL = "https://inlined-junkdrawer.firebaseio.com" + address = ":8080" +) + +// SendEventToDB... +func SendEventToDB(w http.ResponseWriter, r *http.Request) { + glog.Info("Sending event to RTDB") + var data map[string]interface{} + context, err := event.FromRequest(&data, r) + if err != nil { + glog.Errorf("Failed to parse event: %s", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + addr := fmt.Sprintf("%s/seenEvents%s/%s/.json", databaseURL, os.Getenv("KEY_SUFFIX"), context.EventID) + url, err := url.Parse(addr) + if err != nil { + glog.Errorf("Failed to parse url %s", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + // Can not fail + body, _ := json.Marshal(data) + + write := &http.Request{ + Method: http.MethodPut, + URL: url, + Body: ioutil.NopCloser(bytes.NewReader(body)), + } + res, err := http.DefaultClient.Do(write) + if err != nil { + glog.Errorf("Failed to write to RTDB: %s", err) + } + if res.StatusCode/100 != 2 { + glog.Errorf("Got non-success response from RTDB: %d", res.StatusCode) + } + glog.Info("Success!") + + w.WriteHeader(http.StatusOK) +} + +func main() { + http.HandleFunc("/healthz", func(w http.ResponseWriter, req *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.Write([]byte(`{"status": "ok"}`)) + }) + http.HandleFunc("/", SendEventToDB) + http.ListenAndServe(address, nil) +} diff --git a/sample/actions/bind.yaml b/sample/actions/bind.yaml new file mode 100644 index 00000000000..9934c35bf00 --- /dev/null +++ b/sample/actions/bind.yaml @@ -0,0 +1,11 @@ +apiVersion: eventing.elafros.dev/v1alpha1 +kind: Bind +metadata: + name: action-demo +spec: + trigger: + # Note: trigger is not used right now. + # Demos must currently publish directly to the endpoint for this Bind + action: + name: action-demo + processor: eventing.elafros.dev/EventLogger \ No newline at end of file diff --git a/sample/actions/configuration.yaml b/sample/actions/configuration.yaml new file mode 100644 index 00000000000..b34ce3726fd --- /dev/null +++ b/sample/actions/configuration.yaml @@ -0,0 +1,13 @@ +apiVersion: elafros.dev/v1alpha1 +kind: Configuration +metadata: + name: action-demo + namespace: default +spec: + revisionTemplate: + metadata: + labels: + elafros.dev/type: container + spec: + container: + image: action-demo:latest \ No newline at end of file diff --git a/sample/actions/deployment.yaml b/sample/actions/deployment.yaml new file mode 100644 index 00000000000..a00a964c839 --- /dev/null +++ b/sample/actions/deployment.yaml @@ -0,0 +1,33 @@ +# Copyright 2018 Google, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apps/v1beta1 +kind: Deployment +metadata: + name: action-demo +spec: + replicas: 1 + template: + metadata: + labels: + app: action-demo-service + spec: + containers: + - name: action-demo + image: action-demo:latest + ports: + - containerPort: 8080 + env: + - name: KEY_SUFFIX + value: "InService" \ No newline at end of file diff --git a/sample/actions/route.yaml b/sample/actions/route.yaml new file mode 100644 index 00000000000..d7bc21ea9e2 --- /dev/null +++ b/sample/actions/route.yaml @@ -0,0 +1,23 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: elafros.dev/v1alpha1 +kind: Route +metadata: + name: action-demo + namespace: default +spec: + traffic: + - configurationName: action-demo + percent: 100 diff --git a/sample/actions/service.yaml b/sample/actions/service.yaml new file mode 100644 index 00000000000..c0e14d0100f --- /dev/null +++ b/sample/actions/service.yaml @@ -0,0 +1,26 @@ +# Copyright 2018 Google, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: Service +metadata: + name: action-demo +spec: + selector: + app: action-demo-service + ports: + - protocol: TCP + port: 80 + targetPort: 8080 + diff --git a/sample/github/pullrequest.yaml b/sample/github/pullrequest.yaml index e5cb476b702..0caa4f230aa 100644 --- a/sample/github/pullrequest.yaml +++ b/sample/github/pullrequest.yaml @@ -30,4 +30,5 @@ spec: name: githubsecret key: githubCredentials action: - routeName: git-webhook + processor: elafros.dev/Route + name: git-webhook