diff --git a/pkg/sdk/api.go b/pkg/sdk/api.go
new file mode 100644
index 0000000000..47a2382e65
--- /dev/null
+++ b/pkg/sdk/api.go
@@ -0,0 +1,43 @@
+package sdk
+
+import (
+	"context"
+
+	sdkInformer "github.com/coreos/operator-sdk/pkg/sdk/informer"
+	sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types"
+
+	"github.com/sirupsen/logrus"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/rest"
+)
+
+var (
+	// informers is the set of all informers for the resources watched by the user
+	informers []sdkInformer.Informer
+)
+
+// Watch watches for changes on the given resource.
+// obj is an instance of the resource type, e.g. &Pod{}.
+// resourcePluralName is the plural name of the resource, e.g. "pods".
+// resourceClient is the rest client for the resource, e.g. `kubeclient.CoreV1().RESTClient()`.
+// opts provide more options for doing the watch.
+// TODO: support opts for specifying label selector
+func Watch(resourcePluralName, namespace string, obj runtime.Object, resourceClient rest.Interface) {
+	informer := sdkInformer.New(resourcePluralName, namespace, obj, resourceClient)
+	informers = append(informers, informer)
+}
+
+// TODO: func Handle(handler sdkTypes.Handler)
+
+// Run starts the process of Watching resources, handling Events, and processing Actions
+func Run(ctx context.Context) {
+	sdkCtx := sdkTypes.Context{Context: ctx}
+	for _, informer := range informers {
+		err := informer.Run(sdkCtx)
+		if err != nil {
+			logrus.Errorf("failed to run informer: %v", err)
+			return
+		}
+	}
+	<-ctx.Done()
+}
diff --git a/pkg/sdk/informer/informer.go b/pkg/sdk/informer/informer.go
new file mode 100644
index 0000000000..cd359aa341
--- /dev/null
+++ b/pkg/sdk/informer/informer.go
@@ -0,0 +1,99 @@
+package informer
+
+import (
+	"errors"
+	"time"
+
+	sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types"
+
+	"github.com/sirupsen/logrus"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+)
+
+type Informer interface {
+	Run(ctx sdkTypes.Context) error
+}
+
+type informer struct {
+	resourcePluralName  string
+	sharedIndexInformer cache.SharedIndexInformer
+	queue               workqueue.RateLimitingInterface
+	kubeClient          kubernetes.Interface
+	namespace           string
+}
+
+func New(resourcePluralName, namespace string, objType runtime.Object, resourceClient rest.Interface) Informer {
+	i := &informer{
+		resourcePluralName: resourcePluralName,
+		queue:              workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), resourcePluralName),
+		// TODO: set the kube client
+		kubeClient: nil,
+		namespace:  namespace,
+	}
+
+	i.sharedIndexInformer = cache.NewSharedIndexInformer(
+		cache.NewListWatchFromClient(resourceClient, resourcePluralName, namespace, fields.Everything()),
+		objType, 0, cache.Indexers{},
+	)
+	i.sharedIndexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc:    i.handleAddResourceEvent,
+		DeleteFunc: i.handleDeleteResourceEvent,
+		UpdateFunc: i.handleUpdateResourceEvent,
+	})
+	return i
+}
+
+func (i *informer) Run(ctx sdkTypes.Context) error {
+	// Shut the queue down (unblocking the workers) when the context is
+	// cancelled. A `defer` here would shut the queue down as soon as Run
+	// returns — which happens right after the workers are spawned.
+	go func() {
+		<-ctx.Context.Done()
+		i.queue.ShutDown()
+	}()
+
+	logrus.Infof("starting %s controller", i.resourcePluralName)
+	go i.sharedIndexInformer.Run(ctx.Context.Done())
+
+	if !cache.WaitForCacheSync(ctx.Context.Done(), i.sharedIndexInformer.HasSynced) {
+		return errors.New("timed out waiting for caches to sync")
+	}
+
+	const numWorkers = 1
+	for n := 0; n < numWorkers; n++ {
+		go wait.Until(i.runWorker, time.Second, ctx.Context.Done())
+	}
+	return nil
+}
+
+func (i *informer) handleAddResourceEvent(obj interface{}) {
+	key, err := cache.MetaNamespaceKeyFunc(obj)
+	if err != nil {
+		panic(err)
+	}
+	i.queue.Add(key)
+}
+
+func (i *informer) handleDeleteResourceEvent(obj interface{}) {
+	// For deletes we have to use this key function
+	// to handle the DeletedFinalStateUnknown case for the object
+	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+	if err != nil {
+		panic(err)
+	}
+	i.queue.Add(key)
+}
+
+func (i *informer) handleUpdateResourceEvent(oldObj, newObj interface{}) {
+	key, err := cache.MetaNamespaceKeyFunc(newObj)
+	if err != nil {
+		panic(err)
+	}
+	i.queue.Add(key)
+}
diff --git a/pkg/sdk/informer/sync.go b/pkg/sdk/informer/sync.go
new file mode 100644
index 0000000000..92d9f12a14
--- /dev/null
+++ b/pkg/sdk/informer/sync.go
@@ -0,0 +1,92 @@
+package informer
+
+import (
+	sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types"
+
+	"github.com/sirupsen/logrus"
+)
+
+const (
+	// Copy from deployment_controller.go:
+	// maxRetries is the number of times a Vault will be retried before it is dropped out of the queue.
+	// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times
+	// a Vault is going to be requeued:
+	//
+	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
+	maxRetries = 15
+)
+
+func (i *informer) runWorker() {
+	for i.processNextItem() {
+	}
+}
+
+func (i *informer) processNextItem() bool {
+	// Wait until there is a new item in the working queue
+	key, quit := i.queue.Get()
+	if quit {
+		return false
+	}
+	// Tell the queue that we are done with processing this key. This unblocks the key for other workers
+	// This allows safe parallel processing because two pods with the same key are never processed in
+	// parallel.
+	defer i.queue.Done(key)
+
+	// Invoke the method containing the business logic
+	err := i.sync(key.(string))
+
+	// Handle the error if something went wrong during the execution of the business logic
+	i.handleErr(err, key)
+	return true
+}
+
+// sync creates the event for the object, sends it to the handler, and processes the resulting actions
+func (i *informer) sync(key string) error {
+	obj, exists, err := i.sharedIndexInformer.GetIndexer().GetByKey(key)
+	if err != nil {
+		return err
+	}
+	// obj is nil when the object is no longer in the cache (deleted); the
+	// comma-ok assertion leaves object nil instead of panicking in that case.
+	object, _ := obj.(sdkTypes.Object)
+
+	event := sdkTypes.Event{
+		Object:  object,
+		Deleted: !exists,
+	}
+	// Explicitly discard the event until the handler plumbing below lands;
+	// without this the unused variable is a compile error.
+	_ = event
+
+	// TODO: call registered handler for the event
+	// TODO: Add option to prevent multiple informers from invoking Handle() concurrently?
+
+	// TODO: process all actions for this event
+
+	return nil
+}
+
+// handleErr checks if an error happened and makes sure we will retry later.
+func (i *informer) handleErr(err error, key interface{}) {
+	if err == nil {
+		// Forget about the #AddRateLimited history of the key on every successful synchronization.
+		// This ensures that future processing of updates for this key is not delayed because of
+		// an outdated error history.
+		i.queue.Forget(key)
+		return
+	}
+
+	// This controller retries maxRetries times if something goes wrong. After that, it stops trying.
+	if i.queue.NumRequeues(key) < maxRetries {
+		logrus.Errorf("error syncing key (%v): %v", key, err)
+
+		// Re-enqueue the key rate limited. Based on the rate limiter on the
+		// queue and the re-enqueue history, the key will be processed later again.
+		i.queue.AddRateLimited(key)
+		return
+	}
+
+	i.queue.Forget(key)
+	// Report that, even after several retries, we could not successfully process this key
+	logrus.Infof("Dropping key (%v) out of the queue: %v", key, err)
+}
diff --git a/pkg/sdk/types/types.go b/pkg/sdk/types/types.go
new file mode 100644
index 0000000000..d5c88a1d43
--- /dev/null
+++ b/pkg/sdk/types/types.go
@@ -0,0 +1,26 @@
+package types
+
+import (
+	"context"
+
+	"k8s.io/apimachinery/pkg/runtime"
+)
+
+// Object is the Kubernetes runtime.Object interface expected
+// of all resources that the user can watch.
+type Object runtime.Object
+
+// Event is triggered when some change has happened on the watched resources.
+// If created or updated, Object would be the current state and Deleted=false.
+// If deleted, Object would be the last known state and Deleted=true.
+type Event struct {
+	Object  Object
+	Deleted bool
+}
+
+// Context is the special context that is passed to the Handler.
+// It includes:
+// - Context: standard Go context that is used to pass cancellation signals and deadlines
+type Context struct {
+	Context context.Context
+}