diff --git a/pkg/sdk/api.go b/pkg/sdk/api.go new file mode 100644 index 0000000000..bb75a153eb --- /dev/null +++ b/pkg/sdk/api.go @@ -0,0 +1,71 @@ +package sdk + +import ( + "context" + "fmt" + + "github.com/coreos/operator-sdk/pkg/sdk/dispatcher" + sdkInformer "github.com/coreos/operator-sdk/pkg/sdk/informer" + sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" +) + +type SDK struct { + // informers is the set of all informers registered by the user + informers map[string]sdkInformer.Informer + handler sdkTypes.Handler +} + +// NewKubeInformer returns an Informer for the specified resourceType +// - resourceName is the plural name of the resource kind, e.g: configmaps, secrets +// - namespace is the namespace to watch for that resource +// - objType is the runtime object to infer the type, e.g: &v1.ConfigMap{} +// - resourceClient is the REST client used to list and watch the resource +func NewKubeInformer(resourceName, namespace string, objType runtime.Object, resourceClient cache.Getter) sdkInformer.Informer { + return sdkInformer.New(resourceName, namespace, objType, resourceClient) +} + +// RegisterInformer registers an SDK informer under the specified name +func (sdk *SDK) RegisterInformer(informerName string, informer sdkInformer.Informer) error { + if _, ok := sdk.informers[informerName]; ok { + return fmt.Errorf("informer (%v) is already registered", informerName) + } + sdk.informers[informerName] = informer + return nil +} + +func (sdk *SDK) RegisterHandler(handler sdkTypes.Handler) { + sdk.handler = handler +} + +func (sdk *SDK) Run(ctx context.Context) { + // Run all informers and get the event channels + var eventChans []<-chan *sdkTypes.Event + for _, informer := range sdk.informers { + evc, err := informer.Run(ctx) + if err != nil { + panic("TODO") + } + eventChans = append(eventChans, evc) + } + + // Create a new dispatcher to pass events to the registered handler + dp := dispatcher.New(eventChans, sdk.handler) + dp.Run(ctx) +} + +/* +```main.go +func main() { + sdk.RegisterInformer("play-informer", informer.NewKubeInformer(&Play{})) + sdk.RegisterInformer("pod-informer", informer.NewKubeInformer(&Pod{})) + + sdk.RegisterActor("kube-apply", actor.KubeResourceApply) + sdk.RegisterActor("kube-delete", actor.KubeResourceDelete) + + sdk.RegisterHandle(stub.NewHandler()) + sdk.Run(ctx) +} +*/ diff --git a/pkg/sdk/dispatcher/dispatcher.go b/pkg/sdk/dispatcher/dispatcher.go new file mode 100644 index 0000000000..3069daf4de --- /dev/null +++ b/pkg/sdk/dispatcher/dispatcher.go @@ -0,0 +1,24 @@ +package dispatcher + +import ( + "context" + + sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types" +) + +type Dispatcher interface { + Run(ctx context.Context) error +} + +type dispatcher struct { + eventChans []<-chan *sdkTypes.Event +} + +func New(eventChans []<-chan *sdkTypes.Event, handler sdkTypes.Handler) *dispatcher { + panic("TODO") +} + +// Run runs the dispatcher which collects events from all the event channels and sends it to the handler +func (d *dispatcher) Run(ctx context.Context) error { + panic("TODO") +} diff --git a/pkg/sdk/informer/informer.go b/pkg/sdk/informer/informer.go new file mode 100644 index 0000000000..fcb59b8acb --- /dev/null +++ b/pkg/sdk/informer/informer.go @@ -0,0 +1,95 @@ +package informer + +import ( + "context" + "errors" + "time" + + sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +type Informer interface { + Run(ctx context.Context) (<-chan *sdkTypes.Event, error) +} + +type informer struct { + resourceName string + sharedIndexInformer cache.SharedIndexInformer + queue workqueue.RateLimitingInterface + kubeClient kubernetes.Interface + namespace string + eventChan chan *sdkTypes.Event +} + +func New(resourceName, namespace string, objType runtime.Object, resourceClient cache.Getter) Informer { + i := &informer{ + resourceName: resourceName, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), resourceName), + kubeClient: nil, + namespace: namespace, + eventChan: make(chan *sdkTypes.Event, 100), + } + + i.sharedIndexInformer = cache.NewSharedIndexInformer( + cache.NewListWatchFromClient(resourceClient, resourceName, namespace, fields.Everything()), + objType, 0, cache.Indexers{}, + ) + i.sharedIndexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: i.handleAddResourceEvent, + DeleteFunc: i.handleDeleteResourceEvent, + UpdateFunc: i.handleUpdateResourceEvent, + }) + return i +} + +func (i *informer) Run(ctx context.Context) (<-chan *sdkTypes.Event, error) { + defer i.queue.ShutDown() + + logrus.Info("starting %s controller", i.resourceName) + go i.sharedIndexInformer.Run(ctx.Done()) + + if !cache.WaitForCacheSync(ctx.Done(), i.sharedIndexInformer.HasSynced) { + return nil, errors.New("Timed out waiting for caches to sync") + + } + + const numWorkers = 1 + for n := 0; n < numWorkers; n++ { + go wait.Until(i.runWorker, time.Second, ctx.Done()) + } + + return i.eventChan, nil +} + +func (i *informer) handleAddResourceEvent(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + panic(err) + } + i.queue.Add(key) +} + +func (i *informer) handleDeleteResourceEvent(obj interface{}) { + // For deletes we have to use this key function + // to handle the DeletedFinalStateUnknown case for the object + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + panic(err) + } + i.queue.Add(key) +} + +func (i *informer) handleUpdateResourceEvent(oldObj, newObj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(newObj) + if err != nil { + panic(err) + } + i.queue.Add(key) +} diff --git a/pkg/sdk/informer/sync.go b/pkg/sdk/informer/sync.go new file mode 100644 index 0000000000..c2e62e74d9 --- /dev/null +++ b/pkg/sdk/informer/sync.go @@ -0,0 +1,81 @@ +package informer + +import ( + "github.com/sirupsen/logrus" + + sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types" +) + +const ( + // Copy from deployment_controller.go: + // maxRetries is the number of times a Vault will be retried before it is dropped out of the queue. + // With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times + // a Vault is going to be requeued: + // + // 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s + maxRetries = 15 +) + +func (i *informer) runWorker() { + for i.processNextItem() { + } +} + +func (i *informer) processNextItem() bool { + // Wait until there is a new item in the working queue + key, quit := i.queue.Get() + if quit { + return false + } + // Tell the queue that we are done with processing this key. This unblocks the key for other workers + // This allows safe parallel processing because two pods with the same key are never processed in + // parallel. + defer i.queue.Done(key) + + // Invoke the method containing the business logic + err := i.sync(key.(string)) + + // Handle the error if something went wrong during the execution of the business logic + i.handleErr(err, key) + return true +} + +// sync wraps the object into an event and sends it on the event channel for the controller +func (i *informer) sync(key string) error { + obj, exists, err := i.sharedIndexInformer.GetIndexer().GetByKey(key) + if err != nil { + return err + } + + ev := &sdkTypes.Event{ + Object: obj, + ObjectExist: exists, + } + i.eventChan <- ev + return nil +} + +// handleErr checks if an error happened and makes sure we will retry later. +func (i *informer) handleErr(err error, key interface{}) { + if err == nil { + // Forget about the #AddRateLimited history of the key on every successful synchronization. + // This ensures that future processing of updates for this key is not delayed because of + // an outdated error history. + i.queue.Forget(key) + return + } + + // This controller retries maxRetries times if something goes wrong. After that, it stops trying. + if i.queue.NumRequeues(key) < maxRetries { + logrus.Errorf("error syncing key (%v): %v", key, err) + + // Re-enqueue the key rate limited. Based on the rate limiter on the + // queue and the re-enqueue history, the key will be processed later again. + i.queue.AddRateLimited(key) + return + } + + i.queue.Forget(key) + // Report that, even after several retries, we could not successfully process this key + logrus.Infof("Dropping key (%v) out of the queue: %v", key, err) +} diff --git a/pkg/sdk/types/types.go b/pkg/sdk/types/types.go new file mode 100644 index 0000000000..808fde49dd --- /dev/null +++ b/pkg/sdk/types/types.go @@ -0,0 +1,19 @@ +package types + +import "context" + +type Object interface{} + +type Event struct { + Object Object + ObjectExist bool +} + +type Action struct { + Object Object + Actor string +} + +type Handler interface { + Handle(ctx context.Context, events []Event) []Action +} diff --git a/pkg/stub/handle.go b/pkg/stub/handle.go new file mode 100644 index 0000000000..c8f7c730a2 --- /dev/null +++ b/pkg/stub/handle.go @@ -0,0 +1,16 @@ +package stub + +import ( + "context" + + sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types" +) + +type handler struct { + // custom data structure +} + +func (h *handler) Handle(ctx context.Context, events []sdkTypes.Event) []sdkTypes.Action { + // Filled out by user + return nil +}