Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/delivery/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func main() {
processors := map[string]action.Action{
action.LoggingActionType: action.NewLoggingAction(),
action.ElafrosActionType: action.NewElafrosAction(kubeClient, http.DefaultClient),
action.ServiceActionType: action.NewServiceAction(http.DefaultClient),
}
sender := delivery.NewSender(q, processors)

Expand Down
21 changes: 18 additions & 3 deletions pkg/apis/bind/v1alpha1/bind_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,15 @@ type Bind struct {
Status BindStatus `json:"status"`
}

// BindAction describes where events should be delivered.
type BindAction struct {
// RouteName specifies Elafros route as a target.
RouteName string `json:"routeName,omitempty"`
// Processor dictates the kind of service that will handle the Event.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious if you considered naming this "Handler". When it comes to naming, I tend to look for clues in the description, and in this case you wrote:

...will handle the Event

// For example "elafros.dev/Route"
Processor string `json:"processor"`
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like Action itself could simply be target where the value is a URL. Both elafrosAction and serviceAction are ultimately HTTP POSTs where the difference is just the way the URL is constructed. As for loggingAction, I would assume logging is an orthogonal concern that would be enabled regardless of the action type, not as an action in its own right.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Processor field gives us some extra flexibility in configuring different types of actions. Even if all the actions expect HTTP POSTs, a same-cluster Elafros action might have different requirements than an external action (e.g. different auth headers).

Also conceivably we might want to allow non-HTTP actions.

So I think there is value in keeping the Processor field.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't the internal vs. external header requirements best be handled via Istio/Envoy?

And is there an example of a non-HTTP action that's more than "conceivable" at this point? If not, it seems premature to add a strategy pattern. Also since the different action types in this PR are hardcoded, it's not yet designed in a way to be open for extension.


// Name dicates the resource exposed by Processor that will handle the event.
// The semantics of Name is determined by Processor.
Name string `json:"name"`
}

// EventTrigger describes when an Event should be delivered.
Expand Down Expand Up @@ -119,13 +125,22 @@ type EventTrigger struct {
ParametersFrom []ParametersFromSource `json:"parametersFrom,omitempty"`
}

// Zero determines whether an EventTrigger is fully empty. If a Bind is set up
// without an EventTrigger, it is assumed that the real event source is incompatible
// with our event framework's control plane and will be set up manually through
// side channels.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any example or doc that describes the event framework control plane compatibility, and what this "side channels" concept means? Why would a bind be created if it's not establishing a connection to a trigger?

func (t *EventTrigger) Zero() bool {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, I think I'd prefer something like isZero or isEmpty. Zero to me indicates you're zeroing something. Might be good to say something about why the go default zero value is not enough in the comments also.

return t.EventType == "" && t.Resource == "" && t.Service == "" &&
t.Parameters == nil && t.ParametersFrom == nil
}

// BindSpec is the spec for a Bind resource
type BindSpec struct {
// Action specifies the target handler for the events
Action BindAction `json:"action"`

// Trigger specifies the trigger we're binding to
Trigger EventTrigger `json:trigger"`
Trigger EventTrigger `json:"trigger"`
}

// ParametersFromSource represents the source of a set of Parameters
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/bind/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
"//pkg/client/informers/externalversions:go_default_library",
"//pkg/client/listers/bind/v1alpha1:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/delivery/action:go_default_library",
"//pkg/sources:go_default_library",
"//vendor/github.com/elafros/elafros/pkg/client/informers/externalversions:go_default_library",
"//vendor/github.com/elafros/elafros/pkg/client/listers/ela/v1alpha1:go_default_library",
Expand Down
172 changes: 96 additions & 76 deletions pkg/controller/bind/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package bind
import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/ghodss/yaml"
Expand Down Expand Up @@ -48,6 +49,7 @@ import (
bindscheme "github.com/elafros/eventing/pkg/client/clientset/versioned/scheme"
informers "github.com/elafros/eventing/pkg/client/informers/externalversions"
listers "github.com/elafros/eventing/pkg/client/listers/bind/v1alpha1"
"github.com/elafros/eventing/pkg/delivery/action"
)

const controllerAgentName = "bind-controller"
Expand Down Expand Up @@ -133,7 +135,7 @@ func NewController(
}

controller.sources = make(map[string]sources.EventSource)
controller.sources["github.com"] = sources.NewGithubEventSource()
controller.sources["github.com"] = sources.NewGithubEventSource(kubeclientset)
glog.Info("Setting up event handlers")
// Set up an event handler for when Bind resources change
bindInformer.Binds().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -288,76 +290,40 @@ func (c *Controller) syncHandler(key string) error {
bind = bind.DeepCopy()

// Find the Route that they want.
routeName := bind.Spec.Action.RouteName
route, err := c.routesLister.Routes(namespace).Get(routeName)
if err != nil {
actionName := bind.Spec.Action.Name
actionType := bind.Spec.Action.Processor
if err := c.validateAction(namespace, actionName, actionType); err != nil {
if errors.IsNotFound(err) {
runtime.HandleError(fmt.Errorf("Route %q in namespace %q does not exist", routeName, namespace))
runtime.HandleError(fmt.Errorf("Action %q of type %q in namespace %q does not exist", actionName, actionType, namespace))
}
glog.Warningf("Bind %q rejected for invalid action: %s", name, err)
return err
}

domainSuffix, err := getDomainSuffixFromElaConfig(c.kubeclientset)
if err != nil {
if errors.IsNotFound(err) {
runtime.HandleError(fmt.Errorf("Can't find ela ConfigMap"))
var condition *v1alpha1.BindCondition
if bind.Spec.Trigger.Zero() {
glog.Warningf("Bind %q has no Spec.Trigger", name)
if bind.Status.Conditions != nil {
condition = &v1alpha1.BindCondition{
Type: v1alpha1.BindComplete,
Status: corev1.ConditionTrue,
Reason: fmt.Sprintf("BindSuccess; no Trigger to manage"),
Message: "Bind successful",
}
}
return err
}

functionDNS := fmt.Sprintf("%s.%s.%s", route.Name, route.Namespace, domainSuffix)
glog.Infof("Found route DNS as '%q'", functionDNS)

es, err := c.eventSourcesLister.EventSources(namespace).Get(bind.Spec.Trigger.Service)
if err != nil {
if errors.IsNotFound(err) {
runtime.HandleError(fmt.Errorf("EventSource %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace))
} else {
if condition, err = c.bindTrigger(namespace, bind); err != nil {
return err
}
return err
}

et, err := c.eventTypesLister.EventTypes(namespace).Get(bind.Spec.Trigger.EventType)
if err != nil {
if errors.IsNotFound(err) {
runtime.HandleError(fmt.Errorf("EventType %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace))
}
return err
}

trigger, err := resolveTrigger(c.kubeclientset, namespace, bind.Spec.Trigger)
if err != nil {
glog.Warningf("Failed to process parameters: %s", err)
return err
}

if bind.Status.Conditions != nil {
if condition == nil {
glog.Infof("Already has status, skipping")
return nil
}

glog.Infof("Creating a subscription to %q : %q with Trigger %+v", es.Name, et.Name, trigger)
if val, ok := c.sources[es.Name]; ok {
r, err := val.Bind(trigger, functionDNS)
if err != nil {
glog.Warningf("BIND failed: %s", err)
msg := fmt.Sprintf("Bind failed with : %s", r)
bind.Status.SetCondition(&v1alpha1.BindCondition{
Type: v1alpha1.BindFailed,
Status: corev1.ConditionTrue,
Reason: "BindFailed",
Message: msg,
})
} else {
bind.Status.SetCondition(&v1alpha1.BindCondition{
Type: v1alpha1.BindComplete,
Status: corev1.ConditionTrue,
Reason: fmt.Sprintf("BindSuccess: Hook: %s", r["ID"].(string)),
Message: "Bind successful",
})
}
}
_, err = c.updateStatus(bind)
if err != nil {
bind.Status.SetCondition(condition)
if _, err = c.updateStatus(bind); err != nil {
glog.Warningf("Failed to update status: %s", err)
return err
}
Expand All @@ -381,19 +347,69 @@ func (c *Controller) updateStatus(u *v1alpha1.Bind) (*v1alpha1.Bind, error) {
return bindClient.Update(newu)
}

func resolveTrigger(kubeClient kubernetes.Interface, namespace string, trigger v1alpha1.EventTrigger) (sources.EventTrigger, error) {
r := sources.EventTrigger{
Resource: trigger.Resource,
EventType: trigger.EventType,
Parameters: make(map[string]interface{}),
func (c *Controller) bindTrigger(namespace string, bind *v1alpha1.Bind) (*v1alpha1.BindCondition, error) {
es, err := c.eventSourcesLister.EventSources(namespace).Get(bind.Spec.Trigger.Service)
if err != nil {
if errors.IsNotFound(err) {
runtime.HandleError(fmt.Errorf("EventSource %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace))
}
return nil, err
}

et, err := c.eventTypesLister.EventTypes(namespace).Get(bind.Spec.Trigger.EventType)
if err != nil {
if errors.IsNotFound(err) {
runtime.HandleError(fmt.Errorf("EventType %q in namespace %q does not exist", bind.Spec.Trigger.Service, namespace))
}
return nil, err
}

parameters, err := resolveParameters(c.kubeclientset, namespace, bind.Spec.Trigger)
if err != nil {
glog.Warningf("Failed to process parameters: %s", err)
return nil, err
}

if bind.Status.Conditions != nil {
return nil, nil
}

glog.Infof("Creating a subscription to %q : %q with Trigger %+v", es.Name, et.Name, bind.Spec.Trigger)
eventSource, ok := c.sources[es.Name]
if !ok {
return nil, fmt.Errorf("Do not know how to deploy EventTrigger to source with name %s", es.Name)
}

r, err := eventSource.Bind(bind, parameters)
if err != nil {
glog.Warningf("BIND failed: %s", err)
msg := fmt.Sprintf("Bind failed with : %s", r)
return &v1alpha1.BindCondition{
Type: v1alpha1.BindFailed,
Status: corev1.ConditionTrue,
Reason: "BindFailed",
Message: msg,
}, nil
}
return &v1alpha1.BindCondition{
Type: v1alpha1.BindComplete,
Status: corev1.ConditionTrue,
// BUG(43) ID is a feature of the GitHub event source. If it is to become standardized,
// we should not be using a map[string]interface{} result.
Reason: fmt.Sprintf("BindSuccess: Hook: %s", r["ID"].(string)),
Message: "Bind successful",
}, nil
}

func resolveParameters(kubeClient kubernetes.Interface, namespace string, trigger v1alpha1.EventTrigger) (map[string]interface{}, error) {
r := make(map[string]interface{})
if trigger.Parameters != nil && trigger.Parameters.Raw != nil && len(trigger.Parameters.Raw) > 0 {
p := make(map[string]interface{})
if err := yaml.Unmarshal(trigger.Parameters.Raw, &p); err != nil {
return r, err
}
for k, v := range p {
r.Parameters[k] = v
r[k] = v
}
}
if trigger.ParametersFrom != nil {
Expand All @@ -404,7 +420,7 @@ func resolveTrigger(kubeClient kubernetes.Interface, namespace string, trigger v
return r, err
}
for k, v := range pfs {
r.Parameters[k] = v
r[k] = v
}
}
}
Expand Down Expand Up @@ -446,16 +462,20 @@ func unmarshalJSON(in []byte) (map[string]interface{}, error) {
return parameters, nil
}

func getDomainSuffixFromElaConfig(cl kubernetes.Interface) (string, error) {
const name = "ela-config"
const namespace = "ela-system"
c, err := cl.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return "", err
}
domainSuffix, ok := c.Data["domainSuffix"]
if !ok {
return "", fmt.Errorf("cannot find domainSuffix in %v: ConfigMap.Data is %#v", name, c.Data)
func (c *Controller) validateAction(namespace string, name string, actionType string) error {
switch actionType {
case action.ElafrosActionType:
parts := strings.Split(name, "/")
if len(parts) == 2 {
namespace, name = parts[0], parts[1]
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if name does not contain namespace? Seems like we should accept one or the other (namespace, name OR name=namespace/name) but not both here?

_, err := c.routesLister.Routes(namespace).Get(name)
return err
case action.LoggingActionType:
return nil
default:
// Should this be an errors.NewNotFound? How does the action struct
// fit into the schema.GroupResource idea?
return fmt.Errorf("Unknown Action.Processor %q", actionType)
}
return domainSuffix, nil
}
1 change: 1 addition & 0 deletions pkg/delivery/action/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"doc.go",
"elafros.go",
"logging.go",
"service.go",
],
importpath = "github.com/elafros/eventing/pkg/delivery/action",
deps = [
Expand Down
33 changes: 23 additions & 10 deletions pkg/delivery/action/elafros.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,55 @@ const (
ElafrosActionType = "elafros.dev/Route"
)

// ElafrosAction sends Events to an Elafros Route.
type ElafrosAction struct {
// elafrosAction sends Events to an Elafros Route.
type elafrosAction struct {
kubeclientset kubernetes.Interface
httpclient *http.Client
}

// NewElafrosAction creates an Action object that sends Events to Elafros Routes.
func NewElafrosAction(kubeclientset kubernetes.Interface, httpclient *http.Client) *ElafrosAction {
return &ElafrosAction{kubeclientset: kubeclientset, httpclient: httpclient}
func NewElafrosAction(kubeclientset kubernetes.Interface, httpclient *http.Client) Action {
return &elafrosAction{kubeclientset: kubeclientset, httpclient: httpclient}
}

// SendEvent will POST the event spec to the root URI of the elafros route.
func (a *ElafrosAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) {
func (a *elafrosAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) {
glog.Infof("Sending event %s to ELA route %s", context.EventID, name)
var namespace, route string
parts := strings.Split(name, "/")
if len(parts) != 2 {
return nil, fmt.Errorf("Expected elafros route '%s' to be in the form '<namespace>/<route>'", name)
// TODO(inlined): for compatibility with the github demo we implicitly pick up the namespace
// of the Bind resource. Should ensure that we'll always get routes in this form.
namespace, route = "default", name
} else {
namespace, route = parts[0], parts[1]
}
route, namespace := parts[1], parts[0]

domain, err := getDomainSuffixFromElaConfig(a.kubeclientset)
if err != nil {
glog.Error("Could not look up Elafros domain")
return nil, err
}

addr := fmt.Sprintf("http://%s.%s.%s", route, namespace, domain)
addr := fmt.Sprintf("http://%s.%s.%s/", route, namespace, domain)
glog.Info("Sending event", context.EventID, "to ELA route at", addr)
req, err := event.NewRequest(addr, data, *context)
if err != nil {
return nil, err
}

if _, err := a.httpclient.Do(req); err != nil {
res, err := a.httpclient.Do(req)
if err != nil {
glog.Errorf("Failed to send event to webhook %s; err=%s", addr, err)
return nil, err
}
// TODO: Standard handling of non-200 responses as errors.
if res.StatusCode/100 != 2 {
glog.Errorf("Got unsuccessful response code %d from %s", res.StatusCode, addr)
}

// TODO: non-200 responses as errors. Standard decoding of responses to be forwarded.
// TODO: Return response data so that it may be forwarded in chained Binds.
glog.Infof("Sent event to %s; got response %s", addr, res.Status)
return nil, nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/delivery/action/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ const (
)

// A LoggingAction will log and drop Events.
type LoggingAction struct{}
type loggingAction struct{}

// SendEvent implements Action.SendEvent
func (a LoggingAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) {
func (a loggingAction) SendEvent(name string, data interface{}, context *event.Context) (interface{}, error) {
event := map[string]interface{}{
"data": data,
"context": context,
Expand All @@ -47,6 +47,6 @@ func (a LoggingAction) SendEvent(name string, data interface{}, context *event.C
}

// NewLoggingAction creates an Action that will log and drop Events.
func NewLoggingAction() LoggingAction {
return LoggingAction{}
func NewLoggingAction() Action {
return loggingAction{}
}
Loading