Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions pkg/sdk/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package sdk

import (
"context"
"fmt"

"github.com/coreos/operator-sdk/pkg/sdk/dispatcher"
sdkInformer "github.com/coreos/operator-sdk/pkg/sdk/informer"
sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)

type SDK struct {
// informers is the set of all informers registered by the user
informers map[string]sdkInformer.Informer
handler sdkTypes.Handler
}

// NewKubeInformer returns an Informer for the specified resourceType
// - resourceName is the plural name of the resource kind, e.g: configmaps, secrets
// - namespace is the namespace to watch for that resource
// - objType is the runtime object to infer the type, e.g: &v1.ConfigMap{}
// - resourceClient is the REST client used to list and watch the resource
func NewKubeInformer(resourceName, namespace string, objType runtime.Object, resourceClient cache.Getter) sdkInformer.Informer {
return sdkInformer.New(resourceName, namespace, objType, resourceClient)
}

// RegisterInformer registers an SDK informer under the specified name
func (sdk *SDK) RegisterInformer(informerName string, informer sdkInformer.Informer) error {
if _, ok := sdk.informers[informerName]; ok {
return fmt.Errorf("informer (%v) is already registered", informerName)
}
sdk.informers[informerName] = informer
return nil
}

func (sdk *SDK) RegisterHandler(handler sdkTypes.Handler) {
sdk.handler = handler
}

func (sdk *SDK) Run(ctx context.Context) {
// Run all informers and get the event channels
var eventChans []<-chan *sdkTypes.Event
for _, informer := range sdk.informers {
evc, err := informer.Run(ctx)
if err != nil {
panic("TODO")
}
eventChans = append(eventChans, evc)
}

// Create a new dispatcher to pass events to the registered handler
dp := dispatcher.New(eventChans, sdk.handler)
dp.Run(ctx)
}

/*
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put this into doc.go or create example_test.go

```main.go
func main() {
sdk.RegisterInformer("play-informer", informer.NewKubeInformer(&Play{}))
sdk.RegisterInformer("pod-informer", informer.NewKubeInformer(&Pod{}))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just sdk.watch("watch-pod", &Pod{})?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There will be two kinds of states to watch: k8s state and application state.
We need to provide an abstraction to extend inform events mechanism.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xiang90 if I understand how informer works. it requires the restClient that's associated with the watched resource.
e.g:

	source := cache.NewListWatchFromClient(
		v.vaultsCRCli.VaultV1alpha1().RESTClient(),
		api.VaultServicePlural,
		v.namespace,
		fields.Everything())

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i do not quite understand. can you provide me an example?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, i am thinking about a different issue.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should hide all the inform stuff for users if possible.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i do not quite understand. can you provide me an example?

Sure.
For example, we might have an informer that watches application API and triggers custom events, e.g. application is unhealthy or overloaded, which cannot be observed via k8s api.

we should hide all the inform stuff for users if possible.

Yes. We will provide a KubeInformer() which is the same mechanism you mentioned above.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xiang90 I am thinking about this watch interface:

type Watchable struct {
	RESTClient rest.Interface
	Resource   string
	Namespace  string
}

type Client interface {
	Watcher(...Watchable) Watcher
}

But the Watchable is not elegant since it needs to know the RESTClient that's associating with the watched resource.

I thought about sdk.watch("watch-pod", &Pod{})?. I am not sure how to create informer internally just with the &Pod{} struct.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, we might have an informer that watches application API and triggers custom events, e.g. application is unhealthy or overloaded, which cannot be observed via k8s api.

OK. let us worry about this later by creating a new func. WatchApplication or whatever. I do not want to overload one func.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fanminshi

watch(whatever, withNamespace(x), withFields(y)...). the options work exactly like what etcd/cleintv3 does and other projects in go do. that is the suggested way for go.


sdk.RegisterActor("kube-apply", actor.KubeResourceApply)
sdk.RegisterActor("kube-delete", actor.KubeResourceDelete)

sdk.RegisterHandle(stub.NewHandler())
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove Register. use handleFunc or Serve. see examples here: https://golang.org/pkg/net/http/#HandleFunc

sdk.Run(ctx)
}
*/
24 changes: 24 additions & 0 deletions pkg/sdk/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package dispatcher

import (
"context"

sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types"
)

type Dispatcher interface {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i hope to see this looks like https://golang.org/pkg/net/http/#ServeMux

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HandleMux

Run(ctx context.Context) error
}

type dispatcher struct {
eventChans []<-chan *sdkTypes.Event
}

func New(eventChans []<-chan *sdkTypes.Event, handler sdkTypes.Handler) *dispatcher {
panic("TODO")
}

// Run runs the dispatcher which collects events from all the event channels and sends it to the handler
func (d *dispatcher) Run(ctx context.Context) error {
panic("TODO")
}
95 changes: 95 additions & 0 deletions pkg/sdk/informer/informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package informer

