diff --git a/pkg/sdk/action/action.go b/pkg/sdk/action/action.go new file mode 100644 index 0000000000..b7569f7332 --- /dev/null +++ b/pkg/sdk/action/action.go @@ -0,0 +1,104 @@ +package action + +import ( + "fmt" + + "github.com/coreos/operator-sdk/pkg/k8sclient" + sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types" + "github.com/coreos/operator-sdk/pkg/util/k8sutil" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" +) + +const ( + // Supported function types + KubeApplyFunc sdkTypes.FuncType = iota + KubeDeleteFunc +) + +var ( + // kubeFuncs is the mapping of the supported functions + kubeFuncs = map[sdkTypes.FuncType]sdkTypes.KubeFunc{ + KubeApplyFunc: KubeApply, + KubeDeleteFunc: KubeDelete, + } +) + +// ProcessAction invokes the function specified by action.Func +func ProcessAction(action sdkTypes.Action) error { + kubeFunc, ok := kubeFuncs[action.Func] + if !ok { + return fmt.Errorf("failed to process action: unsupported function (%v)", action.Func) + } + err := kubeFunc(action.Object) + if err != nil { + return fmt.Errorf("failed to process action: %v", err) + } + return nil +} + +// KubeApply tries to create the specified object or update it if it already exists +func KubeApply(object sdkTypes.Object) (err error) { + defer func() { + if err != nil { + err = fmt.Errorf("kube-apply failed: %v", err) + } + }() + + name, namespace, err := getNameAndNamespace(object) + if err != nil { + return err + } + gvk := object.GetObjectKind().GroupVersionKind() + apiVersion, kind := gvk.ToAPIVersionAndKind() + objectInfo := objectInfoString(kind, name, namespace) + + resourceClient, _, err := k8sclient.GetResourceClient(apiVersion, kind, namespace) + if err != nil { + return fmt.Errorf("failed to get resource client for object: %v", err) + } + + unstructObj := k8sutil.UnstructuredFromRuntimeObject(object) + + // Create the resource if it doesn't exist + _, err = resourceClient.Create(unstructObj) + if err == nil { + return nil + } + if err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create object (%s): %v ", objectInfo, err) + } + + // Update it if it already exists + // NOTE: The update could fail if there is a resourceVersion conflict. + // That means the object is stale, and the user needs to retry the Action with + // an updated object that has the latest resourceVersion + _, err = resourceClient.Update(unstructObj) + if err != nil { + return fmt.Errorf("failed to update object (%s): %v ", objectInfo, err) + } + return nil +} + +// KubeDelete deletes an object +func KubeDelete(object sdkTypes.Object) (err error) { + panic("TODO") +} + +func getNameAndNamespace(object sdkTypes.Object) (string, string, error) { + accessor := meta.NewAccessor() + name, err := accessor.Name(object) + if err != nil { + return "", "", fmt.Errorf("failed to get name for object: %v", err) + } + namespace, err := accessor.Namespace(object) + if err != nil { + return "", "", fmt.Errorf("failed to get namespace for object: %v", err) + } + return name, namespace, nil +} + +func objectInfoString(kind, name, namespace string) string { + return kind + ": " + namespace + "/" + name +} diff --git a/pkg/sdk/informer/sync.go b/pkg/sdk/informer/sync.go index 442241ac06..2405a26e6b 100644 --- a/pkg/sdk/informer/sync.go +++ b/pkg/sdk/informer/sync.go @@ -1,10 +1,13 @@ package informer import ( + sdkAction "github.com/coreos/operator-sdk/pkg/sdk/action" sdkHandler "github.com/coreos/operator-sdk/pkg/sdk/handler" sdkTypes "github.com/coreos/operator-sdk/pkg/sdk/types" + "github.com/coreos/operator-sdk/pkg/util/k8sutil" "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) const ( @@ -47,7 +50,13 @@ func (i *informer) sync(key string) error { if err != nil { return err } - object := obj.(sdkTypes.Object) + if !exists { + logrus.Infof("Object (%s) is deleted", key) + return nil + } + + unstructObj := obj.(*unstructured.Unstructured).DeepCopy() + object := k8sutil.RuntimeObjectFromUnstructured(unstructObj) event := sdkTypes.Event{ Object: object, @@ -58,9 +67,12 @@ func (i *informer) sync(key string) error { actions := sdkHandler.RegisteredHandler.Handle(sdkCtx, event) // TODO: Add option to prevent multiple informers from invoking Handle() concurrently? - // TODO: process all actions for this event - actions = actions - + for _, action := range actions { + err := sdkAction.ProcessAction(action) + if err != nil { + return err + } + } return nil } diff --git a/pkg/sdk/types/types.go b/pkg/sdk/types/types.go index f188638635..6cf8088983 100644 --- a/pkg/sdk/types/types.go +++ b/pkg/sdk/types/types.go @@ -26,9 +26,12 @@ type Context struct { } // FuncType defines the type of the function of an Action. -type FuncType string +type FuncType int -// Action defines what Func(tion) to apply on the given Object. +// KubeFunc is the function signature for supported kubernetes functions +type KubeFunc func(Object) error + +// Action defines what Function to apply on a given Object. type Action struct { Object Object Func FuncType diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go new file mode 100644 index 0000000000..8d152487f1 --- /dev/null +++ b/pkg/util/k8sutil/k8sutil.go @@ -0,0 +1,34 @@ +package k8sutil + +import ( + "encoding/json" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" +) + +// RuntimeObjectFromUnstructured converts an unstructured to a runtime object +func RuntimeObjectFromUnstructured(u *unstructured.Unstructured) runtime.Object { + b, err := json.Marshal(u.Object) + if err != nil { + panic(err) + } + var ro runtime.Object + if err := json.Unmarshal(b, ro); err != nil { + panic(err) + } + return ro +} + +// UnstructuredFromRuntimeObject converts a runtime object to an unstructured +func UnstructuredFromRuntimeObject(ro runtime.Object) *unstructured.Unstructured { + b, err := json.Marshal(ro) + if err != nil { + panic(err) + } + var u unstructured.Unstructured + if err := json.Unmarshal(b, &u.Object); err != nil { + panic(err) + } + return &u +}