Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions pkg/sdk/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package sdk
import (
"context"

sdkHandler "github.com/coreos/operator-sdk/pkg/sdk/handler"
sdkInformer "github.com/coreos/operator-sdk/pkg/sdk/informer"
sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -27,13 +27,16 @@ func Watch(resourcePluralName, namespace string, obj runtime.Object, resourceCli
informers = append(informers, informer)
}

// TODO: func Handle(handler sdkTypes.Handler)
// Handle registers the handler for all events.
// In the future, we would have a mux-pattern to dispatch events to matched handlers.
func Handle(handler sdkHandler.Handler) {
sdkHandler.RegisteredHandler = handler
}

// Run starts the process of Watching resources, handling Events, and processing Actions
func Run(ctx context.Context) {
sdkCtx := sdkTypes.Context{Context: ctx}
for _, informer := range informers {
err := informer.Run(sdkCtx)
err := informer.Run(ctx)
if err != nil {
logrus.Errorf("failed to run informer: %v", err)
return
Expand Down
17 changes: 17 additions & 0 deletions pkg/sdk/handler/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package handler

import (
sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types"
)

// Handler reacts to events and outputs actions.
// If any intended action failed, the event would be re-triggered.
// For actions done before the failed action, there is no rollback.
type Handler interface {
Handle(sdkTypes.Context, sdkTypes.Event) []sdkTypes.Action
}

var (
// RegisteredHandler is the user registered handler set by sdk.Handle()
RegisteredHandler Handler
)
15 changes: 8 additions & 7 deletions pkg/sdk/informer/informer.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package informer

import (
"context"
"errors"
"time"

sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -17,7 +16,7 @@ import (
)

type Informer interface {
Run(ctx sdkTypes.Context) error
Run(ctx context.Context) error
}

type informer struct {
Expand All @@ -26,6 +25,7 @@ type informer struct {
queue workqueue.RateLimitingInterface
kubeClient kubernetes.Interface
namespace string
context context.Context
}

func New(resourcePluralName, namespace string, objType runtime.Object, resourceClient rest.Interface) Informer {
Expand All @@ -49,19 +49,20 @@ func New(resourcePluralName, namespace string, objType runtime.Object, resourceC
return i
}

func (i *informer) Run(ctx sdkTypes.Context) error {
func (i *informer) Run(ctx context.Context) error {
i.context = ctx
defer i.queue.ShutDown()

logrus.Info("starting %s controller", i.resourcePluralName)
go i.sharedIndexInformer.Run(ctx.Context.Done())
go i.sharedIndexInformer.Run(ctx.Done())

if !cache.WaitForCacheSync(ctx.Context.Done(), i.sharedIndexInformer.HasSynced) {
if !cache.WaitForCacheSync(ctx.Done(), i.sharedIndexInformer.HasSynced) {
return errors.New("Timed out waiting for caches to sync")
}

const numWorkers = 1
for n := 0; n < numWorkers; n++ {
go wait.Until(i.runWorker, time.Second, ctx.Context.Done())
go wait.Until(i.runWorker, time.Second, ctx.Done())
}
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/sdk/informer/sync.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package informer

import (
sdkHandler "github.com/coreos/operator-sdk/pkg/sdk/handler"
sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -53,7 +54,8 @@ func (i *informer) sync(key string) error {
Deleted: !exists,
}

// TODO: call registered handler for the event
sdkCtx := sdkTypes.Context{Context: i.context}
actions := sdkHandler.RegisteredHandler.Handle(sdkCtx, event)
// TODO: Add option to prevent multiple informers from invoking Handle() concurrently?

// TODO: process all actions for this event
Expand Down
9 changes: 9 additions & 0 deletions pkg/sdk/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,12 @@ type Event struct {
type Context struct {
Context context.Context
}

// FuncType defines the type of the function of an Action.
type FuncType string

// Action defines what Func(tion) to apply on the given Object.
type Action struct {
Object Object
Func FuncType
}