diff --git a/cmd/delivery/main.go b/cmd/delivery/main.go index 877cf8f7d91..810103e5f48 100644 --- a/cmd/delivery/main.go +++ b/cmd/delivery/main.go @@ -87,6 +87,7 @@ func main() { processors := map[string]action.Action{ action.LoggingActionType: action.NewLoggingAction(), action.ElafrosActionType: action.NewElafrosAction(kubeClient, http.DefaultClient), + action.ServiceActionType: action.NewServiceAction(http.DefaultClient), } sender := delivery.NewSender(q, processors) diff --git a/pkg/apis/bind/v1alpha1/bind_types.go b/pkg/apis/bind/v1alpha1/bind_types.go index 6267e02d954..704837e89ab 100644 --- a/pkg/apis/bind/v1alpha1/bind_types.go +++ b/pkg/apis/bind/v1alpha1/bind_types.go @@ -35,9 +35,15 @@ type Bind struct { Status BindStatus `json:"status"` } +// BindAction describes where events should be delivered. type BindAction struct { - // RouteName specifies Elafros route as a target. - RouteName string `json:"routeName,omitempty"` + // Processor dictates the kind of service that will handle the Event. + // For example "elafros.dev/Route" + Processor string `json:"processor"` + + // Name dicates the resource exposed by Processor that will handle the event. + // The semantics of Name is determined by Processor. + Name string `json:"name"` } // EventTrigger describes when an Event should be delivered. @@ -119,13 +125,22 @@ type EventTrigger struct { ParametersFrom []ParametersFromSource `json:"parametersFrom,omitempty"` } +// Zero determines whether an EventTrigger is fully empty. If a Bind is set up +// without an EventTrigger, it is assumed that the real event source is incompatible +// with our event framework's control plane and will be set up manually through +// side channels. +func (t *EventTrigger) Zero() bool { + return t.EventType == "" && t.Resource == "" && t.Service == "" && + t.Parameters == nil && t.ParametersFrom == nil +} + // BindSpec is the spec for a Bind resource type BindSpec struct { // Action specifies the target handler for the events Action BindAction `json:"action"` // Trigger specifies the trigger we're binding to - Trigger EventTrigger `json:trigger"` + Trigger EventTrigger `json:"trigger"` } // ParametersFromSource represents the source of a set of Parameters diff --git a/pkg/controller/bind/BUILD b/pkg/controller/bind/BUILD index a128b5dba39..b2b51a1d341 100644 --- a/pkg/controller/bind/BUILD +++ b/pkg/controller/bind/BUILD @@ -12,6 +12,7 @@ go_library( "//pkg/client/informers/externalversions:go_default_library", "//pkg/client/listers/bind/v1alpha1:go_default_library", "//pkg/controller:go_default_library", + "//pkg/delivery/action:go_default_library", "//pkg/sources:go_default_library", "//vendor/github.com/elafros/elafros/pkg/client/informers/externalversions:go_default_library", "//vendor/github.com/elafros/elafros/pkg/client/listers/ela/v1alpha1:go_default_library", diff --git a/pkg/controller/bind/controller.go b/pkg/controller/bind/controller.go index 50aa1ce223e..d4c72dfd7de 100644 --- a/pkg/controller/bind/controller.go +++ b/pkg/controller/bind/controller.go @@ -19,6 +19,7 @@ package bind import ( "encoding/json" "fmt" + "strings" "time" "github.com/ghodss/yaml" @@ -48,6 +49,7 @@ import ( bindscheme "github.com/elafros/eventing/pkg/client/clientset/versioned/scheme" informers "github.com/elafros/eventing/pkg/client/informers/externalversions" listers "github.com/elafros/eventing/pkg/client/listers/bind/v1alpha1" + "github.com/elafros/eventing/pkg/delivery/action" ) const controllerAgentName = "bind-controller" @@ -133,7 +135,7 @@ func NewController( } controller.sources = make(map[string]sources.EventSource) - controller.sources["github.com"] = sources.NewGithubEventSource() + controller.sources["github.com"] = sources.NewGithubEventSource(kubeclientset) glog.Info("Setting up event handlers") // Set up an event handler for when Bind resources change bindInformer.Binds().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -288,76 +290,40 @@ func (c *Controller) syncHandler(key string) error { bind = bind.DeepCopy() // Find the Route that they want. - routeName := bind.Spec.Action.RouteName - route, err := c.routesLister.Routes(namespace).Get(routeName) - if err != nil { + actionName := bind.Spec.Action.Name + actionType := bind.Spec.Action.Processor + if err := c.validateAction(namespace, actionName, actionType); err != nil { if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("Route %q in namespace %q does not exist", routeName, namespace)) + runtime.HandleError(fmt.Errorf("Action %q of type %q in namespace %q does not exist", actionName, actionType, namespace)) } + glog.Warningf("Bind %q rejected for invalid action: %s", name, err) return err } - domainSuffix, err := getDomainSuffixFromElaConfig(c.kubeclientset) - if err != nil { - if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("Can't find ela ConfigMap")) + var condition *v1alpha1.BindCondition + if bind.Spec.Trigger.Zero() { + glog.Warningf("Bind %q has no Spec.Trigger", name) + if bind.Status.Conditions != nil { + condition = &v1alpha1.BindCondition{ + Type: v1alpha1.BindComplete, + Status: corev1.ConditionTrue, + Reason: fmt.Sprintf("BindSuccess; no Trigger to manage"), + Message: "Bind successful", + } } - return err - } - - functionDNS := fmt.Sprintf("%s.%s.%s", route.Name, route.Namespace, domainSuffix) - glog.Infof("Found route DNS as '%q'", functionDNS) - - es, err := c.eventSourcesLister.EventSources(namespace).Get(bind.Spec.Trigger.Service) - if err != nil { - if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("EventSource %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace)) + } else { + if condition, err = c.bindTrigger(namespace, bind); err != nil { + return err } - return err } - et, err := c.eventTypesLister.EventTypes(namespace).Get(bind.Spec.Trigger.EventType) - if err != nil { - if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("EventType %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace)) - } - return err - } - - trigger, err := resolveTrigger(c.kubeclientset, namespace, bind.Spec.Trigger) - if err != nil { - glog.Warningf("Failed to process parameters: %s", err) - return err - } - - if bind.Status.Conditions != nil { + if condition == nil { glog.Infof("Already has status, skipping") return nil } - glog.Infof("Creating a subscription to %q : %q with Trigger %+v", es.Name, et.Name, trigger) - if val, ok := c.sources[es.Name]; ok { - r, err := val.Bind(trigger, functionDNS) - if err != nil { - glog.Warningf("BIND failed: %s", err) - msg := fmt.Sprintf("Bind failed with : %s", r) - bind.Status.SetCondition(&v1alpha1.BindCondition{ - Type: v1alpha1.BindFailed, - Status: corev1.ConditionTrue, - Reason: "BindFailed", - Message: msg, - }) - } else { - bind.Status.SetCondition(&v1alpha1.BindCondition{ - Type: v1alpha1.BindComplete, - Status: corev1.ConditionTrue, - Reason: fmt.Sprintf("BindSuccess: Hook: %s", r["ID"].(string)), - Message: "Bind successful", - }) - } - } - _, err = c.updateStatus(bind) - if err != nil { + bind.Status.SetCondition(condition) + if _, err = c.updateStatus(bind); err != nil { glog.Warningf("Failed to update status: %s", err) return err } @@ -381,19 +347,69 @@ func (c *Controller) updateStatus(u *v1alpha1.Bind) (*v1alpha1.Bind, error) { return bindClient.Update(newu) } -func resolveTrigger(kubeClient kubernetes.Interface, namespace string, trigger v1alpha1.EventTrigger) (sources.EventTrigger, error) { - r := sources.EventTrigger{ - Resource: trigger.Resource, - EventType: trigger.EventType, - Parameters: make(map[string]interface{}), +func (c *Controller) bindTrigger(namespace string, bind *v1alpha1.Bind) (*v1alpha1.BindCondition, error) { + es, err := c.eventSourcesLister.EventSources(namespace).Get(bind.Spec.Trigger.Service) + if err != nil { + if errors.IsNotFound(err) { + runtime.HandleError(fmt.Errorf("EventSource %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace)) + } + return nil, err } + + et, err := c.eventTypesLister.EventTypes(namespace).Get(bind.Spec.Trigger.EventType) + if err != nil { + if errors.IsNotFound(err) { + runtime.HandleError(fmt.Errorf("EventType %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace)) + } + return nil, err + } + + parameters, err := resolveParameters(c.kubeclientset, namespace, bind.Spec.Trigger) + if err != nil { + glog.Warningf("Failed to process parameters: %s", err) + return nil, err + } + + if bind.Status.Conditions != nil { + return nil, nil + } + + glog.Infof("Creating a subscription to %q : %q with Trigger %+v", es.Name, et.Name, bind.Spec.Trigger) + eventSource, ok := c.sources[es.Name] + if !ok { + return nil, fmt.Errorf("Do not know how to deploy EventTrigger to source with name %s", es.Name) + } + + r, err := eventSource.Bind(bind, parameters) + if err != nil { + glog.Warningf("BIND failed: %s", err) + msg := fmt.Sprintf("Bind failed with : %s", r) + return &v1alpha1.BindCondition{ + Type: v1alpha1.BindFailed, + Status: corev1.ConditionTrue, + Reason: "BindFailed", + Message: msg, + }, nil + } + return &v1alpha1.BindCondition{ + Type: v1alpha1.BindComplete, + Status: corev1.ConditionTrue, + // BUG(43) ID is a feature of the GitHub event source. If it is to become standardized, + // we should not be using a map[string]interface{} result. + Reason: fmt.Sprintf("BindSuccess: Hook: %s", r["ID"].(string)), + Message: "Bind successful", + }, nil +} + +func resolveParameters(kubeClient kubernetes.Interface, namespace string, trigger v1alpha1.EventTrigger) (map[string]interface{}, error) { + r := make(map[string]interface{}) if trigger.Parameters != nil && trigger.Parameters.Raw != nil && len(trigger.Parameters.Raw) > 0 { p := make(map[string]interface{}) if err := yaml.Unmarshal(trigger.Parameters.Raw, &p); err != nil { return r, err } for k, v := range p { - r.Parameters[k] = v + r[k] = v } } if trigger.ParametersFrom != nil { @@ -404,7 +420,7 @@ func resolveTrigger(kubeClient kubernetes.Interface, namespace string, trigger v return r, err } for k, v := range pfs { - r.Parameters[k] = v + r[k] = v } } } @@ -446,16 +462,20 @@ func unmarshalJSON(in []byte) (map[string]interface{}, error) { return parameters, nil } -func getDomainSuffixFromElaConfig(cl kubernetes.Interface) (string, error) { - const name = "ela-config" - const namespace = "ela-system" - c, err := cl.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return "", err - } - domainSuffix, ok := c.Data["domainSuffix"] - if !ok { - return "", fmt.Errorf("cannot find domainSuffix in %v: ConfigMap.Data is %#v", name, c.Data) +func (c *Controller) validateAction(namespace string, name string, actionType string) error { + switch actionType { + case action.ElafrosActionType: + parts := strings.Split(name, "/") + if len(parts) == 2 { + namespace, name = parts[0], parts[1] + } + _, err := c.routesLister.Routes(namespace).Get(name) + return err + case action.LoggingActionType: + return nil + default: + // Should this be an errors.NewNotFound? How does the action struct + // fit into the schema.GroupResource idea? + return fmt.Errorf("Unknown Action.Processor %q", actionType) } - return domainSuffix, nil } diff --git a/pkg/delivery/action/BUILD.bazel b/pkg/delivery/action/BUILD.bazel index 0cde53d03de..5daf4ccf451 100644 --- a/pkg/delivery/action/BUILD.bazel +++ b/pkg/delivery/action/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "doc.go", "elafros.go", "logging.go", + "service.go", ], importpath = "github.com/elafros/eventing/pkg/delivery/action", deps = [ diff --git a/pkg/delivery/action/elafros.go b/pkg/delivery/action/elafros.go index 5224759f375..73fb799752b 100644 --- a/pkg/delivery/action/elafros.go +++ b/pkg/delivery/action/elafros.go @@ -18,24 +18,29 @@ const ( ElafrosActionType = "elafros.dev/Route" ) -// ElafrosAction sends Events to an Elafros Route. -type ElafrosAction struct { +// elafrosAction sends Events to an Elafros Route. +type elafrosAction struct { kubeclientset kubernetes.Interface httpclient *http.Client } // NewElafrosAction creates an Action object that sends Events to Elafros Routes. -func NewElafrosAction(kubeclientset kubernetes.Interface, httpclient *http.Client) *ElafrosAction { - return &ElafrosAction{kubeclientset: kubeclientset, httpclient: httpclient} +func NewElafrosAction(kubeclientset kubernetes.Interface, httpclient *http.Client) Action { + return &elafrosAction{kubeclientset: kubeclientset, httpclient: httpclient} } // SendEvent will POST the event spec to the root URI of the elafros route. -func (a *ElafrosAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { +func (a *elafrosAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { + glog.Infof("Sending event %s to ELA route %s", context.EventID, name) + var namespace, route string parts := strings.Split(name, "/") if len(parts) != 2 { - return nil, fmt.Errorf("Expected elafros route '%s' to be in the form '/'", name) + // TODO(inlined): for compatibility with the github demo we implicitly pick up the namespace + // of the Bind resource. Should ensure that we'll always get routes in this form. + namespace, route = "default", name + } else { + namespace, route = parts[0], parts[1] } - route, namespace := parts[1], parts[0] domain, err := getDomainSuffixFromElaConfig(a.kubeclientset) if err != nil { @@ -43,17 +48,25 @@ func (a *ElafrosAction) SendEvent(name string, data interface{}, context *event. return nil, err } - addr := fmt.Sprintf("http://%s.%s.%s", route, namespace, domain) + addr := fmt.Sprintf("http://%s.%s.%s/", route, namespace, domain) + glog.Info("Sending event", context.EventID, "to ELA route at", addr) req, err := event.NewRequest(addr, data, *context) if err != nil { return nil, err } - if _, err := a.httpclient.Do(req); err != nil { + res, err := a.httpclient.Do(req) + if err != nil { + glog.Errorf("Failed to send event to webhook %s; err=%s", addr, err) return nil, err } + // TODO: Standard handling of non-200 responses as errors. + if res.StatusCode/100 != 2 { + glog.Errorf("Got unsuccessful response code %d from %s", res.StatusCode, addr) + } - // TODO: non-200 responses as errors. Standard decoding of responses to be forwarded. + // TODO: Return response data so that it may be forwarded in chained Binds. + glog.Infof("Sent event to %s; got response %s", addr, res.Status) return nil, nil } diff --git a/pkg/delivery/action/logging.go b/pkg/delivery/action/logging.go index 05f98c7c959..298c53ae631 100644 --- a/pkg/delivery/action/logging.go +++ b/pkg/delivery/action/logging.go @@ -30,10 +30,10 @@ const ( ) // A LoggingAction will log and drop Events. -type LoggingAction struct{} +type loggingAction struct{} // SendEvent implements Action.SendEvent -func (a LoggingAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { +func (a loggingAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { event := map[string]interface{}{ "data": data, "context": context, @@ -47,6 +47,6 @@ func (a LoggingAction) SendEvent(name string, data interface{}, context *event.C } // NewLoggingAction creates an Action that will log and drop Events. -func NewLoggingAction() LoggingAction { - return LoggingAction{} +func NewLoggingAction() Action { + return loggingAction{} } diff --git a/pkg/delivery/action/service.go b/pkg/delivery/action/service.go new file mode 100644 index 00000000000..6278d0e1ba2 --- /dev/null +++ b/pkg/delivery/action/service.go @@ -0,0 +1,62 @@ +package action + +import ( + "fmt" + "net/http" + "strings" + + "github.com/elafros/eventing/pkg/event" + "github.com/golang/glog" +) + +const ( + // ServiceActionType is the expected Bind Processor name to + // cause Events to be sent to a K8S service. + ServiceActionType = "Service" +) + +// serviceAction sends Events to a K8S service. +type serviceAction struct { + httpclient *http.Client +} + +// NewServiceAction creates an Action object that sends Events to K8S services. +func NewServiceAction(httpclient *http.Client) Action { + return &serviceAction{httpclient: httpclient} +} + +// SendEvent will POST the event spec to the root URI of the elafros route. +func (a *serviceAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) { + glog.Infof("Sending event %s to K8S service %s", context.EventID, name) + var namespace, service string + parts := strings.Split(name, "/") + if len(parts) != 2 { + // TODO(inlined): for compatibility with the github demo we implicitly pick up the namespace + // of the Bind resource. Should ensure that we'll always get routes in this form. + namespace, service = "default", name + } else { + namespace, service = parts[0], parts[1] + } + + addr := fmt.Sprintf("http://%s.%s.svc.cluster.local", service, namespace) + glog.Info("Sending event", context.EventID, "to K8S service at", addr) + req, err := event.NewRequest(addr, data, *context) + if err != nil { + glog.Errorf("Failed to marshal event: %s", err) + return nil, err + } + + res, err := a.httpclient.Do(req) + if err != nil { + glog.Errorf("Failed to send event to webhook %s; err=%s", addr, err) + return nil, err + } + // TODO: Standard handling of non-200 responses as errors. + if res.StatusCode/100 != 2 { + glog.Errorf("Got unsuccessful response code %d from %s", res.StatusCode, addr) + } + + // TODO: Return response data so that it may be forwarded in chained Binds. + glog.Infof("Sent event to %s; got response %s", addr, res.Status) + return nil, nil +} diff --git a/pkg/delivery/queue/BUILD.bazel b/pkg/delivery/queue/BUILD.bazel index df8567bec89..a47d3cf440b 100644 --- a/pkg/delivery/queue/BUILD.bazel +++ b/pkg/delivery/queue/BUILD.bazel @@ -11,6 +11,7 @@ go_library( ], importpath = "github.com/elafros/eventing/pkg/delivery/queue", deps = [ + "//pkg/apis/bind/v1alpha1:go_default_library", "//pkg/event:go_default_library", "//vendor/github.com/golang/glog:go_default_library", ], diff --git a/pkg/delivery/queue/queue.go b/pkg/delivery/queue/queue.go index 286bd5b7826..e91e3bb72c8 100644 --- a/pkg/delivery/queue/queue.go +++ b/pkg/delivery/queue/queue.go @@ -17,6 +17,7 @@ limitations under the License. package queue import ( + "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" "github.com/elafros/eventing/pkg/event" ) @@ -25,17 +26,9 @@ import ( // It may be reasonable to just return a tuple of these values from // the Queue pull operations instead of defining the struct. type QueuedEvent struct { - Action ActionType `json:"action"` - Data interface{} `json:"data"` - Context *event.Context `json:"context"` -} - -// ActionType is a shim while Bind's Action is not generic. -// TODO(vaikas): Remove this once Bind's Action has been migrated -// to be generic. -type ActionType struct { - Name string `json:"name"` - Processor string `json:"processor"` + Action v1alpha1.BindAction `json:"action"` + Data interface{} `json:"data"` + Context *event.Context `json:"context"` } // Queue implements basic features to allow asynchronous buffering of events. diff --git a/pkg/delivery/receiver.go b/pkg/delivery/receiver.go index 860189ea0d3..07defca33e6 100644 --- a/pkg/delivery/receiver.go +++ b/pkg/delivery/receiver.go @@ -34,17 +34,6 @@ import ( // will do late-time filtering of Events. var directSendEventRegExp = regexp.MustCompile(`^.*/namespaces/([^/]*)/flows/([^/]*):sendEvent$`) -// TODO(vaikas): Remove this once Bind's Action has been migrated -// to be generic. -const hardCodedProcessor = "eventing.elafros.dev/EventLogger" - -func actionFromBind(bind *v1alpha1.Bind) queue.ActionType { - return queue.ActionType{ - Name: bind.Spec.Action.RouteName, - Processor: hardCodedProcessor, - } -} - // Receiver manages the HTTP endpoints for receiving events as well as // stats about the queue for processing events. type Receiver struct { @@ -58,7 +47,7 @@ func NewReceiver(bindsLister listers.BindLister, eventQueue queue.Queue) *Receiv } // SendEvent enqueues an event data and Context for delivery to a particular action. -func (r *Receiver) SendEvent(action queue.ActionType, data interface{}, context *event.Context) error { +func (r *Receiver) SendEvent(action v1alpha1.BindAction, data interface{}, context *event.Context) error { return r.eventQueue.Push(queue.QueuedEvent{ Action: action, Data: data, @@ -68,6 +57,7 @@ func (r *Receiver) SendEvent(action queue.ActionType, data interface{}, context // ServeHTTP implements the external REST API that other services will use to send events. func (r *Receiver) ServeHTTP(w http.ResponseWriter, req *http.Request) { + glog.Infof("Serving %s", req.URL.Path) if req.Method != http.MethodPost { glog.V(3).Info("Cannot handle method", req.Method) w.WriteHeader(http.StatusMethodNotAllowed) @@ -102,9 +92,13 @@ func (r *Receiver) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - if err := r.SendEvent(actionFromBind(bind), data, context); err != nil { + if err := r.SendEvent(bind.Spec.Action, data, context); err != nil { glog.Error("Failed to enqueue event", err) w.WriteHeader(http.StatusInternalServerError) return } + + glog.Infof("Enqueued event %s for delivery to %s action %s", + context.EventID, bind.Spec.Action.Processor, bind.Spec.Action.Name) + w.WriteHeader(http.StatusOK) } diff --git a/pkg/delivery/receiver_test.go b/pkg/delivery/receiver_test.go index c8280ca2f7b..c22977a20ec 100644 --- a/pkg/delivery/receiver_test.go +++ b/pkg/delivery/receiver_test.go @@ -51,7 +51,8 @@ var ( Resource: "{abc}/123", }, Action: v1alpha1.BindAction{ - RouteName: "vaikas.fix.this", + Processor: "eventing.elafros.dev/WebHook", + Name: "https://demo.google.com/notreal", }, }, } @@ -163,4 +164,7 @@ func TestEnqueueEvent(t *testing.T) { if !reflect.DeepEqual(data, event.Data) { t.Fatalf("Event data was not marshalled correctly; expected=%+v got=%+v", data, event.Data) } + if !reflect.DeepEqual(demoBind.Spec.Action, event.Action) { + t.Fatalf("Event action was not marshalled correctly; expected=%+v got=%+v", demoBind.Spec.Action, event.Action) + } } diff --git a/pkg/delivery/sender.go b/pkg/delivery/sender.go index 1487767b22c..14dd1e66c4a 100644 --- a/pkg/delivery/sender.go +++ b/pkg/delivery/sender.go @@ -62,7 +62,12 @@ func (s *Sender) RunOnce(stopCh <-chan struct{}) error { glog.V(4).Info("Sending Event %s to %s Action %s", event.Context.EventID, event.Action.Processor, event.Action.Name) action, ok := s.actions[event.Action.Processor] if !ok { - return fmt.Errorf("Event %s is routed to unknown Processor '%s'", event.Context.EventID, event.Action.Processor) + keys := make([]string, 0, len(s.actions)) + for key := range s.actions { + keys = append(keys, key) + } + return fmt.Errorf("Event %s is routed to unknown Processor '%s'; known processors are %q", + event.Context.EventID, event.Action.Processor, keys) } // TODO(inlined): retry strategies for errors and continuations for success. @@ -81,6 +86,7 @@ func (s *Sender) run(stopCh <-chan struct{}) { if err == io.EOF { return } + glog.Warning(err) runtime.HandleError(err) } } diff --git a/pkg/delivery/sender_test.go b/pkg/delivery/sender_test.go index 73b9505bcff..91980386318 100644 --- a/pkg/delivery/sender_test.go +++ b/pkg/delivery/sender_test.go @@ -21,6 +21,7 @@ import ( "sync" "testing" + "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" "github.com/elafros/eventing/pkg/delivery" "github.com/elafros/eventing/pkg/delivery/action" "github.com/elafros/eventing/pkg/delivery/queue" @@ -63,7 +64,7 @@ func TestSendEvent(t *testing.T) { q.Push(queue.QueuedEvent{ Context: expectedContext, Data: expectedData, - Action: queue.ActionType{ + Action: v1alpha1.BindAction{ Processor: actionType, Name: actionName, }, diff --git a/pkg/sources/BUILD b/pkg/sources/BUILD index 02922d4c903..f8080d27eae 100644 --- a/pkg/sources/BUILD +++ b/pkg/sources/BUILD @@ -10,8 +10,12 @@ go_library( importpath = "github.com/elafros/eventing/pkg/sources", visibility = ["//visibility:public"], deps = [ + "//pkg/apis/bind/v1alpha1:go_default_library", + "//pkg/delivery/action:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/google/go-github/github:go_default_library", "//vendor/golang.org/x/oauth2:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", ], ) diff --git a/pkg/sources/event_source.go b/pkg/sources/event_source.go index c807653c54a..b661a6fd8f4 100644 --- a/pkg/sources/event_source.go +++ b/pkg/sources/event_source.go @@ -16,6 +16,10 @@ limitations under the License. package sources +import "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" + +// EventSource manages the subscription of which events a developer is interested in. type EventSource interface { - Bind(trigger EventTrigger, route string) (map[string]interface{}, error) + // Bind creates a new subscription of events. + Bind(bind *v1alpha1.Bind, parameters map[string]interface{}) (map[string]interface{}, error) } diff --git a/pkg/sources/github.go b/pkg/sources/github.go index a6b100a0d1c..6431f217215 100644 --- a/pkg/sources/github.go +++ b/pkg/sources/github.go @@ -18,14 +18,18 @@ package sources import ( "context" + "errors" "fmt" "strings" "github.com/golang/glog" - + ghclient "github.com/google/go-github/github" "golang.org/x/oauth2" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" - ghclient "github.com/google/go-github/github" + "github.com/elafros/eventing/pkg/apis/bind/v1alpha1" + "github.com/elafros/eventing/pkg/delivery/action" ) const ( @@ -37,39 +41,52 @@ const ( MessageResourceSynced = "Bind synced successfully" ) +// GithubEventSource configures Event delivery from GitHub. type GithubEventSource struct { + clientset kubernetes.Interface } -func NewGithubEventSource() EventSource { - return &GithubEventSource{} +// NewGithubEventSource configures a new GithubEventSource. +func NewGithubEventSource(clientset kubernetes.Interface) EventSource { + return &GithubEventSource{clientset: clientset} } -func (t *GithubEventSource) Bind(trigger EventTrigger, route string) (map[string]interface{}, error) { - glog.Infof("CREATING GITHUB WEBHOOK") +// Bind implements EventSource.Bind +func (s *GithubEventSource) Bind(bind *v1alpha1.Bind, parameters map[string]interface{}) (map[string]interface{}, error) { + glog.Infof("BINDING GITHUB EVENT") + // BUG(#43) The GitHub event source should be responsible for its own gateway rathan relying + // on elafros routes to be externally visible. + if bind.Spec.Action.Processor != action.ElafrosActionType { + return nil, errors.New("[elafros/eventing#43] GitHub event source currently only supports Elafros Routes as their action type") + } + address, err := s.getFunctionAddress(bind) + if err != nil { + return nil, err + } ctx := context.Background() ts := oauth2.StaticTokenSource( - &oauth2.Token{AccessToken: trigger.Parameters["accessToken"].(string)}, + &oauth2.Token{AccessToken: parameters["accessToken"].(string)}, ) tc := oauth2.NewClient(ctx, ts) - glog.Infof("CREATING GITHUB WEBHOOK with access token: %s", trigger.Parameters["accessToken"].(string)) + glog.Infof("CREATING GITHUB WEBHOOK with access token: %s", parameters["accessToken"].(string)) client := ghclient.NewClient(tc) active := true name := "web" config := make(map[string]interface{}) - config["url"] = fmt.Sprintf("http://%s", route) + config["url"] = fmt.Sprintf("http://%s", address) config["content_type"] = "json" - config["secret"] = trigger.Parameters["secretToken"].(string) + config["secret"] = parameters["secretToken"].(string) hook := ghclient.Hook{ Name: &name, - URL: &route, + URL: &address, Events: []string{"pull_request"}, Active: &active, Config: config, } - components := strings.Split(trigger.Resource, "/") + components := strings.Split(bind.Spec.Trigger.Resource, "/") h, r, err := client.Repositories.CreateHook(ctx, components[0] /* owner */, components[1] /* repo */, &hook) if err != nil { glog.Warningf("Failed to create the webhook: %s", err) @@ -81,3 +98,32 @@ func (t *GithubEventSource) Bind(trigger EventTrigger, route string) (map[string ret["ID"] = fmt.Sprintf("%d", h.ID) return ret, nil } + +func (s *GithubEventSource) getFunctionAddress(bind *v1alpha1.Bind) (string, error) { + suffix, err := s.getDomainSuffix() + if err != nil { + return "", err + } + + functionDNS := fmt.Sprintf( + "%s.%s.%s", + /* function name */ bind.Spec.Action.Name, + /* function namespace */ bind.ObjectMeta.Namespace, + /* cluster root domain */ suffix) + glog.Infof("Found route DNS as '%q'", functionDNS) + return functionDNS, nil +} + +func (s *GithubEventSource) getDomainSuffix() (string, error) { + const name = "ela-config" + const namespace = "ela-system" + c, err := s.clientset.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return "", err + } + domainSuffix, ok := c.Data["domainSuffix"] + if !ok { + return "", fmt.Errorf("cannot find domainSuffix in %v: ConfigMap.Data is %#v", name, c.Data) + } + return domainSuffix, nil +} diff --git a/sample/actions/BUILD.bazel b/sample/actions/BUILD.bazel new file mode 100644 index 00000000000..5d44e85cc5c --- /dev/null +++ b/sample/actions/BUILD.bazel @@ -0,0 +1,89 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library") + +go_library( + name = "go_default_library", + srcs = ["action.go"], + importpath = "github.com/elafros/eventing/sample/actions", + visibility = ["//visibility:private"], + deps = [ + "//pkg/event:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + ], +) + +go_binary( + name = "action-demo", + embed = [":go_default_library"], + importpath = "github.com/elafros/elafros/sample/actions", + pure = "on", + visibility = ["//visibility:public"], +) + +load("@io_bazel_rules_docker//go:image.bzl", "go_image") + +go_image( + name = "image", + binary = ":action-demo", +) + +load("@k8s_object//:defaults.bzl", "k8s_object") + +# Step 1: Support for action as a debug statement +k8s_object( + name = "bind", + template = ":bind.yaml", +) + +# Step 2: Support for action as an ELA route +# To enable: update the action-sample Bind to have the following +# Spec.Action: +# Name: action-demo +# Processor: elafros.dev/Route + +k8s_object( + name = "configuration", + images = { + "action-demo:latest": ":image", + }, + template = ":configuration.yaml", +) + +k8s_object( + name = "route", + template = ":route.yaml", +) + +# Step 3: Support for action as Service + +k8s_object( + name = "deployment", + images = { + "action-demo:latest": ":image", + }, + template = ":deployment.yaml", +) + +k8s_object( + name = "service", + template = ":service.yaml", +) + +load("@io_bazel_rules_k8s//k8s:objects.bzl", "k8s_objects") + +k8s_objects( + name = "everything", + objects = [ + ":bind", + ":route", + ":configuration", + ":deployment", + ":service", + ], +) + +go_binary( + name = "actions", + embed = [":go_default_library"], + importpath = "github.com/elafros/eventing/sample/actions", + visibility = ["//visibility:public"], +) diff --git a/sample/actions/README.md b/sample/actions/README.md new file mode 100644 index 00000000000..237a258072f --- /dev/null +++ b/sample/actions/README.md @@ -0,0 +1,136 @@ +# Actions + +Demo to show support for dynamic actions, disjoint from the event source's configuration. + +## Prerequisites + +1. [Setup your development environment](../../DEVELOPMENT.md#getting-started) +2. [Start Elafros](../../README.md#start-elafros) +3. Decide on the DNS name that git can then call. Update elafros/elafros/elaconfig.yaml domainSuffix. +For example I used ela.inlined.me as my hostname, so my elaconfig.yaml looks like so: + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: ela-config + namespace: ela-system +data: + domainSuffix: ela.inlined.me +``` + +If you were already running the elafros controllers, you will need to kill the ela-controller in the ela-system namespace for it to pick up the new domain suffix. + +4. Since we will be mimicing an internal service, we need access to the event-delivery service. + Expose it with: + + ``` + kubectl expose deployment event-delivery --port=80 --target-port=9090 --type=LoadBalancer --namespace bind-system + ``` + +5. Install the `sendevent` utility + +```bash +go install github.com/elafros/eventing/cmd/sendevent +``` + +6. Save the address of your exposed `event-delivery` service. + +```bash +export EVENT_DELIVERY_IP=$(kubectl get services/event-delivery --namespace bind-system -o go-template='{{ (index .status.loadBalancer.ingress 0).ip }}') +``` + +## Demo 1: Install Bind with a debug-logging action + +### Deploying assets + +You can deploy this to Elafros from the root directory via: + +```shell +bazel run sample/actions:everything.apply +``` + +This created a handful of resources. The one most immediately useful is the `action-demo` Bind. +An event source would normally deliver events to this Bind by posting to + +``` +${EVENT_DELIVERY_SERVICE}/v1alpha1/namespaces/default/flows/action-demo:sendEvent +``` + +### Sending events + +We'll do the same with the `sendevent` utility: + +```bash +sendevent http://${EVENT_DELIVERY_IP}/v1alpha1/namespaces/default/flows/action-demo:sendEvent +``` + +(You can optionally configure the sent event by choosing any of `--event-id`, `--event-type`, +`--source`, or `--data`) + +### Observing side-effects + +The event processor deployed in configuration.yaml is "eventing.elafros.dev/EventLogger". This +simply emits a structured event to the logs for the "event-delivery" app. View them with: + +``` +kubectl logs --namespace bind-system -lapp=event-delivery +``` + +## Demo 2: Reroute Bind to an Elafros action. + +### Deploying assets + +In the previous step, you already deployed the elafros route (also named 'action-demo') +that we'll use in this step. To make the demo more exciting, edit "action.go" and change +the `databaseURL` constant to a Firebase Realtime Database URL which you are an owner of. + +To make the `action-demo` Bind point to your new action, edit `bind.yaml` and change the spec.action.processor to `elafros.dev/Route` + +Apply these changes with `blaze run sample/actions:everything.apply` + +### Sending events + +Because we did not change anything about the (mock) event source, the same command will +now have a different effect. + +```bash +sendevent http://${EVENT_DELIVERY_IP}/v1alpha1/namespaces/default/flows/action-demo:sendEvent +``` + +### Observing side-effects + +If you load your Firebase Realtime Database, you will see new event-ids appended to the "seenEvents" node every time you run `sendevent` + +## Cleaning up + +To clean up the sample service: + +```shell +bazel run sample/github:everything.delete +``` + +## Demo 3: Reroute Bind to a K8S Service + +### Deploying assets + +This step will reuse the same Docker image as step 2, but will host `action.go` in +a Deployment + Service. To verify that we are targeting a new backend, the Deployment +sets an environment variable to change where in the Firebase Realtime Database events +are published. + +To mkae the `action-demo` bind point to the Service vrsion of our demo, edit `bind.yaml` and change the spec.action.processor to `Service`. + +Apply these changes with `bazel run sample/actions:everything.apply` + +### Sending events + +Again, the same command will now have new effects: + +``` +sendevent http://${EVENT_DELIVERY_IP}/v1alpha1/namespaces/default/flows/action-demo:sendEvent +``` + +### Observing side-effects + +If you load your Firebase Realtime Database, you will see new event-ids appended to the "seenEventsInService" node every time you run `sendevent` diff --git a/sample/actions/action.go b/sample/actions/action.go new file mode 100644 index 00000000000..8fb8a7de522 --- /dev/null +++ b/sample/actions/action.go @@ -0,0 +1,82 @@ +/* +Copyright 2018 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "os" + + "github.com/elafros/eventing/pkg/event" + "github.com/golang/glog" +) + +const ( + databaseURL = "https://inlined-junkdrawer.firebaseio.com" + address = ":8080" +) + +// SendEventToDB... +func SendEventToDB(w http.ResponseWriter, r *http.Request) { + glog.Info("Sending event to RTDB") + var data map[string]interface{} + context, err := event.FromRequest(&data, r) + if err != nil { + glog.Errorf("Failed to parse event: %s", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + addr := fmt.Sprintf("%s/seenEvents%s/%s/.json", databaseURL, os.Getenv("KEY_SUFFIX"), context.EventID) + url, err := url.Parse(addr) + if err != nil { + glog.Errorf("Failed to parse url %s", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + // Can not fail + body, _ := json.Marshal(data) + + write := &http.Request{ + Method: http.MethodPut, + URL: url, + Body: ioutil.NopCloser(bytes.NewReader(body)), + } + res, err := http.DefaultClient.Do(write) + if err != nil { + glog.Errorf("Failed to write to RTDB: %s", err) + } + if res.StatusCode/100 != 2 { + glog.Errorf("Got non-success response from RTDB: %d", res.StatusCode) + } + glog.Info("Success!") + + w.WriteHeader(http.StatusOK) +} + +func main() { + http.HandleFunc("/healthz", func(w http.ResponseWriter, req *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.Write([]byte(`{"status": "ok"}`)) + }) + http.HandleFunc("/", SendEventToDB) + http.ListenAndServe(address, nil) +} diff --git a/sample/actions/bind.yaml b/sample/actions/bind.yaml new file mode 100644 index 00000000000..9934c35bf00 --- /dev/null +++ b/sample/actions/bind.yaml @@ -0,0 +1,11 @@ +apiVersion: eventing.elafros.dev/v1alpha1 +kind: Bind +metadata: + name: action-demo +spec: + trigger: + # Note: trigger is not used right now. + # Demos must currently publish directly to the endpoint for this Bind + action: + name: action-demo + processor: eventing.elafros.dev/EventLogger \ No newline at end of file diff --git a/sample/actions/configuration.yaml b/sample/actions/configuration.yaml new file mode 100644 index 00000000000..b34ce3726fd --- /dev/null +++ b/sample/actions/configuration.yaml @@ -0,0 +1,13 @@ +apiVersion: elafros.dev/v1alpha1 +kind: Configuration +metadata: + name: action-demo + namespace: default +spec: + revisionTemplate: + metadata: + labels: + elafros.dev/type: container + spec: + container: + image: action-demo:latest \ No newline at end of file diff --git a/sample/actions/deployment.yaml b/sample/actions/deployment.yaml new file mode 100644 index 00000000000..a00a964c839 --- /dev/null +++ b/sample/actions/deployment.yaml @@ -0,0 +1,33 @@ +# Copyright 2018 Google, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apps/v1beta1 +kind: Deployment +metadata: + name: action-demo +spec: + replicas: 1 + template: + metadata: + labels: + app: action-demo-service + spec: + containers: + - name: action-demo + image: action-demo:latest + ports: + - containerPort: 8080 + env: + - name: KEY_SUFFIX + value: "InService" \ No newline at end of file diff --git a/sample/actions/route.yaml b/sample/actions/route.yaml new file mode 100644 index 00000000000..d7bc21ea9e2 --- /dev/null +++ b/sample/actions/route.yaml @@ -0,0 +1,23 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: elafros.dev/v1alpha1 +kind: Route +metadata: + name: action-demo + namespace: default +spec: + traffic: + - configurationName: action-demo + percent: 100 diff --git a/sample/actions/service.yaml b/sample/actions/service.yaml new file mode 100644 index 00000000000..c0e14d0100f --- /dev/null +++ b/sample/actions/service.yaml @@ -0,0 +1,26 @@ +# Copyright 2018 Google, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: Service +metadata: + name: action-demo +spec: + selector: + app: action-demo-service + ports: + - protocol: TCP + port: 80 + targetPort: 8080 + diff --git a/sample/github/pullrequest.yaml b/sample/github/pullrequest.yaml index e5cb476b702..0caa4f230aa 100644 --- a/sample/github/pullrequest.yaml +++ b/sample/github/pullrequest.yaml @@ -30,4 +30,5 @@ spec: name: githubsecret key: githubCredentials action: - routeName: git-webhook + processor: elafros.dev/Route + name: git-webhook