import (
"context"
"errors"
"time"

sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

type Informer interface {
Run(ctx context.Context) (<-chan *sdkTypes.Event, error)
}

type informer struct {
resourceName string
sharedIndexInformer cache.SharedIndexInformer
queue workqueue.RateLimitingInterface
kubeClient kubernetes.Interface
namespace string
eventChan chan *sdkTypes.Event
}

func New(resourceName, namespace string, objType runtime.Object, resourceClient cache.Getter) Informer {
i := &informer{
resourceName: resourceName,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), resourceName),
kubeClient: nil,
namespace: namespace,
eventChan: make(chan *sdkTypes.Event, 100),
}

i.sharedIndexInformer = cache.NewSharedIndexInformer(
cache.NewListWatchFromClient(resourceClient, resourceName, namespace, fields.Everything()),
objType, 0, cache.Indexers{},
)
i.sharedIndexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: i.handleAddResourceEvent,
DeleteFunc: i.handleDeleteResourceEvent,
UpdateFunc: i.handleUpdateResourceEvent,
})
return i
}

func (i *informer) Run(ctx context.Context) (<-chan *sdkTypes.Event, error) {
defer i.queue.ShutDown()

logrus.Info("starting %s controller", i.resourceName)
go i.sharedIndexInformer.Run(ctx.Done())

if !cache.WaitForCacheSync(ctx.Done(), i.sharedIndexInformer.HasSynced) {
return nil, errors.New("Timed out waiting for caches to sync")

}

const numWorkers = 1
for n := 0; n < numWorkers; n++ {
go wait.Until(i.runWorker, time.Second, ctx.Done())
}

return i.eventChan, nil
}

func (i *informer) handleAddResourceEvent(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
panic(err)
}
i.queue.Add(key)
}

func (i *informer) handleDeleteResourceEvent(obj interface{}) {
// For deletes we have to use this key function
// to handle the DeletedFinalStateUnknown case for the object
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
panic(err)
}
i.queue.Add(key)
}

func (i *informer) handleUpdateResourceEvent(oldObj, newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
panic(err)
}
i.queue.Add(key)
}
81 changes: 81 additions & 0 deletions pkg/sdk/informer/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package informer

import (
"github.com/sirupsen/logrus"

sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types"
)

const (
// Copy from deployment_controller.go:
// maxRetries is the number of times a Vault will be retried before it is dropped out of the queue.
// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times
// a Vault is going to be requeued:
//
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
maxRetries = 15
)

func (i *informer) runWorker() {
for i.processNextItem() {
}
}

func (i *informer) processNextItem() bool {
// Wait until there is a new item in the working queue
key, quit := i.queue.Get()
if quit {
return false
}
// Tell the queue that we are done with processing this key. This unblocks the key for other workers
// This allows safe parallel processing because two pods with the same key are never processed in
// parallel.
defer i.queue.Done(key)

// Invoke the method containing the business logic
err := i.sync(key.(string))

// Handle the error if something went wrong during the execution of the business logic
i.handleErr(err, key)
return true
}

// sync wraps the object into an event and sends it on the event channel for the controller
func (i *informer) sync(key string) error {
obj, exists, err := i.sharedIndexInformer.GetIndexer().GetByKey(key)
if err != nil {
return err
}

ev := &sdkTypes.Event{
Object: obj,
ObjectExist: exists,
}
i.eventChan <- ev
return nil
}

// handleErr checks if an error happened and makes sure we will retry later.
func (i *informer) handleErr(err error, key interface{}) {
if err == nil {
// Forget about the #AddRateLimited history of the key on every successful synchronization.
// This ensures that future processing of updates for this key is not delayed because of
// an outdated error history.
i.queue.Forget(key)
return
}

// This controller retries maxRetries times if something goes wrong. After that, it stops trying.
if i.queue.NumRequeues(key) < maxRetries {
logrus.Errorf("error syncing key (%v): %v", key, err)

// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
i.queue.AddRateLimited(key)
return
}

i.queue.Forget(key)
// Report that, even after several retries, we could not successfully process this key
logrus.Infof("Dropping key (%v) out of the queue: %v", key, err)
}
19 changes: 19 additions & 0 deletions pkg/sdk/types/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package types

import "context"

type Object interface{}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be k8s runtime.object?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let us not worry about user defined event object for now.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.


type Event struct {
Object Object
ObjectExist bool
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this also be action?

}

type Action struct {
Object Object
Actor string
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be just action (apply, delete, create, patch etc..). also type define the string.

}

type Handler interface {
Handle(ctx context.Context, events []Event) []Action
}
16 changes: 16 additions & 0 deletions pkg/stub/handle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package stub

import (
"context"

sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types"
)

type handler struct {
// custom data structure
}

func (h *handler) Handle(ctx context.Context, events []sdkTypes.Event) []sdkTypes.Action {
// Filled out by user
return nil
}