diff --git a/pkg/sdk/action/action.go b/pkg/sdk/action/action.go new file mode 100644 index 0000000000..e538dffb88 --- /dev/null +++ b/pkg/sdk/action/action.go @@ -0,0 +1,141 @@ +package action + +import ( + "fmt" + + "github.com/coreos/operator-sdk/pkg/k8sclient" + sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types" + "github.com/coreos/operator-sdk/pkg/util/k8sutil" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// FuncName defines the name of the function of an Action. +type FuncName string + +// FuncType is the function signature for supported kubernetes functions +type FuncType func(sdkTypes.Object) error + +// Action defines what Function to apply on a given Object. +type Action struct { + Object sdkTypes.Object + Func FuncName +} + +const ( + // Function names for supported functions + KubeApplyFunc FuncName = "kube-apply" + KubeDeleteFunc FuncName = "kube-delete" +) + +var ( + // kubeFuncs is the mapping of the supported functions + kubeFuncs = map[FuncName]FuncType{ + KubeApplyFunc: KubeApply, + KubeDeleteFunc: KubeDelete, + } +) + +// ProcessAction invokes the function specified by action.Func +func ProcessAction(action Action) error { + kubeFunc, ok := kubeFuncs[action.Func] + if !ok { + return fmt.Errorf("failed to process action: unsupported function (%v)", action.Func) + } + err := kubeFunc(action.Object) + if err != nil { + return fmt.Errorf("failed to process action: %v", err) + } + return nil +} + +// KubeApply will try to create the specified object or update it if it already exists +func KubeApply(object sdkTypes.Object) (err error) { + defer func() { + if err != nil { + err = fmt.Errorf("kube-apply failed: %v", err) + } + }() + + name, namespace, err := getNameAndNamespace(object) + if err != nil { + return err + } + gvk := object.GetObjectKind().GroupVersionKind() + apiVersion, kind := gvk.ToAPIVersionAndKind() + objectInfo := objectInfoString(kind, name, namespace) + + resourceClient, _, err := k8sclient.GetResourceClient(apiVersion, kind, namespace) + if err != nil { + return fmt.Errorf("failed to get resource client for object: %v", err) + } + + unstructObj, err := k8sutil.UnstructuredFromRuntimeObject(object) + if err != nil { + return fmt.Errorf("failed to get runtime object from unstructured: %v", err) + } + + // Create the resource if it doesn't exist + _, err = resourceClient.Create(unstructObj) + if err == nil { + return nil + } + if err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create object (%s): %v ", objectInfo, err) + } + + // Update it if it already exists + // TODO: Should resourceVersion conflict be handled differently + _, err = resourceClient.Update(unstructObj) + if err != nil { + return fmt.Errorf("failed to update object (%s): %v ", objectInfo, err) + } + return nil +} + +// KubeDelete deletes an object +func KubeDelete(object sdkTypes.Object) (err error) { + defer func() { + if err != nil { + err = fmt.Errorf("kube-delete failed: %v", err) + } + }() + + name, namespace, err := getNameAndNamespace(object) + if err != nil { + return err + } + gvk := object.GetObjectKind().GroupVersionKind() + apiVersion, kind := gvk.ToAPIVersionAndKind() + objectInfo := objectInfoString(kind, name, namespace) + + resourceClient, _, err := k8sclient.GetResourceClient(apiVersion, kind, namespace) + if err != nil { + return fmt.Errorf("failed to get resource client for object: %v", err) + } + + err = resourceClient.Delete(name, &metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("failed to delete object (%s): %v", objectInfo, err) + } + return nil +} + +func getNameAndNamespace(object sdkTypes.Object) (string, string, error) { + accessor := meta.NewAccessor() + name, err := accessor.Name(object) + if err != nil { + return "", "", fmt.Errorf("failed to get name for object: %v", err) + } + namespace, err := accessor.Namespace(object) + if err != nil { + return "", "", fmt.Errorf("failed to get namespace for object: %v", err) + } + return name, namespace, nil +} + +func objectInfoString(kind, name, namespace string) string { + return kind + ": " + namespace + "/" + name +} diff --git a/pkg/sdk/handler/handler.go b/pkg/sdk/handler/handler.go index 6eef6b03f1..ff3f03d31d 100644 --- a/pkg/sdk/handler/handler.go +++ b/pkg/sdk/handler/handler.go @@ -1,6 +1,7 @@ package handler import ( + sdkAction "github.com/coreos/operator-sdk/pkg/sdk/action" sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types" ) @@ -8,7 +9,7 @@ import ( // If any intended action failed, the event would be re-triggered. // For actions done before the failed action, there is no rollback. type Handler interface { - Handle(sdkTypes.Context, sdkTypes.Event) []sdkTypes.Action + Handle(sdkTypes.Context, sdkTypes.Event) []sdkAction.Action } var ( diff --git a/pkg/sdk/informer/sync.go b/pkg/sdk/informer/sync.go index 442241ac06..fedfb1f016 100644 --- a/pkg/sdk/informer/sync.go +++ b/pkg/sdk/informer/sync.go @@ -1,10 +1,13 @@ package informer import ( + sdkAction "github.com/coreos/operator-sdk/pkg/sdk/action" sdkHandler "github.com/coreos/operator-sdk/pkg/sdk/handler" sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types" + "github.com/coreos/operator-sdk/pkg/util/k8sutil" "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) const ( @@ -47,7 +50,17 @@ func (i *informer) sync(key string) error { if err != nil { return err } - object := obj.(sdkTypes.Object) + if !exists { + logrus.Infof("Object (%s) is deleted", key) + return nil + } + + unstructObj := obj.(*unstructured.Unstructured).DeepCopy() + object, err := k8sutil.RuntimeObjectFromUnstructured(unstructObj) + if err != nil { + logrus.Errorf("failed to get runtime object from unstructured: %v", err) + panic(err) + } event := sdkTypes.Event{ Object: object, @@ -58,9 +71,14 @@ func (i *informer) sync(key string) error { actions := sdkHandler.RegisteredHandler.Handle(sdkCtx, event) // TODO: Add option to prevent multiple informers from invoking Handle() concurrently? - // TODO: process all actions for this event - actions = actions - + for _, action := range actions { + err := sdkAction.ProcessAction(action) + if err != nil { + logrus.Infof("Processing action (%s) for object(%s)", action.Func, key) + // TODO: how to handle action failure + return err + } + } return nil } diff --git a/pkg/sdk/types/types.go b/pkg/sdk/types/types.go index f188638635..d5c88a1d43 100644 --- a/pkg/sdk/types/types.go +++ b/pkg/sdk/types/types.go @@ -24,12 +24,3 @@ type Event struct { type Context struct { Context context.Context } - -// FuncType defines the type of the function of an Action. -type FuncType string - -// Action defines what Func(tion) to apply on the given Object. -type Action struct { - Object Object - Func FuncType -} diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go new file mode 100644 index 0000000000..d5223a78f0 --- /dev/null +++ b/pkg/util/k8sutil/k8sutil.go @@ -0,0 +1,34 @@ +package k8sutil + +import ( + "encoding/json" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" +) + +// RuntimeObjectFromUnstructured unmarshals a runtime.Object from dynamic client's unstructured +func RuntimeObjectFromUnstructured(u *unstructured.Unstructured) (runtime.Object, error) { + b, err := json.Marshal(u.Object) + if err != nil { + return nil, err + } + var ro runtime.Object + if err := json.Unmarshal(b, ro); err != nil { + return nil, err + } + return ro, nil +} + +// UnstructuredFromRuntimeObject unmarshals a runtime.Object from dynamic client's unstructured +func UnstructuredFromRuntimeObject(ro runtime.Object) (*unstructured.Unstructured, error) { + b, err := json.Marshal(ro) + if err != nil { + return nil, err + } + var u unstructured.Unstructured + if err := json.Unmarshal(b, &u.Object); err != nil { + return nil, err + } + return &u, nil +}