Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 115 additions & 59 deletions pkg/blueprint/blueprint_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
Expand Down Expand Up @@ -87,6 +88,8 @@ type BaseBlueprintHandler struct {
shims *Shims

kustomizationWaitPollInterval time.Duration
kustomizationReconcileTimeout time.Duration
kustomizationReconcileSleep time.Duration
}

// NewBlueprintHandler creates a new instance of BaseBlueprintHandler.
Expand All @@ -96,6 +99,8 @@ func NewBlueprintHandler(injector di.Injector) *BaseBlueprintHandler {
injector: injector,
shims: NewShims(),
kustomizationWaitPollInterval: constants.DEFAULT_KUSTOMIZATION_WAIT_POLL_INTERVAL,
kustomizationReconcileTimeout: constants.DEFAULT_FLUX_KUSTOMIZATION_TIMEOUT,
kustomizationReconcileSleep: constants.DEFAULT_KUSTOMIZATION_WAIT_POLL_INTERVAL,
}
}

Expand Down Expand Up @@ -720,6 +725,7 @@ func (b *BaseBlueprintHandler) applyKustomization(kustomization blueprintv1alpha
Path: kustomization.Path,
Prune: constants.DEFAULT_FLUX_KUSTOMIZATION_PRUNE,
Wait: constants.DEFAULT_FLUX_KUSTOMIZATION_WAIT,
Suspend: false,
DependsOn: func() []meta.NamespacedObjectReference {
dependsOn := make([]meta.NamespacedObjectReference, len(kustomization.DependsOn))
for i, dep := range kustomization.DependsOn {
Expand Down Expand Up @@ -749,7 +755,6 @@ func (b *BaseBlueprintHandler) applyKustomization(kustomization blueprintv1alpha
},
}

// Ensure the status field is not included in the request body, it breaks the request
kustomizeObj.Status = kustomizev1.KustomizationStatus{}

config := ResourceOperationConfig{
Expand All @@ -758,37 +763,23 @@ func (b *BaseBlueprintHandler) applyKustomization(kustomization blueprintv1alpha
ResourceName: "kustomizations",
ResourceInstanceName: kustomizeObj.Name,
ResourceObject: kustomizeObj,
ResourceType: func() runtime.Object { return &kustomizev1.Kustomization{} },
ResourceType: func() runtime.Object {
return &kustomizev1.Kustomization{}
},
}

kubeconfig := os.Getenv("KUBECONFIG")
return kubeClientResourceOperation(kubeconfig, config)
return kubeClientResourceOperation(os.Getenv("KUBECONFIG"), config)
}

// deleteKustomization deletes a Kustomization resource from the cluster. The function
// sends a DELETE request to remove the kustomization and its associated resources.
func (b *BaseBlueprintHandler) deleteKustomization(name string, namespace string) error {
kubeconfig := os.Getenv("KUBECONFIG")
config := ResourceOperationConfig{
ApiPath: "/apis/kustomize.toolkit.fluxcd.io/v1",
Namespace: namespace,
ResourceName: "kustomizations",
ResourceInstanceName: name,
ResourceObject: nil,
ResourceType: func() runtime.Object { return &kustomizev1.Kustomization{} },
}
return b.deleteResource(kubeconfig, config)
}

// deleteResource deletes a resource from the cluster using the REST client. The function
// sends a DELETE request to the specified API path and resource.
func (b *BaseBlueprintHandler) deleteResource(kubeconfigPath string, config ResourceOperationConfig) error {
return kubeClient(kubeconfigPath, KubeRequestConfig{
return kubeClient(os.Getenv("KUBECONFIG"), KubeRequestConfig{
Method: "DELETE",
ApiPath: config.ApiPath,
Namespace: config.Namespace,
Resource: config.ResourceName,
Name: config.ResourceInstanceName,
ApiPath: "/apis/kustomize.toolkit.fluxcd.io/v1",
Namespace: namespace,
Resource: "kustomizations",
Name: name,
})
}

Expand Down Expand Up @@ -904,6 +895,7 @@ func (b *BaseBlueprintHandler) suspendKustomization(name, namespace string) erro
if err != nil {
return fmt.Errorf("failed to marshal patch: %w", err)
}

return kubeClient(os.Getenv("KUBECONFIG"), KubeRequestConfig{
Method: "PATCH",
ApiPath: "/apis/kustomize.toolkit.fluxcd.io/v1",
Expand Down Expand Up @@ -1149,7 +1141,6 @@ func (b *BaseBlueprintHandler) yamlMarshalWithDefinedPaths(v any) ([]byte, error
}

func (b *BaseBlueprintHandler) createManagedNamespace(name string) error {
kubeconfig := os.Getenv("KUBECONFIG")
ns := &corev1.Namespace{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Expand All @@ -1162,7 +1153,8 @@ func (b *BaseBlueprintHandler) createManagedNamespace(name string) error {
},
},
}
err := kubeClient(kubeconfig, KubeRequestConfig{

err := kubeClient(os.Getenv("KUBECONFIG"), KubeRequestConfig{
Method: "POST",
ApiPath: "/api/v1",
Resource: "namespaces",
Expand All @@ -1178,8 +1170,7 @@ func (b *BaseBlueprintHandler) createManagedNamespace(name string) error {
}

func (b *BaseBlueprintHandler) deleteNamespace(name string) error {
kubeconfig := os.Getenv("KUBECONFIG")
return kubeClient(kubeconfig, KubeRequestConfig{
return kubeClient(os.Getenv("KUBECONFIG"), KubeRequestConfig{
Method: "DELETE",
ApiPath: "/api/v1",
Resource: "namespaces",
Expand Down Expand Up @@ -1240,8 +1231,7 @@ func (b *BaseBlueprintHandler) applyGitRepository(source blueprintv1alpha1.Sourc
ResourceType: func() runtime.Object { return &sourcev1.GitRepository{} },
}

kubeconfig := os.Getenv("KUBECONFIG")
return kubeClientResourceOperation(kubeconfig, config)
return kubeClientResourceOperation(os.Getenv("KUBECONFIG"), config)
}

// applyConfigMap creates or updates a ConfigMap in the cluster containing context-specific
Expand Down Expand Up @@ -1296,8 +1286,7 @@ func (b *BaseBlueprintHandler) applyConfigMap() error {
},
}

kubeconfig := os.Getenv("KUBECONFIG")
return kubeClientResourceOperation(kubeconfig, config)
return kubeClientResourceOperation(os.Getenv("KUBECONFIG"), config)
}

// calculateMaxWaitTime calculates the maximum wait time needed based on kustomization dependencies.
Expand Down Expand Up @@ -1419,43 +1408,110 @@ var kubeClient = func(kubeconfigPath string, config KubeRequestConfig) error {
return fmt.Errorf("failed to create Kubernetes config: %w", err)
}

clientset, err := kubernetes.NewForConfig(kubeConfig)
dynamicClient, err := dynamic.NewForConfig(kubeConfig)
if err != nil {
return fmt.Errorf("failed to create Kubernetes client: %w", err)
return fmt.Errorf("failed to create dynamic client: %w", err)
}

restClient := clientset.CoreV1().RESTClient()
backgroundCtx := ctx.Background()

req := restClient.Verb(config.Method).
AbsPath(config.ApiPath).
Resource(config.Resource)

if config.Namespace != "" {
req = req.Namespace(config.Namespace)
// Parse API path to get group, version, resource
parts := strings.Split(strings.TrimPrefix(config.ApiPath, "/"), "/")
if len(parts) < 2 {
return fmt.Errorf("invalid API path: %s", config.ApiPath)
}

if config.Name != "" {
req = req.Name(config.Name)
var gvr schema.GroupVersionResource
if parts[0] == "api" {
// Core API group
gvr = schema.GroupVersionResource{
Group: "",
Version: parts[1],
Resource: config.Resource,
}
} else if parts[0] == "apis" {
// Custom resource
if len(parts) < 3 {
return fmt.Errorf("invalid API path for custom resource: %s", config.ApiPath)
}
gvr = schema.GroupVersionResource{
Group: parts[1],
Version: parts[2],
Resource: config.Resource,
}
} else {
return fmt.Errorf("invalid API path format: %s", config.ApiPath)
}

if config.Body != nil {
req = req.Body(config.Body)
var resourceClient dynamic.ResourceInterface
if config.Namespace != "" {
resourceClient = dynamicClient.Resource(gvr).Namespace(config.Namespace)
} else {
resourceClient = dynamicClient.Resource(gvr)
}

if config.Headers != nil {
for key, value := range config.Headers {
req = req.SetHeader(key, value)
switch config.Method {
case "GET":
if config.Name == "" {
// List operation
list, err := resourceClient.List(ctx.Background(), metav1.ListOptions{})
if err != nil {
return err
}
if config.Response != nil {
return runtime.DefaultUnstructuredConverter.FromUnstructured(list.UnstructuredContent(), config.Response)
}
} else {
// Get operation
obj, err := resourceClient.Get(ctx.Background(), config.Name, metav1.GetOptions{})
if err != nil {
return err
}
if config.Response != nil {
return runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), config.Response)
}
}
}

result := req.Do(backgroundCtx)
if err := result.Error(); err != nil {
case "POST":
if config.Body == nil {
return fmt.Errorf("body required for POST request")
}
unstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(config.Body)
if err != nil {
return fmt.Errorf("failed to convert object to unstructured: %w", err)
}
_, err = resourceClient.Create(ctx.Background(), &unstructured.Unstructured{Object: unstructuredObj}, metav1.CreateOptions{})
return err
}

if config.Response != nil {
return result.Into(config.Response)
case "PUT":
if config.Body == nil {
return fmt.Errorf("body required for PUT request")
}
unstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(config.Body)
if err != nil {
return fmt.Errorf("failed to convert object to unstructured: %w", err)
}
_, err = resourceClient.Update(ctx.Background(), &unstructured.Unstructured{Object: unstructuredObj}, metav1.UpdateOptions{})
return err
case "DELETE":
if config.Name == "" {
return fmt.Errorf("name required for DELETE request")
}
return resourceClient.Delete(ctx.Background(), config.Name, metav1.DeleteOptions{})
case "PATCH":
if config.Name == "" || config.Body == nil {
return fmt.Errorf("name and body required for PATCH request")
}
patchBytes, ok := config.Body.([]byte)
if !ok {
return fmt.Errorf("body must be []byte for PATCH request")
}
_, err = resourceClient.Patch(
ctx.Background(),
config.Name,
types.MergePatchType,
patchBytes,
metav1.PatchOptions{},
)
return err
default:
return fmt.Errorf("unsupported method: %s", config.Method)
}

return nil
Expand Down
Loading
Loading