Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 3 additions & 6 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,13 @@ const (
)

var (
masterURL string
kubeconfig string
receiveAdapterImage = flag.String("receiveadapter", "", "receive adapter docker image <temporary hack>")
masterURL string
kubeconfig string
)

func main() {
flag.Parse()

glog.Infof("RECEIVE IMAGE: %q", *receiveAdapterImage)

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

Expand Down Expand Up @@ -94,7 +91,7 @@ func main() {
controllers := make([]controller.Interface, 0, len(ctors))
for _, ctor := range ctors {
controllers = append(controllers,
ctor(kubeClient, client, kubeInformerFactory, informerFactory, elaInformerFactory, *receiveAdapterImage))
ctor(kubeClient, client, kubeInformerFactory, informerFactory, elaInformerFactory))
}

go kubeInformerFactory.Start(stopCh)
Expand Down
1 change: 0 additions & 1 deletion config/controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,4 @@ spec:
args: [
"-logtostderr",
"-stderrthreshold", "INFO",
"-receiveadapter", "github.com/elafros/eventing/sample/gcp_pubsub",
]
9 changes: 9 additions & 0 deletions pkg/apis/bind/v1alpha1/event_source_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1alpha1
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
)

// +genclient
Expand All @@ -40,6 +41,14 @@ type EventSource struct {
type EventSourceSpec struct {
Type string `json:"type,omitempty"`
Source string `json:"source,omitempty"`

// Image that we run for bind/unbind operations
Image string `json:"image,omitempty"`

// Parameters are configuration options for a particular EventSource
// TODO: Consider instead using ConfigMaps and mount them instead
// on the event sources containers.
Parameters *runtime.RawExtension `json:"parameters,omitempty"`
}

// EventSourceStatus is the status for a EventSource resource
Expand Down
153 changes: 75 additions & 78 deletions pkg/controller/bind/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ const (
type Controller struct {
// kubeclientset is a standard kubernetes clientset
kubeclientset kubernetes.Interface

// bindclientset is a clientset for our own API group
bindclientset clientset.Interface

Expand All @@ -93,8 +94,6 @@ type Controller struct {
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder

sources map[string]sources.EventSource
}

// NewController returns a new bind controller
Expand All @@ -103,8 +102,7 @@ func NewController(
bindclientset clientset.Interface,
kubeInformerFactory kubeinformers.SharedInformerFactory,
bindInformerFactory informers.SharedInformerFactory,
routeInformerFactory elainformers.SharedInformerFactory,
receiveAdapterImage string) controller.Interface {
routeInformerFactory elainformers.SharedInformerFactory) controller.Interface {

// obtain a reference to a shared index informer for the Bind types.
bindInformer := bindInformerFactory.Eventing().V1alpha1()
Expand Down Expand Up @@ -137,9 +135,6 @@ func NewController(
recorder: recorder,
}

controller.sources = make(map[string]sources.EventSource)
controller.sources["github"] = sources.NewGithubEventSource()
controller.sources["gcppubsub"] = sources.NewGCPPubSubEventSource(kubeclientset, receiveAdapterImage)
glog.Info("Setting up event handlers")
// Set up an event handler for when Bind resources change
bindInformer.Binds().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -343,95 +338,97 @@ func (c *Controller) syncHandler(key string) error {
return err
}

// If there are conditions or a context do nothing.
if (bind.Status.Conditions != nil || bind.Status.BindContext != nil) && deletionTimestamp == nil {
glog.Infof("Already has status, skipping")
return nil
}

trigger, err := resolveTrigger(c.kubeclientset, namespace, bind.Spec.Trigger)
if err != nil {
glog.Warningf("Failed to process parameters: %s", err)
return err
}

// If there are conditions or a context do nothing.
if (bind.Status.Conditions != nil || bind.Status.BindContext != nil) && deletionTimestamp == nil {
glog.Infof("Already has status, skipping")
return nil
}
// Don't mutate the informer's copy of our object.
newES := es.DeepCopy()

if val, ok := c.sources[es.Name]; ok {
if deletionTimestamp == nil {
glog.Infof("Creating a subscription to %q : %q with Trigger %+v", es.Name, et.Name, trigger)
bindContext, err := val.Bind(trigger, functionDNS)
binder := sources.NewContainerEventSource(bind, c.kubeclientset, &newES.Spec, "bind-system")
if deletionTimestamp == nil {
glog.Infof("Creating a subscription to %q : %q with Trigger %+v", es.Name, et.Name, trigger)
bindContext, err := binder.Bind(trigger, functionDNS)

if err != nil {
glog.Warningf("BIND failed: %s", err)
msg := fmt.Sprintf("Bind failed with : %s", err)
bind.Status.SetCondition(&v1alpha1.BindCondition{
Type: v1alpha1.BindFailed,
Status: corev1.ConditionTrue,
Reason: "BindFailed",
Message: msg,
})
} else {
glog.Infof("Got context back as: %+v", bindContext)
marshalledBindContext, err := json.Marshal(&bindContext.Context)
if err != nil {
glog.Warningf("BIND failed: %s", err)
msg := fmt.Sprintf("Bind failed with : %s", err)
bind.Status.SetCondition(&v1alpha1.BindCondition{
Type: v1alpha1.BindFailed,
Status: corev1.ConditionTrue,
Reason: "BindFailed",
Message: msg,
})
glog.Warningf("Couldn't marshal bind context: %+v : %s", bindContext, err)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why doesn't this return err? Should we retry? Seems like it should be Error log level at least. Why was the BindFailed update removed?

} else {
glog.Infof("Got context back as: %+v", bindContext)
marshalledBindContext, err := json.Marshal(&bindContext.Context)
if err != nil {
glog.Warningf("Couldn't marshal bind context: %+v : %s", bindContext, err)
} else {
glog.Infof("Marshaled context to: %+v", marshalledBindContext)
bind.Status.BindContext = &runtimetypes.RawExtension{
Raw: make([]byte, len(marshalledBindContext)),
}
bind.Status.BindContext.Raw = marshalledBindContext
}

// Set the finalizer since the bind succeeded, we need to clean up...
// TODO: we should do this in the webhook instead...
bind.Finalizers = append(bind.ObjectMeta.Finalizers, controllerAgentName)
_, err = c.updateFinalizers(bind)
if err != nil {
glog.Warningf("Failed to update finalizers: %s", err)
return err
glog.Infof("Marshaled context to: %+v", marshalledBindContext)
bind.Status.BindContext = &runtimetypes.RawExtension{
Raw: make([]byte, len(marshalledBindContext)),
}

bind.Status.SetCondition(&v1alpha1.BindCondition{
Type: v1alpha1.BindComplete,
Status: corev1.ConditionTrue,
Reason: "BindSuccess",
Message: "Bind successful",
})
bind.Status.BindContext.Raw = marshalledBindContext
}
_, err = c.updateStatus(bind)

// Set the finalizer since the bind succeeded, we need to clean up...
// TODO: we should do this in the webhook instead...
bind.Finalizers = append(bind.ObjectMeta.Finalizers, controllerAgentName)
_, err = c.updateFinalizers(bind)
if err != nil {
glog.Warningf("Failed to update status: %s", err)
glog.Warningf("Failed to update finalizers: %s", err)
return err
}
} else {
glog.Infof("Deleting a subscription to %q : %q with Trigger %+v", es.Name, et.Name, trigger)
bindContext := sources.BindContext{
Context: make(map[string]interface{}),
}
if bind.Status.BindContext != nil && bind.Status.BindContext.Raw != nil && len(bind.Status.BindContext.Raw) > 0 {
if err := json.Unmarshal(bind.Status.BindContext.Raw, &bindContext.Context); err != nil {
glog.Warningf("Couldn't unmarshal BindContext: %v", err)
// TODO set the condition properly here
return err
}
}
err := val.Unbind(trigger, bindContext)
if err != nil {
glog.Warningf("Couldn't unbind: %v", err)

bind.Status.SetCondition(&v1alpha1.BindCondition{
Type: v1alpha1.BindComplete,
Status: corev1.ConditionTrue,
Reason: "BindSuccess",
Message: "Bind successful",
})
}
_, err = c.updateStatus(bind)
if err != nil {
glog.Warningf("Failed to update status: %s", err)
return err
}
} else {
glog.Infof("Deleting a subscription to %q : %q with Trigger %+v", es.Name, et.Name, trigger)
bindContext := sources.BindContext{
Context: make(map[string]interface{}),
}
if bind.Status.BindContext != nil && bind.Status.BindContext.Raw != nil && len(bind.Status.BindContext.Raw) > 0 {
if err := json.Unmarshal(bind.Status.BindContext.Raw, &bindContext.Context); err != nil {
glog.Warningf("Couldn't unmarshal BindContext: %v", err)
// TODO set the condition properly here
return err
}
newFinalizers, err := RemoveFinalizer(bind, controllerAgentName)
if err != nil {
glog.Warningf("Failed to remove finalizer: %s", err)
return err
}
bind.ObjectMeta.Finalizers = newFinalizers
_, err = c.updateFinalizers(bind)
if err != nil {
glog.Warningf("Failed to update finalizers: %s", err)
return err
}
}
err := binder.Unbind(trigger, bindContext)
if err != nil {
glog.Warningf("Couldn't unbind: %v", err)
// TODO set the condition properly here
return err
}
newFinalizers, err := RemoveFinalizer(bind, controllerAgentName)
if err != nil {
glog.Warningf("Failed to remove finalizer: %s", err)
return err
}
bind.ObjectMeta.Finalizers = newFinalizers
_, err = c.updateFinalizers(bind)
if err != nil {
glog.Warningf("Failed to update finalizers: %s", err)
return err
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ type Interface interface {
Run(threadiness int, stopCh <-chan struct{}) error
}

type Constructor func(kubernetes.Interface, clientset.Interface, kubeinformers.SharedInformerFactory, informers.SharedInformerFactory, elainformers.SharedInformerFactory, string) Interface
type Constructor func(kubernetes.Interface, clientset.Interface, kubeinformers.SharedInformerFactory, informers.SharedInformerFactory, elainformers.SharedInformerFactory) Interface
Loading