Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster
latestPolicySnapshot.Labels[fleetv1beta1.IsLatestSnapshotLabel] = strconv.FormatBool(false)
if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil {
klog.ErrorS(err, "Failed to set the isLatestSnapshot label to false", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot))
return ctrl.Result{}, controller.NewAPIServerError(err)
return ctrl.Result{}, controller.NewAPIServerError(false, err)
}
}
if latestPolicySnapshot != nil && string(latestPolicySnapshot.Spec.PolicyHash) == policyHash {
Expand Down Expand Up @@ -89,11 +89,15 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster

if err := r.Client.Create(ctx, latestPolicySnapshot); err != nil {
klog.ErrorS(err, "Failed to create new clusterPolicySnapshot", "clusterPolicySnapshot", policySnapshotKObj)
return ctrl.Result{}, controller.NewAPIServerError(err)
return ctrl.Result{}, controller.NewAPIServerError(false, err)
}
}

// create clusterResourceSnapshot
// TODO
if _, err := r.selectResourcesForPlacement(crp); err != nil {
return ctrl.Result{}, err
}
// update the status based on the latestPolicySnapshot status
// update the status based on the work
return ctrl.Result{}, nil
Expand Down Expand Up @@ -132,7 +136,7 @@ func (r *Reconciler) ensureLatestPolicySnapshot(ctx context.Context, crp *fleetv
}
if err := r.Client.Update(ctx, latest); err != nil {
klog.ErrorS(err, "Failed to update the clusterPolicySnapshot", "clusterPolicySnapshot", klog.KObj(latest))
return controller.NewAPIServerError(err)
return controller.NewAPIServerError(false, err)
}
return nil
}
Expand All @@ -154,7 +158,7 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp
crpKObj := klog.KObj(crp)
if err := r.Client.List(ctx, snapshotList, latestSnapshotLabelMatcher); err != nil {
klog.ErrorS(err, "Failed to list active clusterPolicySnapshots", "clusterResourcePlacement", crpKObj)
return nil, -1, controller.NewAPIServerError(err)
return nil, -1, controller.NewAPIServerError(false, err)
}
if len(snapshotList.Items) == 1 {
policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[0])
Expand All @@ -172,7 +176,7 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp
// When there are no active snapshots, find the one who has the largest policy index.
if err := r.Client.List(ctx, snapshotList, client.MatchingLabels{fleetv1beta1.CRPTrackingLabel: crp.Name}); err != nil {
klog.ErrorS(err, "Failed to list all clusterPolicySnapshots", "clusterResourcePlacement", crpKObj)
return nil, -1, controller.NewAPIServerError(err)
return nil, -1, controller.NewAPIServerError(false, err)
}
if len(snapshotList.Items) == 0 {
// The policy index of the first snapshot will start from 0.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (r *Reconciler) ReconcileV1Alpha1(ctx context.Context, key controller.Queue
klog.V(2).InfoS("Successfully selected clusters", "placement", placementOld.Name, "number of clusters", len(selectedClusters))

// select the new resources and record the result in the placementNew status
manifests, scheduleErr := r.selectResources(ctx, placementNew)
manifests, scheduleErr := r.selectResources(placementNew)
if scheduleErr != nil {
klog.ErrorS(scheduleErr, "failed to select the resources for this placement", "placement", placeRef)
r.updatePlacementScheduledCondition(placementOld, scheduleErr)
Expand Down
121 changes: 87 additions & 34 deletions pkg/controllers/clusterresourceplacement/resource_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ Licensed under the MIT license.
package clusterresourceplacement

import (
"context"
"fmt"
"sort"
"strings"
Expand All @@ -22,14 +21,16 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1"

fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/controller"
)

// selectResources selects the resources according to the placement resourceSelectors.
// It also generates an array of manifests obj based on the selected resources.
func (r *Reconciler) selectResources(ctx context.Context, placement *fleetv1alpha1.ClusterResourcePlacement) ([]workv1alpha1.Manifest, error) {
selectedObjects, err := r.gatherSelectedResource(ctx, placement)
func (r *Reconciler) selectResources(placement *fleetv1alpha1.ClusterResourcePlacement) ([]workv1alpha1.Manifest, error) {
selectedObjects, err := r.gatherSelectedResource(placement.GetName(), convertResourceSelector(placement.Spec.ResourceSelectors))
if err != nil {
return nil, err
}
Expand All @@ -56,10 +57,21 @@ func (r *Reconciler) selectResources(ctx context.Context, placement *fleetv1alph
return manifests, nil
}

// getSelectedResource get all the resources according to the resource selector.
func (r *Reconciler) gatherSelectedResource(ctx context.Context, placement *fleetv1alpha1.ClusterResourcePlacement) ([]runtime.Object, error) {
// Note: temporary solution to share the same set of utils between v1alpha1 and v1beta1 APIs so that v1alpha1 implementation
// won't be broken. v1alpha1 implementation should be removed when new API is ready.
// The clusterResourceSelect has no changes between different versions.
func convertResourceSelector(old []fleetv1alpha1.ClusterResourceSelector) []fleetv1beta1.ClusterResourceSelector {
res := make([]fleetv1beta1.ClusterResourceSelector, len(old))
for i, item := range old {
res[i] = fleetv1beta1.ClusterResourceSelector(item)
}
return res
}

// gatherSelectedResource gets all the resources according to the resource selector.
func (r *Reconciler) gatherSelectedResource(placement string, selectors []fleetv1beta1.ClusterResourceSelector) ([]runtime.Object, error) {
var resources []runtime.Object
for _, selector := range placement.Spec.ResourceSelectors {
for _, selector := range selectors {
gvk := schema.GroupVersionKind{
Group: selector.Group,
Version: selector.Version,
Expand All @@ -73,12 +85,12 @@ func (r *Reconciler) gatherSelectedResource(ctx context.Context, placement *flee
var objs []runtime.Object
var err error
if gvk == utils.NamespaceGVK {
objs, err = r.fetchNamespaceResources(ctx, selector, placement.GetName())
objs, err = r.fetchNamespaceResources(selector, placement)
} else {
objs, err = r.fetchClusterScopedResources(ctx, selector, placement.GetName())
objs, err = r.fetchClusterScopedResources(selector, placement)
}
if err != nil {
return nil, fmt.Errorf("selector = %v: %w", selector, err)
return nil, err
}
resources = append(resources, objs...)
}
Expand All @@ -102,16 +114,16 @@ func (r *Reconciler) gatherSelectedResource(ctx context.Context, placement *flee
return resources, nil
}

// fetchClusterScopedResources retrieve the objects based on the selector.
func (r *Reconciler) fetchClusterScopedResources(_ context.Context, selector fleetv1alpha1.ClusterResourceSelector, placeName string) ([]runtime.Object, error) {
// fetchClusterScopedResources retrieves the objects based on the selector.
func (r *Reconciler) fetchClusterScopedResources(selector fleetv1beta1.ClusterResourceSelector, placeName string) ([]runtime.Object, error) {
klog.V(2).InfoS("start to fetch the cluster scoped resources by the selector", "selector", selector)
gk := schema.GroupKind{
Group: selector.Group,
Kind: selector.Kind,
}
restMapping, err := r.RestMapper.RESTMapping(gk, selector.Version)
if err != nil {
return nil, fmt.Errorf("failed to get GVR of the selector: %w", err)
return nil, controller.NewUserError(fmt.Errorf("failed to get GVR of the selector: %w", err))
}
gvr := restMapping.Resource
gvk := schema.GroupVersionKind{
Expand All @@ -120,10 +132,10 @@ func (r *Reconciler) fetchClusterScopedResources(_ context.Context, selector fle
Kind: selector.Kind,
}
if !r.InformerManager.IsClusterScopedResources(gvk) {
return nil, fmt.Errorf("%+v is not a cluster scoped resource", restMapping.Resource)
return nil, controller.NewUserError(fmt.Errorf("%+v is not a cluster scoped resource", restMapping.Resource))
}
if !r.InformerManager.IsInformerSynced(gvr) {
return nil, fmt.Errorf("informer cache for %+v is not synced yet", restMapping.Resource)
return nil, controller.NewExpectedBehaviorError(fmt.Errorf("informer cache for %+v is not synced yet", restMapping.Resource))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an example of my confusion specified in an earlier comment. It could be, at this point, that the cache is simply not working fast enough and will get sync'd soon; or it could also be that there is a connection/RBAC issue that will never recover without intervention and needs attention.

}

lister := r.InformerManager.Lister(gvr)
Expand All @@ -132,7 +144,7 @@ func (r *Reconciler) fetchClusterScopedResources(_ context.Context, selector fle
obj, err := lister.Get(selector.Name)
if err != nil {
klog.ErrorS(err, "cannot get the resource", "gvr", gvr, "name", selector.Name)
return nil, client.IgnoreNotFound(err)
return nil, controller.NewAPIServerError(true, client.IgnoreNotFound(err))
}
uObj := obj.DeepCopyObject().(*unstructured.Unstructured)
if uObj.GetDeletionTimestamp() != nil {
Expand All @@ -151,13 +163,13 @@ func (r *Reconciler) fetchClusterScopedResources(_ context.Context, selector fle
// TODO: validator should enforce the validity of the labelSelector
labelSelector, err = metav1.LabelSelectorAsSelector(selector.LabelSelector)
if err != nil {
return nil, fmt.Errorf("cannot convert the label selector to a selector: %w", err)
return nil, controller.NewUnexpectedBehaviorError(fmt.Errorf("cannot convert the label selector to a selector: %w", err))
}
}
var selectedObjs []runtime.Object
objects, err := lister.List(labelSelector)
if err != nil {
return nil, fmt.Errorf("cannot list all the objets: %w", err)
return nil, controller.NewAPIServerError(true, fmt.Errorf("cannot list all the objets: %w", err))
}
// go ahead and claim all objects by adding a finalizer and insert the placement in its annotation
for i := 0; i < len(objects); i++ {
Expand All @@ -174,14 +186,14 @@ func (r *Reconciler) fetchClusterScopedResources(_ context.Context, selector fle
return selectedObjs, nil
}

// fetchNamespaceResources retrieve all the objects for a ClusterResourceSelector that is for namespace.
func (r *Reconciler) fetchNamespaceResources(ctx context.Context, selector fleetv1alpha1.ClusterResourceSelector, placeName string) ([]runtime.Object, error) {
// fetchNamespaceResources retrieves all the objects for a ClusterResourceSelector that is for namespace.
func (r *Reconciler) fetchNamespaceResources(selector fleetv1beta1.ClusterResourceSelector, placeName string) ([]runtime.Object, error) {
klog.V(2).InfoS("start to fetch the namespace resources by the selector", "selector", selector)
var resources []runtime.Object

if len(selector.Name) != 0 {
// just a single namespace
objs, err := r.fetchAllResourcesInOneNamespace(ctx, selector.Name, placeName)
objs, err := r.fetchAllResourcesInOneNamespace(selector.Name, placeName)
if err != nil {
klog.ErrorS(err, "failed to fetch all the selected resource in a namespace", "namespace", selector.Name)
return nil, err
Expand All @@ -197,20 +209,20 @@ func (r *Reconciler) fetchNamespaceResources(ctx context.Context, selector fleet
} else {
labelSelector, err = metav1.LabelSelectorAsSelector(selector.LabelSelector)
if err != nil {
return nil, fmt.Errorf("cannot convert the label selector to a selector: %w", err)
return nil, controller.NewUnexpectedBehaviorError(fmt.Errorf("cannot convert the label selector to a selector: %w", err))
}
}
namespaces, err := r.InformerManager.Lister(utils.NamespaceGVR).List(labelSelector)
if err != nil {
return nil, fmt.Errorf("cannot list all the namespaces given the label selector: %w", err)
return nil, controller.NewAPIServerError(true, fmt.Errorf("cannot list all the namespaces given the label selector: %w", err))
}

for _, namespace := range namespaces {
ns, err := meta.Accessor(namespace)
if err != nil {
return nil, fmt.Errorf("cannot get the name of a namespace object: %w", err)
return nil, controller.NewUnexpectedBehaviorError(fmt.Errorf("cannot get the name of a namespace object: %w", err))
}
objs, err := r.fetchAllResourcesInOneNamespace(ctx, ns.GetName(), placeName)
objs, err := r.fetchAllResourcesInOneNamespace(ns.GetName(), placeName)
if err != nil {
klog.ErrorS(err, "failed to fetch all the selected resource in a namespace", "namespace", ns.GetName())
return nil, err
Expand All @@ -220,20 +232,21 @@ func (r *Reconciler) fetchNamespaceResources(ctx context.Context, selector fleet
return resources, nil
}

// fetchAllResourcesInOneNamespace retrieve all the objects inside a single namespace which includes the namespace itself.
func (r *Reconciler) fetchAllResourcesInOneNamespace(_ context.Context, namespaceName string, placeName string) ([]runtime.Object, error) {
// fetchAllResourcesInOneNamespace retrieves all the objects inside a single namespace which includes the namespace itself.
func (r *Reconciler) fetchAllResourcesInOneNamespace(namespaceName string, placeName string) ([]runtime.Object, error) {
var resources []runtime.Object

if !utils.ShouldPropagateNamespace(namespaceName, r.SkippedNamespaces) {
return nil, fmt.Errorf("namespace %s is not allowed to propagate", namespaceName)
err := fmt.Errorf("invalid clusterRresourcePlacement %s: namespace %s is not allowed to propagate", placeName, namespaceName)
return nil, controller.NewUserError(err)
}

klog.V(2).InfoS("start to fetch all the resources inside a namespace", "namespace", namespaceName)
// select the namespace object itself
obj, err := r.InformerManager.Lister(utils.NamespaceGVR).Get(namespaceName)
if err != nil {
klog.ErrorS(err, "cannot get the namespace", "namespace", namespaceName)
return nil, client.IgnoreNotFound(err)
return nil, controller.NewAPIServerError(true, client.IgnoreNotFound(err))
}
nameSpaceObj := obj.DeepCopyObject().(*unstructured.Unstructured)
if nameSpaceObj.GetDeletionTimestamp() != nil {
Expand All @@ -250,18 +263,19 @@ func (r *Reconciler) fetchAllResourcesInOneNamespace(_ context.Context, namespac
continue
}
if !r.InformerManager.IsInformerSynced(gvr) {
return nil, fmt.Errorf("informer cache for %+v is not synced yet", gvr)
return nil, controller.NewExpectedBehaviorError(fmt.Errorf("informer cache for %+v is not synced yet", gvr))
}
lister := r.InformerManager.Lister(gvr)
objs, err := lister.ByNamespace(namespaceName).List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("cannot list all the objects of type %+v in namespace %s: %w", gvr, namespaceName, err)
return nil, controller.NewAPIServerError(true, fmt.Errorf("cannot list all the objects of type %+v in namespace %s: %w", gvr, namespaceName, err))
}
for _, obj := range objs {
uObj := obj.DeepCopyObject().(*unstructured.Unstructured)
shouldInclude, err := utils.ShouldPropagateObj(r.InformerManager, uObj)
if err != nil {
return nil, fmt.Errorf("cannot determine if we should propagate an object: %w", err)
klog.ErrorS(err, "cannot determine if we should propagate an object", "object", klog.KObj(uObj))
return nil, err
}
if shouldInclude {
resources = append(resources, obj)
Expand Down Expand Up @@ -291,9 +305,8 @@ func (r *Reconciler) shouldSelectResource(gvr schema.GroupVersionResource) bool
return true
}

// generateManifest creates a manifest from the unstructured obj,
// it stripped all the unnecessary fields to prepare the objects for dispatch.
func generateManifest(object *unstructured.Unstructured) (*workv1alpha1.Manifest, error) {
// generateRawContent strips all the unnecessary fields to prepare the objects for dispatch.
func generateRawContent(object *unstructured.Unstructured) ([]byte, error) {
// we keep the annotation/label/finalizer/owner references/delete grace period
object.SetResourceVersion("")
object.SetGeneration(0)
Expand Down Expand Up @@ -346,7 +359,47 @@ func generateManifest(object *unstructured.Unstructured) (*workv1alpha1.Manifest
if err != nil {
return nil, fmt.Errorf("failed to marshal the unstructured object gvk = %s, name =%s: %w", object.GroupVersionKind(), object.GetName(), err)
}
return rawContent, nil
}

// generateManifest creates a manifest from the unstructured obj.
func generateManifest(object *unstructured.Unstructured) (*workv1alpha1.Manifest, error) {
rawContent, err := generateRawContent(object)
if err != nil {
return nil, err
}
return &workv1alpha1.Manifest{
RawExtension: runtime.RawExtension{Raw: rawContent},
}, nil
}

// generateResourceContent creates a resource content from the unstructured obj.
func generateResourceContent(object *unstructured.Unstructured) (*fleetv1beta1.ResourceContent, error) {
rawContent, err := generateRawContent(object)
if err != nil {
return nil, controller.NewUnexpectedBehaviorError(err)
}
return &fleetv1beta1.ResourceContent{
RawExtension: runtime.RawExtension{Raw: rawContent},
}, nil
}

// selectResourcesForPlacement selects the resources according to the placement resourceSelectors.
// It also generates an array of resource content based on the selected resources.
func (r *Reconciler) selectResourcesForPlacement(placement *fleetv1beta1.ClusterResourcePlacement) ([]fleetv1beta1.ResourceContent, error) {
selectedObjects, err := r.gatherSelectedResource(placement.GetName(), placement.Spec.ResourceSelectors)
if err != nil {
return nil, err
}

resources := make([]fleetv1beta1.ResourceContent, len(selectedObjects))
for i, obj := range selectedObjects {
unstructuredObj := obj.DeepCopyObject().(*unstructured.Unstructured)
rc, err := generateResourceContent(unstructuredObj)
if err != nil {
return nil, err
}
resources[i] = *rc
}
return resources, nil
}
Loading