From ab138485c09cf4578ddf45152c72fb000bd7f5c4 Mon Sep 17 00:00:00 2001 From: hasbro17 Date: Wed, 14 Feb 2018 13:20:22 -0800 Subject: [PATCH] pkg/sdk: implement sdk.Handle() API --- pkg/sdk/api.go | 11 +++++++---- pkg/sdk/handler/handler.go | 17 +++++++++++++++++ pkg/sdk/informer/informer.go | 15 ++++++++------- pkg/sdk/informer/sync.go | 4 +++- pkg/sdk/types/types.go | 9 +++++++++ 5 files changed, 44 insertions(+), 12 deletions(-) create mode 100644 pkg/sdk/handler/handler.go diff --git a/pkg/sdk/api.go b/pkg/sdk/api.go index 47a2382e65..2743f54c77 100644 --- a/pkg/sdk/api.go +++ b/pkg/sdk/api.go @@ -3,8 +3,8 @@ package sdk import ( "context" + sdkHandler "github.com/coreos/operator-sdk/pkg/sdk/handler" sdkInformer "github.com/coreos/operator-sdk/pkg/sdk/informer" - sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types" "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/runtime" @@ -27,13 +27,16 @@ func Watch(resourcePluralName, namespace string, obj runtime.Object, resourceCli informers = append(informers, informer) } -// TODO: func Handle(handler sdkTypes.Handler) +// Handle registers the handler for all events. +// In the future, we would have a mux-pattern to dispatch events to matched handlers. +func Handle(handler sdkHandler.Handler) { + sdkHandler.RegisteredHandler = handler +} // Run starts the process of Watching resources, handling Events, and processing Actions func Run(ctx context.Context) { - sdkCtx := sdkTypes.Context{Context: ctx} for _, informer := range informers { - err := informer.Run(sdkCtx) + err := informer.Run(ctx) if err != nil { logrus.Errorf("failed to run informer: %v", err) return diff --git a/pkg/sdk/handler/handler.go b/pkg/sdk/handler/handler.go new file mode 100644 index 0000000000..6eef6b03f1 --- /dev/null +++ b/pkg/sdk/handler/handler.go @@ -0,0 +1,17 @@ +package handler + +import ( + sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types" +) + +// Handler reacts to events and outputs actions. +// If any intended action failed, the event would be re-triggered. +// For actions done before the failed action, there is no rollback. +type Handler interface { + Handle(sdkTypes.Context, sdkTypes.Event) []sdkTypes.Action +} + +var ( + // RegisteredHandler is the user registered handler set by sdk.Handle() + RegisteredHandler Handler +) diff --git a/pkg/sdk/informer/informer.go b/pkg/sdk/informer/informer.go index cd359aa341..5a0917e417 100644 --- a/pkg/sdk/informer/informer.go +++ b/pkg/sdk/informer/informer.go @@ -1,11 +1,10 @@ package informer import ( + "context" "errors" "time" - sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types" - "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" @@ -17,7 +16,7 @@ import ( ) type Informer interface { - Run(ctx sdkTypes.Context) error + Run(ctx context.Context) error } type informer struct { @@ -26,6 +25,7 @@ type informer struct { queue workqueue.RateLimitingInterface kubeClient kubernetes.Interface namespace string + context context.Context } func New(resourcePluralName, namespace string, objType runtime.Object, resourceClient rest.Interface) Informer { @@ -49,19 +49,20 @@ func New(resourcePluralName, namespace string, objType runtime.Object, resourceC return i } -func (i *informer) Run(ctx sdkTypes.Context) error { +func (i *informer) Run(ctx context.Context) error { + i.context = ctx defer i.queue.ShutDown() logrus.Info("starting %s controller", i.resourcePluralName) - go i.sharedIndexInformer.Run(ctx.Context.Done()) + go i.sharedIndexInformer.Run(ctx.Done()) - if !cache.WaitForCacheSync(ctx.Context.Done(), i.sharedIndexInformer.HasSynced) { + if !cache.WaitForCacheSync(ctx.Done(), i.sharedIndexInformer.HasSynced) { return errors.New("Timed out waiting for caches to sync") } const numWorkers = 1 for n := 0; n < numWorkers; n++ { - go wait.Until(i.runWorker, time.Second, ctx.Context.Done()) + go wait.Until(i.runWorker, time.Second, ctx.Done()) } return nil } diff --git a/pkg/sdk/informer/sync.go b/pkg/sdk/informer/sync.go index 92d9f12a14..70919049e1 100644 --- a/pkg/sdk/informer/sync.go +++ b/pkg/sdk/informer/sync.go @@ -1,6 +1,7 @@ package informer import ( + sdkHandler "github.com/coreos/operator-sdk/pkg/sdk/handler" sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types" "github.com/sirupsen/logrus" @@ -53,7 +54,8 @@ func (i *informer) sync(key string) error { Deleted: !exists, } - // TODO: call registered handler for the event + sdkCtx := sdkTypes.Context{Context: i.context} + actions := sdkHandler.RegisteredHandler.Handle(sdkCtx, event) // TODO: Add option to prevent multiple informers from invoking Handle() concurrently? // TODO: process all actions for this event diff --git a/pkg/sdk/types/types.go b/pkg/sdk/types/types.go index d5c88a1d43..f188638635 100644 --- a/pkg/sdk/types/types.go +++ b/pkg/sdk/types/types.go @@ -24,3 +24,12 @@ type Event struct { type Context struct { Context context.Context } + +// FuncType defines the type of the function of an Action. +type FuncType string + +// Action defines what Func(tion) to apply on the given Object. +type Action struct { + Object Object + Func FuncType +}