From 179bbf34c700ec8f751ec4eb49082b80ec7e6309 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Wed, 25 Apr 2018 14:05:56 -0700 Subject: [PATCH 1/2] Make Actions dynamc and add sample. Steps: 1. Changed Bind.Spec.Action to match planned API 2. Reran codegen 3. Update code until existing GitHub demo worked 4. Factor out parts of the Bind controller so that we can cleanly handle Binds without triggers (will help with KubeCon demo) 5. Added dynamic action support 6. Added Service action 7. Added sample --- cmd/delivery/main.go | 1 + pkg/apis/bind/v1alpha1/bind_types.go | 21 +++- pkg/controller/bind/BUILD | 1 + pkg/controller/bind/controller.go | 172 +++++++++++++++------------ pkg/delivery/action/BUILD.bazel | 1 + pkg/delivery/action/elafros.go | 33 +++-- pkg/delivery/action/logging.go | 8 +- pkg/delivery/action/service.go | 62 ++++++++++ pkg/delivery/queue/BUILD.bazel | 1 + pkg/delivery/queue/queue.go | 15 +-- pkg/delivery/receiver.go | 20 ++-- pkg/delivery/receiver_test.go | 6 +- pkg/delivery/sender.go | 8 +- pkg/delivery/sender_test.go | 3 +- pkg/sources/BUILD | 5 +- pkg/sources/event_source.go | 6 +- pkg/sources/github.go | 70 +++++++++-- sample/actions/BUILD.bazel | 82 +++++++++++++ sample/actions/README.md | 136 +++++++++++++++++++++ sample/actions/action.go | 82 +++++++++++++ sample/actions/bind.yaml | 11 ++ sample/actions/configuration.yaml | 13 ++ sample/actions/deployment.yaml | 33 +++++ sample/actions/route.yaml | 23 ++++ sample/actions/service.yaml | 26 ++++ sample/github/pullrequest.yaml | 3 +- 26 files changed, 707 insertions(+), 135 deletions(-) create mode 100644 pkg/delivery/action/service.go create mode 100644 sample/actions/BUILD.bazel create mode 100644 sample/actions/README.md create mode 100644 sample/actions/action.go create mode 100644 sample/actions/bind.yaml create mode 100644 sample/actions/configuration.yaml create mode 100644 sample/actions/deployment.yaml create mode 100644 sample/actions/route.yaml create mode 100644 sample/actions/service.yaml diff --git a/cmd/delivery/main.go b/cmd/delivery/main.go index 877cf8f7d91..810103e5f48 100644 --- a/cmd/delivery/main.go +++ b/cmd/delivery/main.go @@ -87,6 +87,7 @@ func main() { processors := map[string]action.Action{ action.LoggingActionType: action.NewLoggingAction(), action.ElafrosActionType: action.NewElafrosAction(kubeClient, http.DefaultClient), + action.ServiceActionType: action.NewServiceAction(http.DefaultClient), } sender := delivery.NewSender(q, processors) diff --git a/pkg/apis/bind/v1alpha1/bind_types.go b/pkg/apis/bind/v1alpha1/bind_types.go index 6267e02d954..704837e89ab 100644 --- a/pkg/apis/bind/v1alpha1/bind_types.go +++ b/pkg/apis/bind/v1alpha1/bind_types.go @@ -35,9 +35,15 @@ type Bind struct { Status BindStatus `json:"status"` } +// BindAction describes where events should be delivered. type BindAction struct { - // RouteName specifies Elafros route as a target. - RouteName string `json:"routeName,omitempty"` + // Processor dictates the kind of service that will handle the Event. + // For example "elafros.dev/Route" + Processor string `json:"processor"` + + // Name dicates the resource exposed by Processor that will handle the event. + // The semantics of Name is determined by Processor. + Name string `json:"name"` } // EventTrigger describes when an Event should be delivered. @@ -119,13 +125,22 @@ type EventTrigger struct { ParametersFrom []ParametersFromSource `json:"parametersFrom,omitempty"` } +// Zero determines whether an EventTrigger is fully empty. If a Bind is set up +// without an EventTrigger, it is assumed that the real event source is incompatible +// with our event framework's control plane and will be set up manually through +// side channels. +func (t *EventTrigger) Zero() bool { + return t.EventType == "" && t.Resource == "" && t.Service == "" && + t.Parameters == nil && t.ParametersFrom == nil +} + // BindSpec is the spec for a Bind resource type BindSpec struct { // Action specifies the target handler for the events Action BindAction `json:"action"` // Trigger specifies the trigger we're binding to - Trigger EventTrigger `json:trigger"` + Trigger EventTrigger `json:"trigger"` } // ParametersFromSource represents the source of a set of Parameters diff --git a/pkg/controller/bind/BUILD b/pkg/controller/bind/BUILD index a128b5dba39..b2b51a1d341 100644 --- a/pkg/controller/bind/BUILD +++ b/pkg/controller/bind/BUILD @@ -12,6 +12,7 @@ go_library( "//pkg/client/informers/externalversions:go_default_library", "//pkg/client/listers/bind/v1alpha1:go_default_library", "//pkg/controller:go_default_library", + "//pkg/delivery/action:go_default_library", "//pkg/sources:go_default_library", "//vendor/github.com/elafros/elafros/pkg/client/informers/externalversions:go_default_library", "//vendor/github.com/elafros/elafros/pkg/client/listers/ela/v1alpha1:go_default_library", diff --git a/pkg/controller/bind/controller.go b/pkg/controller/bind/controller.go index 50aa1ce223e..d4c72dfd7de 100644 --- a/pkg/controller/bind/controller.go +++ b/pkg/controller/bind/controller.go @@ -19,6 +19,7 @@ package bind import ( "encoding/json" "fmt" + "strings" "time" "github.com/ghodss/yaml" @@ -48,6 +49,7 @@ import ( bindscheme "github.com/elafros/eventing/pkg/client/clientset/versioned/scheme" informers "github.com/elafros/eventing/pkg/client/informers/externalversions" listers "github.com/elafros/eventing/pkg/client/listers/bind/v1alpha1" + "github.com/elafros/eventing/pkg/delivery/action" ) const controllerAgentName = "bind-controller" @@ -133,7 +135,7 @@ func NewController( } controller.sources = make(map[string]sources.EventSource) - controller.sources["github.com"] = sources.NewGithubEventSource() + controller.sources["github.com"] = sources.NewGithubEventSource(kubeclientset) glog.Info("Setting up event handlers") // Set up an event handler for when Bind resources change bindInformer.Binds().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -288,76 +290,40 @@ func (c *Controller) syncHandler(key string) error { bind = bind.DeepCopy() // Find the Route that they want. - routeName := bind.Spec.Action.RouteName - route, err := c.routesLister.Routes(namespace).Get(routeName) - if err != nil { + actionName := bind.Spec.Action.Name + actionType := bind.Spec.Action.Processor + if err := c.validateAction(namespace, actionName, actionType); err != nil { if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("Route %q in namespace %q does not exist", routeName, namespace)) + runtime.HandleError(fmt.Errorf("Action %q of type %q in namespace %q does not exist", actionName, actionType, namespace)) } + glog.Warningf("Bind %q rejected for invalid action: %s", name, err) return err } - domainSuffix, err := getDomainSuffixFromElaConfig(c.kubeclientset) - if err != nil { - if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("Can't find ela ConfigMap")) + var condition *v1alpha1.BindCondition + if bind.Spec.Trigger.Zero() { + glog.Warningf("Bind %q has no Spec.Trigger", name) + if bind.Status.Conditions != nil { + condition = &v1alpha1.BindCondition{ + Type: v1alpha1.BindComplete, + Status: corev1.ConditionTrue, + Reason: fmt.Sprintf("BindSuccess; no Trigger to manage"), + Message: "Bind successful", + } } - return err - } - - functionDNS := fmt.Sprintf("%s.%s.%s", route.Name, route.Namespace, domainSuffix) - glog.Infof("Found route DNS as '%q'", functionDNS) - - es, err := c.eventSourcesLister.EventSources(namespace).Get(bind.Spec.Trigger.Service) - if err != nil { - if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("EventSource %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace)) + } else { + if condition, err = c.bindTrigger(namespace, bind); err != nil { + return err } - return err } - et, err := c.eventTypesLister.EventTypes(namespace).Get(bind.Spec.Trigger.EventType) - if err != nil { - if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("EventType %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace)) - } - return err - } - - trigger, err := resolveTrigger(c.kubeclientset, namespace, bind.Spec.Trigger) - if err != nil { - glog.Warningf("Failed to process parameters: %s", err) - return err - } - - if bind.Status.Conditions != nil { + if condition == nil { glog.Infof("Already has status, skipping") return nil } - glog.Infof("Creating a subscription to %q : %q with Trigger %+v", es.Name, et.Name, trigger) - if val, ok := c.sources[es.Name]; ok { - r, err := val.Bind(trigger, functionDNS) - if err != nil { - glog.Warningf("BIND failed: %s", err) - msg := fmt.Sprintf("Bind failed with : %s", r) - bind.Status.SetCondition(&v1alpha1.BindCondition{ - Type: v1alpha1.BindFailed, - Status: corev1.ConditionTrue, - Reason: "BindFailed", - Message: msg, - }) - } else { - bind.Status.SetCondition(&v1alpha1.BindCondition{ - Type: v1alpha1.BindComplete, - Status: corev1.ConditionTrue, - Reason: fmt.Sprintf("BindSuccess: Hook: %s", r["ID"].(string)), - Message: "Bind successful", - }) - } - } - _, err = c.updateStatus(bind) - if err != nil { + bind.Status.SetCondition(condition) + if _, err = c.updateStatus(bind); err != nil { glog.Warningf("Failed to update status: %s", err) return err } @@ -381,19 +347,69 @@ func (c *Controller) updateStatus(u *v1alpha1.Bind) (*v1alpha1.Bind, error) { return bindClient.Update(newu) } -func resolveTrigger(kubeClient kubernetes.Interface, namespace string, trigger v1alpha1.EventTrigger) (sources.EventTrigger, error) { - r := sources.EventTrigger{ - Resource: trigger.Resource, - EventType: trigger.EventType, - Parameters: make(map[string]interface{}), +func (c *Controller) bindTrigger(namespace string, bind *v1alpha1.Bind) (*v1alpha1.BindCondition, error) { + es, err := c.eventSourcesLister.EventSources(namespace).Get(bind.Spec.Trigger.Service) + if err != nil { + if errors.IsNotFound(err) { + runtime.HandleError(fmt.Errorf("EventSource %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace)) + } + return nil, err } + + et, err := c.eventTypesLister.EventTypes(namespace).Get(bind.Spec.Trigger.EventType) + if err != nil { + if errors.IsNotFound(err) { + runtime.HandleError(fmt.Errorf("EventType %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace)) + } + return nil, err + } + + parameters, err := resolveParameters(c.kubeclientset, namespace, bind.Spec.Trigger) + if err != nil { + glog.Warningf("Failed to process parameters: %s", err) + return nil, err + } + + if bind.Status.Conditions != nil { + return nil, nil + } + + glog.Infof("Creating a subscription to %q : %q with Trigger %+v", es.Name, et.Name, bind.Spec.Trigger) + eventSource, ok := c.sources[es.Name] + if !ok { + return nil, fmt.Errorf("Do not know how to deploy EventTrigger to source with name %s", es.Name) + } + + r, err := eventSource.Bind(bind, parameters) + if err != nil { + glog.Warningf("BIND failed: %s", err) + msg := fmt.Sprintf("Bind failed with : %s", r) + return &v1alpha1.BindCondition{ + Type: v1alpha1.BindFailed, + Status: corev1.ConditionTrue, + Reason: "BindFailed", + Message: msg, + }, nil + } + return &v1alpha1.BindCondition{ + Type: v1alpha1.BindComplete, + Status: corev1.ConditionTrue, + // BUG(43) ID is a feature of the GitHub event source. If it is to become standardized, + // we should not be using a map[string]interface{} result. + Reason: fmt.Sprintf("BindSuccess: Hook: %s", r["ID"].(string)), + Message: "Bind successful", + }, nil +} + +func resolveParameters(kubeClient kubernetes.Interface, namespace string, trigger v1alpha1.EventTrigger) (map[string]interface{}, error) { + r := make(map[string]interface{}) if trigger.Parameters != nil && trigger.Parameters.Raw != nil && len(trigger.Parameters.Raw) > 0 { p := make(map[string]interface{}) if err := yaml.Unmarshal(trigger.Parameters.Raw, &p); err != nil { return r, err } for k, v := range p { - r.Parameters[k] = v + r[k] = v } } if trigger.ParametersFrom != nil { @@ -404,7 +420,7 @@ func resolveTrigger(kubeClient kubernetes.Interface, namespace string, trigger v return r, err } for k, v := range pfs { - r.Parameters[k] = v + r[k] = v } } } @@ -446,16 +462,20 @@ func unmarshalJSON(in []byte) (map[string]interface{}, error) { return parameters, nil } -func getDomainSuffixFromElaConfig(cl kubernetes.Interface) (string, error) { - const name = "ela-config" - const namespace = "ela-system" - c, err := cl.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return "", err - } - domainSuffix, ok := c.Data["domainSuffix"] - if !ok { - return "", fmt.Errorf("cannot find domainSuffix in %v: ConfigMap.Data is %#v", name, c.Data) +func (c *Controller) validateAction(namespace string, name string, actionType string) error { + switch actionType { + case action.ElafrosActionType: + parts := strings.Split(name, "/") + if len(parts) == 2 { + namespace, name = parts[0], parts[1] + } + _, err := c.routesLister.Routes(namespace).Get(name) + return err + case action.LoggingActionType: + return nil + default: + // Should this be an errors.NewNotFound? How does the action struct + // fit into the schema.GroupResource idea? + return fmt.Errorf("Unknown Action.Processor %q", actionType) } - return domainSuffix, nil } diff --git a/pkg/delivery/action/BUILD.bazel b/pkg/delivery/action/BUILD.bazel index 0cde53d03de..5daf4ccf451 100644 --- a/pkg/delivery/action/BUILD.bazel +++ b/pkg/delivery/action/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "doc.go", "elafros.go", "logging.go", + "service.go", ], importpath = "github.com/elafros/eventing/pkg/delivery/action", deps = [ diff --git a/pkg/delivery/action/elafros.go b/pkg/delivery/action/elafros.go index 5224759f375..73fb799752b 100644 --- a/pkg/delivery/action/elafros.go +++ b/pkg/delivery/action/elafros.go @@ -18,24 +18,29 @@ const ( ElafrosActionType = "elafros.dev/Route" ) -// ElafrosAction sends Events to an Elafros Route. -type ElafrosAction struct { +// elafrosAction sends Events to an Elafros Route. +type elafrosAction struct { kubeclientset kubernetes.Interface httpclient *http.Client } // NewElafrosAction creates an Action object that sends Events to Elafros Routes. -func NewElafrosAction(kubeclientset kubernetes.Interface, httpclient *http.Client) *ElafrosAction { - return &ElafrosAction{kubeclientset: kubeclientset, httpclient: httpclient} +func NewElafrosAction(kubeclientset kubernetes.Interface, httpclient *http.Client) Action { + return &elafrosAction{kubeclientset: kubeclientset, httpclient: httpclient} } // SendEvent will POST the event spec to the root URI of the elafros route. -func (a *ElafrosAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { +func (a *elafrosAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { + glog.Infof("Sending event %s to ELA route %s", context.EventID, name) + var namespace, route string parts := strings.Split(name, "/") if len(parts) != 2 { - return nil, fmt.Errorf("Expected elafros route '%s' to be in the form '/'", name) + // TODO(inlined): for compatibility with the github demo we implicitly pick up the namespace + // of the Bind resource. Should ensure that we'll always get routes in this form. + namespace, route = "default", name + } else { + namespace, route = parts[0], parts[1] } - route, namespace := parts[1], parts[0] domain, err := getDomainSuffixFromElaConfig(a.kubeclientset) if err != nil { @@ -43,17 +48,25 @@ func (a *ElafrosAction) SendEvent(name string, data interface{}, context *event. return nil, err } - addr := fmt.Sprintf("http://%s.%s.%s", route, namespace, domain) + addr := fmt.Sprintf("http://%s.%s.%s/", route, namespace, domain) + glog.Info("Sending event", context.EventID, "to ELA route at", addr) req, err := event.NewRequest(addr, data, *context) if err != nil { return nil, err } - if _, err := a.httpclient.Do(req); err != nil { + res, err := a.httpclient.Do(req) + if err != nil { + glog.Errorf("Failed to send event to webhook %s; err=%s", addr, err) return nil, err } + // TODO: Standard handling of non-200 responses as errors. + if res.StatusCode/100 != 2 { + glog.Errorf("Got unsuccessful response code %d from %s", res.StatusCode, addr) + } - // TODO: non-200 responses as errors. Standard decoding of responses to be forwarded. + // TODO: Return response data so that it may be forwarded in chained Binds. + glog.Infof("Sent event to %s; got response %s", addr, res.Status) return nil, nil } diff --git a/pkg/delivery/action/logging.go b/pkg/delivery/action/logging.go index 05f98c7c959..298c53ae631 100644 --- a/pkg/delivery/action/logging.go +++ b/pkg/delivery/action/logging.go @@ -30,10 +30,10 @@ const ( ) // A LoggingAction will log and drop Events. -type LoggingAction struct{} +type loggingAction struct{} // SendEvent implements Action.SendEvent -func (a LoggingAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { +func (a loggingAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { event := map[string]interface{}{ "data": data, "context": context, @@ -47,6 +47,6 @@ func (a LoggingAction) SendEvent(name string, data interface{}, context *event.C } // NewLoggingAction creates an Action that will log and drop Events. -func NewLoggingAction() LoggingAction { - return LoggingAction{} +func NewLoggingAction() Action { + return loggingAction{} } diff --git a/pkg/delivery/action/service.go b/pkg/delivery/action/service.go new file mode 100644 index 00000000000..6278d0e1ba2 --- /dev/null +++ b/pkg/delivery/action/service.go @@ -0,0 +1,62 @@ +package action + +import ( + "fmt" + "net/http" + "strings" + + "github.com/elafros/eventing/pkg/event" + "github.com/golang/glog" +) + +const ( + // ServiceActionType is the expected Bind Processor name to + // cause Events to be sent to a K8S service. + ServiceActionType = "Service" +) + +// serviceAction sends Events to a K8S service. +type serviceAction struct { + httpclient *http.Client +} + +// NewServiceAction creates an Action object that sends Events to K8S services. +func NewServiceAction(httpclient *http.Client) Action { + return &serviceAction{httpclient: httpclient} +} + +// SendEvent will POST the event spec to the root URI of the elafros route. +func (a *serviceAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { + glog.Infof("Sending event %s to K8S service %s", context.EventID, name) + var namespace, service string + parts := strings.Split(name, "/") + if len(parts) != 2 { + // TODO(inlined): for compatibility with the github demo we implicitly pick up the namespace + // of the Bind resource. Should ensure that we'll always get routes in this form. + namespace, service = "default", name + } else { + namespace, service = parts[0], parts[1] + } + + addr := fmt.Sprintf("http://%s.%s.svc.cluster.local", service, namespace) + glog.Info("Sending event", context.EventID, "to K8S service at", addr) + req, err := event.NewRequest(addr, data, *context) + if err != nil { + glog.Errorf("Failed to marshal event: %s", err) + return nil, err + } + + res, err := a.httpclient.Do(req) + if err != nil { + glog.Errorf("Failed to send event to webhook %s; err=%s", addr, err) + return nil, err + } + // TODO: Standard handling of non-200 responses as errors. + if res.StatusCode/100 != 2 { + glog.Errorf("Got unsuccessful response code %d from %s", res.StatusCode, addr) + } + + // TODO: Return response data so that it may be forwarded in chained Binds. + glog.Infof("Sent event to %s; got response %s", addr, res.Status) + return nil, nil +} diff --git a/pkg/delivery/queue/BUILD.bazel b/pkg/delivery/queue/BUILD.bazel index df8567bec89..a47d3cf440b 100644 --- a/pkg/delivery/queue/BUILD.bazel +++ b/pkg/delivery/queue/BUILD.bazel @@ -11,6 +11,7 @@ go_library( ], importpath = "github.com/elafros/eventing/pkg/delivery/queue", deps = [ + "//pkg/apis/bind/v1alpha1:go_default_library", "//pkg/event:go_default_library", "//vendor/github.com/golang/glog:go_default_library", ], diff --git a/pkg/delivery/queue/queue.go b/pkg/delivery/queue/queue.go index 286bd5b7826..e91e3bb72c8 100644 --- a/pkg/delivery/queue/queue.go +++ b/pkg/delivery/queue/queue.go @@ -17,6 +17,7 @@ limitations under the License. package queue import ( + "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" "github.com/elafros/eventing/pkg/event" ) @@ -25,17 +26,9 @@ import ( // It may be reasonable to just return a tuple of these values from // the Queue pull operations instead of defining the struct. type QueuedEvent struct { - Action ActionType `json:"action"` - Data interface{} `json:"data"` - Context *event.Context `json:"context"` -} - -// ActionType is a shim while Bind's Action is not generic. -// TODO(vaikas): Remove this once Bind's Action has been migrated -// to be generic. -type ActionType struct { - Name string `json:"name"` - Processor string `json:"processor"` + Action v1alpha1.BindAction `json:"action"` + Data interface{} `json:"data"` + Context *event.Context `json:"context"` } // Queue implements basic features to allow asynchronous buffering of events. diff --git a/pkg/delivery/receiver.go b/pkg/delivery/receiver.go index 860189ea0d3..07defca33e6 100644 --- a/pkg/delivery/receiver.go +++ b/pkg/delivery/receiver.go @@ -34,17 +34,6 @@ import ( // will do late-time filtering of Events. var directSendEventRegExp = regexp.MustCompile(`^.*/namespaces/([^/]*)/flows/([^/]*):sendEvent$`) -// TODO(vaikas): Remove this once Bind's Action has been migrated -// to be generic. -const hardCodedProcessor = "eventing.elafros.dev/EventLogger" - -func actionFromBind(bind *v1alpha1.Bind) queue.ActionType { - return queue.ActionType{ - Name: bind.Spec.Action.RouteName, - Processor: hardCodedProcessor, - } -} - // Receiver manages the HTTP endpoints for receiving events as well as // stats about the queue for processing events. type Receiver struct { @@ -58,7 +47,7 @@ func NewReceiver(bindsLister listers.BindLister, eventQueue queue.Queue) *Receiv } // SendEvent enqueues an event data and Context for delivery to a particular action. -func (r *Receiver) SendEvent(action queue.ActionType, data interface{}, context *event.Context) error { +func (r *Receiver) SendEvent(action v1alpha1.BindAction, data interface{}, context *event.Context) error { return r.eventQueue.Push(queue.QueuedEvent{ Action: action, Data: data, @@ -68,6 +57,7 @@ func (r *Receiver) SendEvent(action queue.ActionType, data interface{}, context // ServeHTTP implements the external REST API that other services will use to send events. func (r *Receiver) ServeHTTP(w http.ResponseWriter, req *http.Request) { + glog.Infof("Serving %s", req.URL.Path) if req.Method != http.MethodPost { glog.V(3).Info("Cannot handle method", req.Method) w.WriteHeader(http.StatusMethodNotAllowed) @@ -102,9 +92,13 @@ func (r *Receiver) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - if err := r.SendEvent(actionFromBind(bind), data, context); err != nil { + if err := r.SendEvent(bind.Spec.Action, data, context); err != nil { glog.Error("Failed to enqueue event", err) w.WriteHeader(http.StatusInternalServerError) return } + + glog.Infof("Enqueued event %s for delivery to %s action %s", + context.EventID, bind.Spec.Action.Processor, bind.Spec.Action.Name) + w.WriteHeader(http.StatusOK) } diff --git a/pkg/delivery/receiver_test.go b/pkg/delivery/receiver_test.go index c8280ca2f7b..c22977a20ec 100644 --- a/pkg/delivery/receiver_test.go +++ b/pkg/delivery/receiver_test.go @@ -51,7 +51,8 @@ var ( Resource: "{abc}/123", }, Action: v1alpha1.BindAction{ - RouteName: "vaikas.fix.this", + Processor: "eventing.elafros.dev/WebHook", + Name: "https://demo.google.com/notreal", }, }, } @@ -163,4 +164,7 @@ func TestEnqueueEvent(t *testing.T) { if !reflect.DeepEqual(data, event.Data) { t.Fatalf("Event data was not marshalled correctly; expected=%+v got=%+v", data, event.Data) } + if !reflect.DeepEqual(demoBind.Spec.Action, event.Action) { + t.Fatalf("Event action was not marshalled correctly; expected=%+v got=%+v", demoBind.Spec.Action, event.Action) + } } diff --git a/pkg/delivery/sender.go b/pkg/delivery/sender.go index 1487767b22c..14dd1e66c4a 100644 --- a/pkg/delivery/sender.go +++ b/pkg/delivery/sender.go @@ -62,7 +62,12 @@ func (s *Sender) RunOnce(stopCh <-chan struct{}) error { glog.V(4).Info("Sending Event %s to %s Action %s", event.Context.EventID, event.Action.Processor, event.Action.Name) action, ok := s.actions[event.Action.Processor] if !ok { - return fmt.Errorf("Event %s is routed to unknown Processor '%s'", event.Context.EventID, event.Action.Processor) + keys := make([]string, 0, len(s.actions)) + for key := range s.actions { + keys = append(keys, key) + } + return fmt.Errorf("Event %s is routed to unknown Processor '%s'; known processors are %q", + event.Context.EventID, event.Action.Processor, keys) } // TODO(inlined): retry strategies for errors and continuations for success. @@ -81,6 +86,7 @@ func (s *Sender) run(stopCh <-chan struct{}) { if err == io.EOF { return } + glog.Warning(err) runtime.HandleError(err) } } diff --git a/pkg/delivery/sender_test.go b/pkg/delivery/sender_test.go index 73b9505bcff..91980386318 100644 --- a/pkg/delivery/sender_test.go +++ b/pkg/delivery/sender_test.go @@ -21,6 +21,7 @@ import ( "sync" "testing" + "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" "github.com/elafros/eventing/pkg/delivery" "github.com/elafros/eventing/pkg/delivery/action" "github.com/elafros/eventing/pkg/delivery/queue" @@ -63,7 +64,7 @@ func TestSendEvent(t *testing.T) { q.Push(queue.QueuedEvent{ Context: expectedContext, Data: expectedData, - Action: queue.ActionType{ + Action: v1alpha1.BindAction{ Processor: actionType, Name: actionName, }, diff --git a/pkg/sources/BUILD b/pkg/sources/BUILD index 02922d4c903..bf00542ba60 100644 --- a/pkg/sources/BUILD +++ b/pkg/sources/BUILD @@ -4,14 +4,17 @@ go_library( name = "go_default_library", srcs = [ "event_source.go", - "event_trigger.go", "github.go", ], importpath = "github.com/elafros/eventing/pkg/sources", visibility = ["//visibility:public"], deps = [ + "//pkg/apis/bind/v1alpha1:go_default_library", + "//pkg/delivery/action:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/google/go-github/github:go_default_library", "//vendor/golang.org/x/oauth2:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", ], ) diff --git a/pkg/sources/event_source.go b/pkg/sources/event_source.go index c807653c54a..b661a6fd8f4 100644 --- a/pkg/sources/event_source.go +++ b/pkg/sources/event_source.go @@ -16,6 +16,10 @@ limitations under the License. package sources +import "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" + +// EventSource manages the subscription of which events a developer is interested in. type EventSource interface { - Bind(trigger EventTrigger, route string) (map[string]interface{}, error) + // Bind creates a new subscription of events. + Bind(bind *v1alpha1.Bind, parameters map[string]interface{}) (map[string]interface{}, error) } diff --git a/pkg/sources/github.go b/pkg/sources/github.go index a6b100a0d1c..6431f217215 100644 --- a/pkg/sources/github.go +++ b/pkg/sources/github.go @@ -18,14 +18,18 @@ package sources import ( "context" + "errors" "fmt" "strings" "github.com/golang/glog" - + ghclient "github.com/google/go-github/github" "golang.org/x/oauth2" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" - ghclient "github.com/google/go-github/github" + "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" + "github.com/elafros/eventing/pkg/delivery/action" ) const ( @@ -37,39 +41,52 @@ const ( MessageResourceSynced = "Bind synced successfully" ) +// GithubEventSource configures Event delivery from GitHub. type GithubEventSource struct { + clientset kubernetes.Interface } -func NewGithubEventSource() EventSource { - return &GithubEventSource{} +// NewGithubEventSource configures a new GithubEventSource. +func NewGithubEventSource(clientset kubernetes.Interface) EventSource { + return &GithubEventSource{clientset: clientset} } -func (t *GithubEventSource) Bind(trigger EventTrigger, route string) (map[string]interface{}, error) { - glog.Infof("CREATING GITHUB WEBHOOK") +// Bind implements EventSource.Bind +func (s *GithubEventSource) Bind(bind *v1alpha1.Bind, parameters map[string]interface{}) (map[string]interface{}, error) { + glog.Infof("BINDING GITHUB EVENT") + // BUG(#43) The GitHub event source should be responsible for its own gateway rathan relying + // on elafros routes to be externally visible. + if bind.Spec.Action.Processor != action.ElafrosActionType { + return nil, errors.New("[elafros/eventing#43] GitHub event source currently only supports Elafros Routes as their action type") + } + address, err := s.getFunctionAddress(bind) + if err != nil { + return nil, err + } ctx := context.Background() ts := oauth2.StaticTokenSource( - &oauth2.Token{AccessToken: trigger.Parameters["accessToken"].(string)}, + &oauth2.Token{AccessToken: parameters["accessToken"].(string)}, ) tc := oauth2.NewClient(ctx, ts) - glog.Infof("CREATING GITHUB WEBHOOK with access token: %s", trigger.Parameters["accessToken"].(string)) + glog.Infof("CREATING GITHUB WEBHOOK with access token: %s", parameters["accessToken"].(string)) client := ghclient.NewClient(tc) active := true name := "web" config := make(map[string]interface{}) - config["url"] = fmt.Sprintf("http://%s", route) + config["url"] = fmt.Sprintf("http://%s", address) config["content_type"] = "json" - config["secret"] = trigger.Parameters["secretToken"].(string) + config["secret"] = parameters["secretToken"].(string) hook := ghclient.Hook{ Name: &name, - URL: &route, + URL: &address, Events: []string{"pull_request"}, Active: &active, Config: config, } - components := strings.Split(trigger.Resource, "/") + components := strings.Split(bind.Spec.Trigger.Resource, "/") h, r, err := client.Repositories.CreateHook(ctx, components[0] /* owner */, components[1] /* repo */, &hook) if err != nil { glog.Warningf("Failed to create the webhook: %s", err) @@ -81,3 +98,32 @@ func (t *GithubEventSource) Bind(trigger EventTrigger, route string) (map[string ret["ID"] = fmt.Sprintf("%d", h.ID) return ret, nil } + +func (s *GithubEventSource) getFunctionAddress(bind *v1alpha1.Bind) (string, error) { + suffix, err := s.getDomainSuffix() + if err != nil { + return "", err + } + + functionDNS := fmt.Sprintf( + "%s.%s.%s", + /* function name */ bind.Spec.Action.Name, + /* function namespace */ bind.ObjectMeta.Namespace, + /* cluster root domain */ suffix) + glog.Infof("Found route DNS as '%q'", functionDNS) + return functionDNS, nil +} + +func (s *GithubEventSource) getDomainSuffix() (string, error) { + const name = "ela-config" + const namespace = "ela-system" + c, err := s.clientset.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return "", err + } + domainSuffix, ok := c.Data["domainSuffix"] + if !ok { + return "", fmt.Errorf("cannot find domainSuffix in %v: ConfigMap.Data is %#v", name, c.Data) + } + return domainSuffix, nil +} diff --git a/sample/actions/BUILD.bazel b/sample/actions/BUILD.bazel new file mode 100644 index 00000000000..71fc7cbb3c1 --- /dev/null +++ b/sample/actions/BUILD.bazel @@ -0,0 +1,82 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library") + +go_library( + name = "go_default_library", + srcs = ["action.go"], + importpath = "github.com/elafros/eventing/sample/actions", + visibility = ["//visibility:private"], + deps = [ + "//pkg/event:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + ], +) + +go_binary( + name = "action-demo", + embed = [":go_default_library"], + importpath = "github.com/elafros/elafros/sample/actions", + pure = "on", + visibility = ["//visibility:public"], +) + +load("@io_bazel_rules_docker//go:image.bzl", "go_image") + +go_image( + name = "image", + binary = ":action-demo", +) + +load("@k8s_object//:defaults.bzl", "k8s_object") + +# Step 1: Support for action as a debug statement +k8s_object( + name = "bind", + template = ":bind.yaml", +) + +# Step 2: Support for action as an ELA route +# To enable: update the action-sample Bind to have the following +# Spec.Action: +# Name: action-demo +# Processor: elafros.dev/Route + +k8s_object( + name = "configuration", + images = { + "action-demo:latest": ":image", + }, + template = ":configuration.yaml", +) + +k8s_object( + name = "route", + template = ":route.yaml", +) + +# Step 3: Support for action as Service + +k8s_object( + name = "deployment", + images = { + "action-demo:latest": ":image", + }, + template = ":deployment.yaml", +) + +k8s_object( + name = "service", + template = ":service.yaml", +) + +load("@io_bazel_rules_k8s//k8s:objects.bzl", "k8s_objects") + +k8s_objects( + name = "everything", + objects = [ + ":bind", + ":route", + ":configuration", + ":deployment", + ":service", + ], +) diff --git a/sample/actions/README.md b/sample/actions/README.md new file mode 100644 index 00000000000..237a258072f --- /dev/null +++ b/sample/actions/README.md @@ -0,0 +1,136 @@ +# Actions + +Demo to show support for dynamic actions, disjoint from the event source's configuration. + +## Prerequisites + +1. [Setup your development environment](../../DEVELOPMENT.md#getting-started) +2. [Start Elafros](../../README.md#start-elafros) +3. Decide on the DNS name that git can then call. Update elafros/elafros/elaconfig.yaml domainSuffix. +For example I used ela.inlined.me as my hostname, so my elaconfig.yaml looks like so: + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: ela-config + namespace: ela-system +data: + domainSuffix: ela.inlined.me +``` + +If you were already running the elafros controllers, you will need to kill the ela-controller in the ela-system namespace for it to pick up the new domain suffix. + +4. Since we will be mimicing an internal service, we need access to the event-delivery service. + Expose it with: + + ``` + kubectl expose deployment event-delivery --port=80 --target-port=9090 --type=LoadBalancer --namespace bind-system + ``` + +5. Install the `sendevent` utility + +```bash +go install github.com/elafros/eventing/cmd/sendevent +``` + +6. Save the address of your exposed `event-delivery` service. + +```bash +export EVENT_DELIVERY_IP=$(kubectl get services/event-delivery --namespace bind-system -o go-template='{{ (index .status.loadBalancer.ingress 0).ip }}') +``` + +## Demo 1: Install Bind with a debug-logging action + +### Deploying assets + +You can deploy this to Elafros from the root directory via: + +```shell +bazel run sample/actions:everything.apply +``` + +This created a handful of resources. The one most immediately useful is the `action-demo` Bind. +An event source would normally deliver events to this Bind by posting to + +``` +${EVENT_DELIVERY_SERVICE}/v1alpha1/namespaces/default/flows/action-demo:sendEvent +``` + +### Sending events + +We'll do the same with the `sendevent` utility: + +```bash +sendevent http://${EVENT_DELIVERY_IP}/v1alpha1/namespaces/default/flows/action-demo:sendEvent +``` + +(You can optionally configure the sent event by choosing any of `--event-id`, `--event-type`, +`--source`, or `--data`) + +### Observing side-effects + +The event processor deployed in configuration.yaml is "eventing.elafros.dev/EventLogger". This +simply emits a structured event to the logs for the "event-delivery" app. View them with: + +``` +kubectl logs --namespace bind-system -lapp=event-delivery +``` + +## Demo 2: Reroute Bind to an Elafros action. + +### Deploying assets + +In the previous step, you already deployed the elafros route (also named 'action-demo') +that we'll use in this step. To make the demo more exciting, edit "action.go" and change +the `databaseURL` constant to a Firebase Realtime Database URL which you are an owner of. + +To make the `action-demo` Bind point to your new action, edit `bind.yaml` and change the spec.action.processor to `elafros.dev/Route` + +Apply these changes with `blaze run sample/actions:everything.apply` + +### Sending events + +Because we did not change anything about the (mock) event source, the same command will +now have a different effect. + +```bash +sendevent http://${EVENT_DELIVERY_IP}/v1alpha1/namespaces/default/flows/action-demo:sendEvent +``` + +### Observing side-effects + +If you load your Firebase Realtime Database, you will see new event-ids appended to the "seenEvents" node every time you run `sendevent` + +## Cleaning up + +To clean up the sample service: + +```shell +bazel run sample/github:everything.delete +``` + +## Demo 3: Reroute Bind to a K8S Service + +### Deploying assets + +This step will reuse the same Docker image as step 2, but will host `action.go` in +a Deployment + Service. To verify that we are targeting a new backend, the Deployment +sets an environment variable to change where in the Firebase Realtime Database events +are published. + +To mkae the `action-demo` bind point to the Service vrsion of our demo, edit `bind.yaml` and change the spec.action.processor to `Service`. + +Apply these changes with `bazel run sample/actions:everything.apply` + +### Sending events + +Again, the same command will now have new effects: + +``` +sendevent http://${EVENT_DELIVERY_IP}/v1alpha1/namespaces/default/flows/action-demo:sendEvent +``` + +### Observing side-effects + +If you load your Firebase Realtime Database, you will see new event-ids appended to the "seenEventsInService" node every time you run `sendevent` diff --git a/sample/actions/action.go b/sample/actions/action.go new file mode 100644 index 00000000000..8fb8a7de522 --- /dev/null +++ b/sample/actions/action.go @@ -0,0 +1,82 @@ +/* +Copyright 2018 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "os" + + "github.com/elafros/eventing/pkg/event" + "github.com/golang/glog" +) + +const ( + databaseURL = "https://inlined-junkdrawer.firebaseio.com" + address = ":8080" +) + +// SendEventToDB... +func SendEventToDB(w http.ResponseWriter, r *http.Request) { + glog.Info("Sending event to RTDB") + var data map[string]interface{} + context, err := event.FromRequest(&data, r) + if err != nil { + glog.Errorf("Failed to parse event: %s", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + addr := fmt.Sprintf("%s/seenEvents%s/%s/.json", databaseURL, os.Getenv("KEY_SUFFIX"), context.EventID) + url, err := url.Parse(addr) + if err != nil { + glog.Errorf("Failed to parse url %s", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + // Can not fail + body, _ := json.Marshal(data) + + write := &http.Request{ + Method: http.MethodPut, + URL: url, + Body: ioutil.NopCloser(bytes.NewReader(body)), + } + res, err := http.DefaultClient.Do(write) + if err != nil { + glog.Errorf("Failed to write to RTDB: %s", err) + } + if res.StatusCode/100 != 2 { + glog.Errorf("Got non-success response from RTDB: %d", res.StatusCode) + } + glog.Info("Success!") + + w.WriteHeader(http.StatusOK) +} + +func main() { + http.HandleFunc("/healthz", func(w http.ResponseWriter, req *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.Write([]byte(`{"status": "ok"}`)) + }) + http.HandleFunc("/", SendEventToDB) + http.ListenAndServe(address, nil) +} diff --git a/sample/actions/bind.yaml b/sample/actions/bind.yaml new file mode 100644 index 00000000000..9934c35bf00 --- /dev/null +++ b/sample/actions/bind.yaml @@ -0,0 +1,11 @@ +apiVersion: eventing.elafros.dev/v1alpha1 +kind: Bind +metadata: + name: action-demo +spec: + trigger: + # Note: trigger is not used right now. + # Demos must currently publish directly to the endpoint for this Bind + action: + name: action-demo + processor: eventing.elafros.dev/EventLogger \ No newline at end of file diff --git a/sample/actions/configuration.yaml b/sample/actions/configuration.yaml new file mode 100644 index 00000000000..b34ce3726fd --- /dev/null +++ b/sample/actions/configuration.yaml @@ -0,0 +1,13 @@ +apiVersion: elafros.dev/v1alpha1 +kind: Configuration +metadata: + name: action-demo + namespace: default +spec: + revisionTemplate: + metadata: + labels: + elafros.dev/type: container + spec: + container: + image: action-demo:latest \ No newline at end of file diff --git a/sample/actions/deployment.yaml b/sample/actions/deployment.yaml new file mode 100644 index 00000000000..a00a964c839 --- /dev/null +++ b/sample/actions/deployment.yaml @@ -0,0 +1,33 @@ +# Copyright 2018 Google, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apps/v1beta1 +kind: Deployment +metadata: + name: action-demo +spec: + replicas: 1 + template: + metadata: + labels: + app: action-demo-service + spec: + containers: + - name: action-demo + image: action-demo:latest + ports: + - containerPort: 8080 + env: + - name: KEY_SUFFIX + value: "InService" \ No newline at end of file diff --git a/sample/actions/route.yaml b/sample/actions/route.yaml new file mode 100644 index 00000000000..d7bc21ea9e2 --- /dev/null +++ b/sample/actions/route.yaml @@ -0,0 +1,23 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: elafros.dev/v1alpha1 +kind: Route +metadata: + name: action-demo + namespace: default +spec: + traffic: + - configurationName: action-demo + percent: 100 diff --git a/sample/actions/service.yaml b/sample/actions/service.yaml new file mode 100644 index 00000000000..c0e14d0100f --- /dev/null +++ b/sample/actions/service.yaml @@ -0,0 +1,26 @@ +# Copyright 2018 Google, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: Service +metadata: + name: action-demo +spec: + selector: + app: action-demo-service + ports: + - protocol: TCP + port: 80 + targetPort: 8080 + diff --git a/sample/github/pullrequest.yaml b/sample/github/pullrequest.yaml index e5cb476b702..0caa4f230aa 100644 --- a/sample/github/pullrequest.yaml +++ b/sample/github/pullrequest.yaml @@ -30,4 +30,5 @@ spec: name: githubsecret key: githubCredentials action: - routeName: git-webhook + processor: elafros.dev/Route + name: git-webhook From 780320d2debdea672067fad37390deb6ebc2bb60 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Wed, 25 Apr 2018 14:10:42 -0700 Subject: [PATCH 2/2] Bazel auto-fixes --- pkg/sources/BUILD | 1 + sample/actions/BUILD.bazel | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/pkg/sources/BUILD b/pkg/sources/BUILD index bf00542ba60..f8080d27eae 100644 --- a/pkg/sources/BUILD +++ b/pkg/sources/BUILD @@ -4,6 +4,7 @@ go_library( name = "go_default_library", srcs = [ "event_source.go", + "event_trigger.go", "github.go", ], importpath = "github.com/elafros/eventing/pkg/sources", diff --git a/sample/actions/BUILD.bazel b/sample/actions/BUILD.bazel index 71fc7cbb3c1..5d44e85cc5c 100644 --- a/sample/actions/BUILD.bazel +++ b/sample/actions/BUILD.bazel @@ -80,3 +80,10 @@ k8s_objects( ":service", ], ) + +go_binary( + name = "actions", + embed = [":go_default_library"], + importpath = "github.com/elafros/eventing/sample/actions", + visibility = ["//visibility:public"], +)