Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions pkg/sdk/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package sdk

import (
"context"

sdkInformer "github.com/coreos/operator-sdk/pkg/sdk/informer"
sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
)

var (
// informers is the set of all informers for the resources watched by the user
informers []sdkInformer.Informer
)

// Watch watches for changes on the given resource.
// obj is an instance of the resource type, e.g. &Pod{}.
// resourcePluralName is the plural name of the resource, e.g. “pods”.
// resourceClient is the rest client for the resource, e.g. `kubeclient.CoreV1().RESTClient()`.
// opts provide more options for doing the watch.
// TODO: support opts for specifying label selector
func Watch(resourcePluralName, namespace string, obj runtime.Object, resourceClient rest.Interface) {
informer := sdkInformer.New(resourcePluralName, namespace, obj, resourceClient)
informers = append(informers, informer)
}

// TODO: func Handle(handler sdkTypes.Handler)

// Run starts the process of Watching resources, handling Events, and processing Actions
func Run(ctx context.Context) {
sdkCtx := sdkTypes.Context{Context: ctx}
for _, informer := range informers {
err := informer.Run(sdkCtx)
if err != nil {
logrus.Errorf("failed to run informer: %v", err)
return
}
}
<-ctx.Done()
}
93 changes: 93 additions & 0 deletions pkg/sdk/informer/informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package informer

import (
"errors"
"time"

sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

type Informer interface {
Run(ctx sdkTypes.Context) error
}

type informer struct {
resourcePluralName string
sharedIndexInformer cache.SharedIndexInformer
queue workqueue.RateLimitingInterface
kubeClient kubernetes.Interface
namespace string
}

func New(resourcePluralName, namespace string, objType runtime.Object, resourceClient rest.Interface) Informer {
i := &informer{
resourcePluralName: resourcePluralName,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), resourcePluralName),
// TODO: set the kube client
kubeClient: nil,
namespace: namespace,
}

i.sharedIndexInformer = cache.NewSharedIndexInformer(
cache.NewListWatchFromClient(resourceClient, resourcePluralName, namespace, fields.Everything()),
objType, 0, cache.Indexers{},
)
i.sharedIndexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: i.handleAddResourceEvent,
DeleteFunc: i.handleDeleteResourceEvent,
UpdateFunc: i.handleUpdateResourceEvent,
})
return i
}

func (i *informer) Run(ctx sdkTypes.Context) error {
defer i.queue.ShutDown()

logrus.Info("starting %s controller", i.resourcePluralName)
go i.sharedIndexInformer.Run(ctx.Context.Done())

if !cache.WaitForCacheSync(ctx.Context.Done(), i.sharedIndexInformer.HasSynced) {
return errors.New("Timed out waiting for caches to sync")
}

const numWorkers = 1
for n := 0; n < numWorkers; n++ {
go wait.Until(i.runWorker, time.Second, ctx.Context.Done())
}
return nil
}

func (i *informer) handleAddResourceEvent(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
panic(err)
}
i.queue.Add(key)
}

func (i *informer) handleDeleteResourceEvent(obj interface{}) {
// For deletes we have to use this key function
// to handle the DeletedFinalStateUnknown case for the object
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
panic(err)
}
i.queue.Add(key)
}

func (i *informer) handleUpdateResourceEvent(oldObj, newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
panic(err)
}
i.queue.Add(key)
}
87 changes: 87 additions & 0 deletions pkg/sdk/informer/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package informer

import (
sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types"

"github.com/sirupsen/logrus"
)

const (
// Copy from deployment_controller.go:
// maxRetries is the number of times a Vault will be retried before it is dropped out of the queue.
// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times
// a Vault is going to be requeued:
//
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
maxRetries = 15
)

func (i *informer) runWorker() {
for i.processNextItem() {
}
}

func (i *informer) processNextItem() bool {
// Wait until there is a new item in the working queue
key, quit := i.queue.Get()
if quit {
return false
}
// Tell the queue that we are done with processing this key. This unblocks the key for other workers
// This allows safe parallel processing because two pods with the same key are never processed in
// parallel.
defer i.queue.Done(key)

// Invoke the method containing the business logic
err := i.sync(key.(string))

// Handle the error if something went wrong during the execution of the business logic
i.handleErr(err, key)
return true
}

// sync creates the event for the object, sends it to the handler, and processes the resulting actions
func (i *informer) sync(key string) error {
obj, exists, err := i.sharedIndexInformer.GetIndexer().GetByKey(key)
if err != nil {
return err
}
object := obj.(sdkTypes.Object)

event := sdkTypes.Event{
Object: object,
Deleted: !exists,
}

// TODO: call registered handler for the event
// TODO: Add option to prevent multiple informers from invoking Handle() concurrently?

// TODO: process all actions for this event

return nil
}

// handleErr checks if an error happened and makes sure we will retry later.
func (i *informer) handleErr(err error, key interface{}) {
if err == nil {
// Forget about the #AddRateLimited history of the key on every successful synchronization.
// This ensures that future processing of updates for this key is not delayed because of
// an outdated error history.
i.queue.Forget(key)
return
}

// This controller retries maxRetries times if something goes wrong. After that, it stops trying.
if i.queue.NumRequeues(key) < maxRetries {
logrus.Errorf("error syncing key (%v): %v", key, err)

// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
i.queue.AddRateLimited(key)
return
}

i.queue.Forget(key)
// Report that, even after several retries, we could not successfully process this key
logrus.Infof("Dropping key (%v) out of the queue: %v", key, err)
}
26 changes: 26 additions & 0 deletions pkg/sdk/types/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package types

import (
"context"

"k8s.io/apimachinery/pkg/runtime"
)

// Object is the Kubernetes runtime.Object interface expected
// of all resources that the user can watch.
type Object runtime.Object

// Event is triggered when some change has happened on the watched resources.
// If created or updated, Object would be the current state and Deleted=false.
// If deleted, Object would be the last known state and Deleted=true.
type Event struct {
Object Object
Deleted bool
}

// Context is the special context that is passed to the Handler.
// It includes:
// - Context: standard Go context that is used to pass cancellation signals and deadlines
type Context struct {
Context context.Context
}