Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions pkg/sdk/action/action.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package action

import (
"fmt"

"github.com/coreos/operator-sdk/pkg/k8sclient"
sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types"
"github.com/coreos/operator-sdk/pkg/util/k8sutil"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
)

const (
// Supported function types
KubeApplyFunc sdkTypes.FuncType = iota
KubeDeleteFunc
)

var (
// kubeFuncs is the mapping of the supported functions
kubeFuncs = map[sdkTypes.FuncType]sdkTypes.KubeFunc{
KubeApplyFunc: KubeApply,
KubeDeleteFunc: KubeDelete,
}
)

// ProcessAction invokes the function specified by action.Func
func ProcessAction(action sdkTypes.Action) error {
kubeFunc, ok := kubeFuncs[action.Func]
if !ok {
return fmt.Errorf("failed to process action: unsupported function (%v)", action.Func)
}
err := kubeFunc(action.Object)
if err != nil {
return fmt.Errorf("failed to process action: %v", err)
}
return nil
}

// KubeApply tries to create the specified object or update it if it already exists
func KubeApply(object sdkTypes.Object) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("kube-apply failed: %v", err)
}
}()

name, namespace, err := getNameAndNamespace(object)
if err != nil {
return err
}
gvk := object.GetObjectKind().GroupVersionKind()
apiVersion, kind := gvk.ToAPIVersionAndKind()
objectInfo := objectInfoString(kind, name, namespace)

resourceClient, _, err := k8sclient.GetResourceClient(apiVersion, kind, namespace)
if err != nil {
return fmt.Errorf("failed to get resource client for object: %v", err)
}

unstructObj := k8sutil.UnstructuredFromRuntimeObject(object)

// Create the resource if it doesn't exist
_, err = resourceClient.Create(unstructObj)
if err == nil {
return nil
}
if err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create object (%s): %v ", objectInfo, err)
}

// Update it if it already exists
// NOTE: The update could fail if there is a resourceVersion conflict.
// That means the object is stale, and the user needs to retry the Action with
// an updated object that has the latest resourceVersion
_, err = resourceClient.Update(unstructObj)
if err != nil {
return fmt.Errorf("failed to update object (%s): %v ", objectInfo, err)
}
return nil
}

// KubeDelete deletes an object
func KubeDelete(object sdkTypes.Object) (err error) {
panic("TODO")
}

func getNameAndNamespace(object sdkTypes.Object) (string, string, error) {
accessor := meta.NewAccessor()
name, err := accessor.Name(object)
if err != nil {
return "", "", fmt.Errorf("failed to get name for object: %v", err)
}
namespace, err := accessor.Namespace(object)
if err != nil {
return "", "", fmt.Errorf("failed to get namespace for object: %v", err)
}
return name, namespace, nil
}

func objectInfoString(kind, name, namespace string) string {
return kind + ": " + namespace + "/" + name
}
20 changes: 16 additions & 4 deletions pkg/sdk/informer/sync.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package informer

import (
sdkAction "github.com/coreos/operator-sdk/pkg/sdk/action"
sdkHandler "github.com/coreos/operator-sdk/pkg/sdk/handler"
sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types"
"github.com/coreos/operator-sdk/pkg/util/k8sutil"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

const (
Expand Down Expand Up @@ -47,7 +50,13 @@ func (i *informer) sync(key string) error {
if err != nil {
return err
}
object := obj.(sdkTypes.Object)
if !exists {
logrus.Infof("Object (%s) is deleted", key)
return nil
}

unstructObj := obj.(*unstructured.Unstructured).DeepCopy()
object := k8sutil.RuntimeObjectFromUnstructured(unstructObj)

event := sdkTypes.Event{
Object: object,
Expand All @@ -58,9 +67,12 @@ func (i *informer) sync(key string) error {
actions := sdkHandler.RegisteredHandler.Handle(sdkCtx, event)
// TODO: Add option to prevent multiple informers from invoking Handle() concurrently?

// TODO: process all actions for this event
actions = actions

for _, action := range actions {
err := sdkAction.ProcessAction(action)
if err != nil {
return err
}
}
return nil
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/sdk/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ type Context struct {
}

// FuncType defines the type of the function of an Action.
type FuncType string
type FuncType int

// Action defines what Func(tion) to apply on the given Object.
// KubeFunc is the function signature for supported kubernetes functions
type KubeFunc func(Object) error

// Action defines what Function to apply on a given Object.
type Action struct {
Object Object
Func FuncType
Expand Down
34 changes: 34 additions & 0 deletions pkg/util/k8sutil/k8sutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package k8sutil

import (
"encoding/json"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
)

// RuntimeObjectFromUnstructured converts an unstructured to a runtime object
func RuntimeObjectFromUnstructured(u *unstructured.Unstructured) runtime.Object {
b, err := json.Marshal(u.Object)
if err != nil {
panic(err)
}
var ro runtime.Object
if err := json.Unmarshal(b, ro); err != nil {
panic(err)
}
return ro
}

// UnstructuredFromRuntimeObject converts a runtime object to an unstructured
func UnstructuredFromRuntimeObject(ro runtime.Object) *unstructured.Unstructured {
b, err := json.Marshal(ro)
if err != nil {
panic(err)
}
var u unstructured.Unstructured
if err := json.Unmarshal(b, &u.Object); err != nil {
panic(err)
}
return &u
}