diff --git a/pkg/controllers/clusterresourceplacement/clusterresourceplacement_controller.go b/pkg/controllers/clusterresourceplacement/clusterresourceplacement_controller.go index 804a91ac5..c94378333 100644 --- a/pkg/controllers/clusterresourceplacement/clusterresourceplacement_controller.go +++ b/pkg/controllers/clusterresourceplacement/clusterresourceplacement_controller.go @@ -48,7 +48,7 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster latestPolicySnapshot.Labels[fleetv1beta1.IsLatestSnapshotLabel] = strconv.FormatBool(false) if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil { klog.ErrorS(err, "Failed to set the isLatestSnapshot label to false", "clusterPolicySnapshot", klog.KObj(latestPolicySnapshot)) - return ctrl.Result{}, controller.NewAPIServerError(err) + return ctrl.Result{}, controller.NewAPIServerError(false, err) } } if latestPolicySnapshot != nil && string(latestPolicySnapshot.Spec.PolicyHash) == policyHash { @@ -89,11 +89,15 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster if err := r.Client.Create(ctx, latestPolicySnapshot); err != nil { klog.ErrorS(err, "Failed to create new clusterPolicySnapshot", "clusterPolicySnapshot", policySnapshotKObj) - return ctrl.Result{}, controller.NewAPIServerError(err) + return ctrl.Result{}, controller.NewAPIServerError(false, err) } } // create clusterResourceSnapshot + // TODO + if _, err := r.selectResourcesForPlacement(crp); err != nil { + return ctrl.Result{}, err + } // update the status based on the latestPolicySnapshot status // update the status based on the work return ctrl.Result{}, nil @@ -132,7 +136,7 @@ func (r *Reconciler) ensureLatestPolicySnapshot(ctx context.Context, crp *fleetv } if err := r.Client.Update(ctx, latest); err != nil { klog.ErrorS(err, "Failed to update the clusterPolicySnapshot", "clusterPolicySnapshot", klog.KObj(latest)) - return controller.NewAPIServerError(err) + return controller.NewAPIServerError(false, err) } return nil } @@ -154,7 +158,7 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp crpKObj := klog.KObj(crp) if err := r.Client.List(ctx, snapshotList, latestSnapshotLabelMatcher); err != nil { klog.ErrorS(err, "Failed to list active clusterPolicySnapshots", "clusterResourcePlacement", crpKObj) - return nil, -1, controller.NewAPIServerError(err) + return nil, -1, controller.NewAPIServerError(false, err) } if len(snapshotList.Items) == 1 { policyIndex, err := parsePolicyIndexFromLabel(&snapshotList.Items[0]) @@ -172,7 +176,7 @@ func (r *Reconciler) lookupLatestClusterPolicySnapshot(ctx context.Context, crp // When there are no active snapshots, find the one who has the largest policy index. if err := r.Client.List(ctx, snapshotList, client.MatchingLabels{fleetv1beta1.CRPTrackingLabel: crp.Name}); err != nil { klog.ErrorS(err, "Failed to list all clusterPolicySnapshots", "clusterResourcePlacement", crpKObj) - return nil, -1, controller.NewAPIServerError(err) + return nil, -1, controller.NewAPIServerError(false, err) } if len(snapshotList.Items) == 0 { // The policy index of the first snapshot will start from 0. diff --git a/pkg/controllers/clusterresourceplacement/placement_controller.go b/pkg/controllers/clusterresourceplacement/placement_controller.go index 3c0bbc1ac..52311466e 100644 --- a/pkg/controllers/clusterresourceplacement/placement_controller.go +++ b/pkg/controllers/clusterresourceplacement/placement_controller.go @@ -103,7 +103,7 @@ func (r *Reconciler) ReconcileV1Alpha1(ctx context.Context, key controller.Queue klog.V(2).InfoS("Successfully selected clusters", "placement", placementOld.Name, "number of clusters", len(selectedClusters)) // select the new resources and record the result in the placementNew status - manifests, scheduleErr := r.selectResources(ctx, placementNew) + manifests, scheduleErr := r.selectResources(placementNew) if scheduleErr != nil { klog.ErrorS(scheduleErr, "failed to select the resources for this placement", "placement", placeRef) r.updatePlacementScheduledCondition(placementOld, scheduleErr) diff --git a/pkg/controllers/clusterresourceplacement/resource_selector.go b/pkg/controllers/clusterresourceplacement/resource_selector.go index 4061e431f..f1ce5c9de 100644 --- a/pkg/controllers/clusterresourceplacement/resource_selector.go +++ b/pkg/controllers/clusterresourceplacement/resource_selector.go @@ -6,7 +6,6 @@ Licensed under the MIT license. package clusterresourceplacement import ( - "context" "fmt" "sort" "strings" @@ -22,14 +21,16 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1" + fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1" "go.goms.io/fleet/pkg/utils" + "go.goms.io/fleet/pkg/utils/controller" ) // selectResources selects the resources according to the placement resourceSelectors. // It also generates an array of manifests obj based on the selected resources. -func (r *Reconciler) selectResources(ctx context.Context, placement *fleetv1alpha1.ClusterResourcePlacement) ([]workv1alpha1.Manifest, error) { - selectedObjects, err := r.gatherSelectedResource(ctx, placement) +func (r *Reconciler) selectResources(placement *fleetv1alpha1.ClusterResourcePlacement) ([]workv1alpha1.Manifest, error) { + selectedObjects, err := r.gatherSelectedResource(placement.GetName(), convertResourceSelector(placement.Spec.ResourceSelectors)) if err != nil { return nil, err } @@ -56,10 +57,21 @@ func (r *Reconciler) selectResources(ctx context.Context, placement *fleetv1alph return manifests, nil } -// getSelectedResource get all the resources according to the resource selector. -func (r *Reconciler) gatherSelectedResource(ctx context.Context, placement *fleetv1alpha1.ClusterResourcePlacement) ([]runtime.Object, error) { +// Note: temporary solution to share the same set of utils between v1alpha1 and v1beta1 APIs so that v1alpha1 implementation +// won't be broken. v1alpha1 implementation should be removed when new API is ready. +// The clusterResourceSelect has no changes between different versions. +func convertResourceSelector(old []fleetv1alpha1.ClusterResourceSelector) []fleetv1beta1.ClusterResourceSelector { + res := make([]fleetv1beta1.ClusterResourceSelector, len(old)) + for i, item := range old { + res[i] = fleetv1beta1.ClusterResourceSelector(item) + } + return res +} + +// gatherSelectedResource gets all the resources according to the resource selector. +func (r *Reconciler) gatherSelectedResource(placement string, selectors []fleetv1beta1.ClusterResourceSelector) ([]runtime.Object, error) { var resources []runtime.Object - for _, selector := range placement.Spec.ResourceSelectors { + for _, selector := range selectors { gvk := schema.GroupVersionKind{ Group: selector.Group, Version: selector.Version, @@ -73,12 +85,12 @@ func (r *Reconciler) gatherSelectedResource(ctx context.Context, placement *flee var objs []runtime.Object var err error if gvk == utils.NamespaceGVK { - objs, err = r.fetchNamespaceResources(ctx, selector, placement.GetName()) + objs, err = r.fetchNamespaceResources(selector, placement) } else { - objs, err = r.fetchClusterScopedResources(ctx, selector, placement.GetName()) + objs, err = r.fetchClusterScopedResources(selector, placement) } if err != nil { - return nil, fmt.Errorf("selector = %v: %w", selector, err) + return nil, err } resources = append(resources, objs...) } @@ -102,8 +114,8 @@ func (r *Reconciler) gatherSelectedResource(ctx context.Context, placement *flee return resources, nil } -// fetchClusterScopedResources retrieve the objects based on the selector. -func (r *Reconciler) fetchClusterScopedResources(_ context.Context, selector fleetv1alpha1.ClusterResourceSelector, placeName string) ([]runtime.Object, error) { +// fetchClusterScopedResources retrieves the objects based on the selector. +func (r *Reconciler) fetchClusterScopedResources(selector fleetv1beta1.ClusterResourceSelector, placeName string) ([]runtime.Object, error) { klog.V(2).InfoS("start to fetch the cluster scoped resources by the selector", "selector", selector) gk := schema.GroupKind{ Group: selector.Group, @@ -111,7 +123,7 @@ func (r *Reconciler) fetchClusterScopedResources(_ context.Context, selector fle } restMapping, err := r.RestMapper.RESTMapping(gk, selector.Version) if err != nil { - return nil, fmt.Errorf("failed to get GVR of the selector: %w", err) + return nil, controller.NewUserError(fmt.Errorf("failed to get GVR of the selector: %w", err)) } gvr := restMapping.Resource gvk := schema.GroupVersionKind{ @@ -120,10 +132,10 @@ func (r *Reconciler) fetchClusterScopedResources(_ context.Context, selector fle Kind: selector.Kind, } if !r.InformerManager.IsClusterScopedResources(gvk) { - return nil, fmt.Errorf("%+v is not a cluster scoped resource", restMapping.Resource) + return nil, controller.NewUserError(fmt.Errorf("%+v is not a cluster scoped resource", restMapping.Resource)) } if !r.InformerManager.IsInformerSynced(gvr) { - return nil, fmt.Errorf("informer cache for %+v is not synced yet", restMapping.Resource) + return nil, controller.NewExpectedBehaviorError(fmt.Errorf("informer cache for %+v is not synced yet", restMapping.Resource)) } lister := r.InformerManager.Lister(gvr) @@ -132,7 +144,7 @@ func (r *Reconciler) fetchClusterScopedResources(_ context.Context, selector fle obj, err := lister.Get(selector.Name) if err != nil { klog.ErrorS(err, "cannot get the resource", "gvr", gvr, "name", selector.Name) - return nil, client.IgnoreNotFound(err) + return nil, controller.NewAPIServerError(true, client.IgnoreNotFound(err)) } uObj := obj.DeepCopyObject().(*unstructured.Unstructured) if uObj.GetDeletionTimestamp() != nil { @@ -151,13 +163,13 @@ func (r *Reconciler) fetchClusterScopedResources(_ context.Context, selector fle // TODO: validator should enforce the validity of the labelSelector labelSelector, err = metav1.LabelSelectorAsSelector(selector.LabelSelector) if err != nil { - return nil, fmt.Errorf("cannot convert the label selector to a selector: %w", err) + return nil, controller.NewUnexpectedBehaviorError(fmt.Errorf("cannot convert the label selector to a selector: %w", err)) } } var selectedObjs []runtime.Object objects, err := lister.List(labelSelector) if err != nil { - return nil, fmt.Errorf("cannot list all the objets: %w", err) + return nil, controller.NewAPIServerError(true, fmt.Errorf("cannot list all the objets: %w", err)) } // go ahead and claim all objects by adding a finalizer and insert the placement in its annotation for i := 0; i < len(objects); i++ { @@ -174,14 +186,14 @@ func (r *Reconciler) fetchClusterScopedResources(_ context.Context, selector fle return selectedObjs, nil } -// fetchNamespaceResources retrieve all the objects for a ClusterResourceSelector that is for namespace. -func (r *Reconciler) fetchNamespaceResources(ctx context.Context, selector fleetv1alpha1.ClusterResourceSelector, placeName string) ([]runtime.Object, error) { +// fetchNamespaceResources retrieves all the objects for a ClusterResourceSelector that is for namespace. +func (r *Reconciler) fetchNamespaceResources(selector fleetv1beta1.ClusterResourceSelector, placeName string) ([]runtime.Object, error) { klog.V(2).InfoS("start to fetch the namespace resources by the selector", "selector", selector) var resources []runtime.Object if len(selector.Name) != 0 { // just a single namespace - objs, err := r.fetchAllResourcesInOneNamespace(ctx, selector.Name, placeName) + objs, err := r.fetchAllResourcesInOneNamespace(selector.Name, placeName) if err != nil { klog.ErrorS(err, "failed to fetch all the selected resource in a namespace", "namespace", selector.Name) return nil, err @@ -197,20 +209,20 @@ func (r *Reconciler) fetchNamespaceResources(ctx context.Context, selector fleet } else { labelSelector, err = metav1.LabelSelectorAsSelector(selector.LabelSelector) if err != nil { - return nil, fmt.Errorf("cannot convert the label selector to a selector: %w", err) + return nil, controller.NewUnexpectedBehaviorError(fmt.Errorf("cannot convert the label selector to a selector: %w", err)) } } namespaces, err := r.InformerManager.Lister(utils.NamespaceGVR).List(labelSelector) if err != nil { - return nil, fmt.Errorf("cannot list all the namespaces given the label selector: %w", err) + return nil, controller.NewAPIServerError(true, fmt.Errorf("cannot list all the namespaces given the label selector: %w", err)) } for _, namespace := range namespaces { ns, err := meta.Accessor(namespace) if err != nil { - return nil, fmt.Errorf("cannot get the name of a namespace object: %w", err) + return nil, controller.NewUnexpectedBehaviorError(fmt.Errorf("cannot get the name of a namespace object: %w", err)) } - objs, err := r.fetchAllResourcesInOneNamespace(ctx, ns.GetName(), placeName) + objs, err := r.fetchAllResourcesInOneNamespace(ns.GetName(), placeName) if err != nil { klog.ErrorS(err, "failed to fetch all the selected resource in a namespace", "namespace", ns.GetName()) return nil, err @@ -220,12 +232,13 @@ func (r *Reconciler) fetchNamespaceResources(ctx context.Context, selector fleet return resources, nil } -// fetchAllResourcesInOneNamespace retrieve all the objects inside a single namespace which includes the namespace itself. -func (r *Reconciler) fetchAllResourcesInOneNamespace(_ context.Context, namespaceName string, placeName string) ([]runtime.Object, error) { +// fetchAllResourcesInOneNamespace retrieves all the objects inside a single namespace which includes the namespace itself. +func (r *Reconciler) fetchAllResourcesInOneNamespace(namespaceName string, placeName string) ([]runtime.Object, error) { var resources []runtime.Object if !utils.ShouldPropagateNamespace(namespaceName, r.SkippedNamespaces) { - return nil, fmt.Errorf("namespace %s is not allowed to propagate", namespaceName) + err := fmt.Errorf("invalid clusterRresourcePlacement %s: namespace %s is not allowed to propagate", placeName, namespaceName) + return nil, controller.NewUserError(err) } klog.V(2).InfoS("start to fetch all the resources inside a namespace", "namespace", namespaceName) @@ -233,7 +246,7 @@ func (r *Reconciler) fetchAllResourcesInOneNamespace(_ context.Context, namespac obj, err := r.InformerManager.Lister(utils.NamespaceGVR).Get(namespaceName) if err != nil { klog.ErrorS(err, "cannot get the namespace", "namespace", namespaceName) - return nil, client.IgnoreNotFound(err) + return nil, controller.NewAPIServerError(true, client.IgnoreNotFound(err)) } nameSpaceObj := obj.DeepCopyObject().(*unstructured.Unstructured) if nameSpaceObj.GetDeletionTimestamp() != nil { @@ -250,18 +263,19 @@ func (r *Reconciler) fetchAllResourcesInOneNamespace(_ context.Context, namespac continue } if !r.InformerManager.IsInformerSynced(gvr) { - return nil, fmt.Errorf("informer cache for %+v is not synced yet", gvr) + return nil, controller.NewExpectedBehaviorError(fmt.Errorf("informer cache for %+v is not synced yet", gvr)) } lister := r.InformerManager.Lister(gvr) objs, err := lister.ByNamespace(namespaceName).List(labels.Everything()) if err != nil { - return nil, fmt.Errorf("cannot list all the objects of type %+v in namespace %s: %w", gvr, namespaceName, err) + return nil, controller.NewAPIServerError(true, fmt.Errorf("cannot list all the objects of type %+v in namespace %s: %w", gvr, namespaceName, err)) } for _, obj := range objs { uObj := obj.DeepCopyObject().(*unstructured.Unstructured) shouldInclude, err := utils.ShouldPropagateObj(r.InformerManager, uObj) if err != nil { - return nil, fmt.Errorf("cannot determine if we should propagate an object: %w", err) + klog.ErrorS(err, "cannot determine if we should propagate an object", "object", klog.KObj(uObj)) + return nil, err } if shouldInclude { resources = append(resources, obj) @@ -291,9 +305,8 @@ func (r *Reconciler) shouldSelectResource(gvr schema.GroupVersionResource) bool return true } -// generateManifest creates a manifest from the unstructured obj, -// it stripped all the unnecessary fields to prepare the objects for dispatch. -func generateManifest(object *unstructured.Unstructured) (*workv1alpha1.Manifest, error) { +// generateRawContent strips all the unnecessary fields to prepare the objects for dispatch. +func generateRawContent(object *unstructured.Unstructured) ([]byte, error) { // we keep the annotation/label/finalizer/owner references/delete grace period object.SetResourceVersion("") object.SetGeneration(0) @@ -346,7 +359,47 @@ func generateManifest(object *unstructured.Unstructured) (*workv1alpha1.Manifest if err != nil { return nil, fmt.Errorf("failed to marshal the unstructured object gvk = %s, name =%s: %w", object.GroupVersionKind(), object.GetName(), err) } + return rawContent, nil +} + +// generateManifest creates a manifest from the unstructured obj. +func generateManifest(object *unstructured.Unstructured) (*workv1alpha1.Manifest, error) { + rawContent, err := generateRawContent(object) + if err != nil { + return nil, err + } return &workv1alpha1.Manifest{ RawExtension: runtime.RawExtension{Raw: rawContent}, }, nil } + +// generateResourceContent creates a resource content from the unstructured obj. +func generateResourceContent(object *unstructured.Unstructured) (*fleetv1beta1.ResourceContent, error) { + rawContent, err := generateRawContent(object) + if err != nil { + return nil, controller.NewUnexpectedBehaviorError(err) + } + return &fleetv1beta1.ResourceContent{ + RawExtension: runtime.RawExtension{Raw: rawContent}, + }, nil +} + +// selectResourcesForPlacement selects the resources according to the placement resourceSelectors. +// It also generates an array of resource content based on the selected resources. +func (r *Reconciler) selectResourcesForPlacement(placement *fleetv1beta1.ClusterResourcePlacement) ([]fleetv1beta1.ResourceContent, error) { + selectedObjects, err := r.gatherSelectedResource(placement.GetName(), placement.Spec.ResourceSelectors) + if err != nil { + return nil, err + } + + resources := make([]fleetv1beta1.ResourceContent, len(selectedObjects)) + for i, obj := range selectedObjects { + unstructuredObj := obj.DeepCopyObject().(*unstructured.Unstructured) + rc, err := generateResourceContent(unstructuredObj) + if err != nil { + return nil, err + } + resources[i] = *rc + } + return resources, nil +} diff --git a/pkg/controllers/clusterresourceplacement/resource_selector_test.go b/pkg/controllers/clusterresourceplacement/resource_selector_test.go index 94990acc0..7f2d26b1b 100644 --- a/pkg/controllers/clusterresourceplacement/resource_selector_test.go +++ b/pkg/controllers/clusterresourceplacement/resource_selector_test.go @@ -9,9 +9,10 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -19,6 +20,8 @@ import ( utilrand "k8s.io/apimachinery/pkg/util/rand" "k8s.io/utils/pointer" workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1" + + fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" ) func TestGenerateManifest(t *testing.T) { @@ -29,7 +32,7 @@ func TestGenerateManifest(t *testing.T) { }{ "should generate sanitized manifest for Kind: CustomResourceDefinition": { unstructuredObj: func() *unstructured.Unstructured { - crd := v1.CustomResourceDefinition{ + crd := apiextensionsv1.CustomResourceDefinition{ TypeMeta: metav1.TypeMeta{ Kind: "CustomResourceDefinition", APIVersion: "apiextensions.k8s.io/v1", @@ -79,7 +82,7 @@ func TestGenerateManifest(t *testing.T) { return &unstructured.Unstructured{Object: mCrd} }, expectedManifest: func() *workv1alpha1.Manifest { - crd := v1.CustomResourceDefinition{ + crd := apiextensionsv1.CustomResourceDefinition{ TypeMeta: metav1.TypeMeta{ Kind: "CustomResourceDefinition", APIVersion: "apiextensions.k8s.io/v1", @@ -300,3 +303,232 @@ func makeIPFamilyPolicyTypePointer(policyType corev1.IPFamilyPolicyType) *corev1 func makeServiceInternalTrafficPolicyPointer(policyType corev1.ServiceInternalTrafficPolicyType) *corev1.ServiceInternalTrafficPolicyType { return &policyType } + +func TestGenerateResourceContent(t *testing.T) { + tests := map[string]struct { + resource interface{} + wantResource interface{} + }{ + "should generate sanitized resource content for Kind: CustomResourceDefinition": { + resource: apiextensionsv1.CustomResourceDefinition{ + TypeMeta: metav1.TypeMeta{ + Kind: "CustomResourceDefinition", + APIVersion: "apiextensions.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "object-name", + GenerateName: "object-generateName", + Namespace: "object-namespace", + SelfLink: "object-selflink", + UID: types.UID(utilrand.String(10)), + ResourceVersion: utilrand.String(10), + Generation: int64(utilrand.Int()), + CreationTimestamp: metav1.Time{Time: time.Date(utilrand.IntnRange(0, 999), time.January, 1, 1, 1, 1, 1, time.UTC)}, + DeletionTimestamp: &metav1.Time{Time: time.Date(utilrand.IntnRange(1000, 1999), time.January, 1, 1, 1, 1, 1, time.UTC)}, + DeletionGracePeriodSeconds: pointer.Int64(9999), + Labels: map[string]string{ + "label-key": "label-value", + }, + Annotations: map[string]string{ + corev1.LastAppliedConfigAnnotation: "svc-object-annotation-lac-value", + "svc-annotation-key": "svc-object-annotation-key-value", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "svc-ownerRef-api/v1", + Kind: "svc-owner-kind", + Name: "svc-owner-name", + UID: "svc-owner-uid", + }, + }, + Finalizers: []string{"object-finalizer"}, + ManagedFields: []metav1.ManagedFieldsEntry{ + { + Manager: utilrand.String(10), + Operation: metav1.ManagedFieldsOperationApply, + APIVersion: utilrand.String(10), + }, + }, + }, + }, + wantResource: apiextensionsv1.CustomResourceDefinition{ + TypeMeta: metav1.TypeMeta{ + Kind: "CustomResourceDefinition", + APIVersion: "apiextensions.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "object-name", + GenerateName: "object-generateName", + Namespace: "object-namespace", + DeletionGracePeriodSeconds: pointer.Int64(9999), + Labels: map[string]string{ + "label-key": "label-value", + }, + Annotations: map[string]string{ + "svc-annotation-key": "svc-object-annotation-key-value", + }, + Finalizers: []string{"object-finalizer"}, + }, + }, + }, + "should generate sanitized resource content for Kind: Service": { + resource: corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "svc-name", + Namespace: "svc-namespace", + SelfLink: utilrand.String(10), + DeletionTimestamp: &metav1.Time{Time: time.Date(00002, time.January, 1, 1, 1, 1, 1, time.UTC)}, + ManagedFields: []metav1.ManagedFieldsEntry{ + { + Manager: "svc-manager", + Operation: metav1.ManagedFieldsOperationApply, + APIVersion: "svc-manager-api/v1", + }, + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "svc-ownerRef-api/v1", + Kind: "svc-owner-kind", + Name: "svc-owner-name", + UID: "svc-owner-uid", + }, + }, + Annotations: map[string]string{ + corev1.LastAppliedConfigAnnotation: "svc-object-annotation-lac-value", + "svc-annotation-key": "svc-object-annotation-key-value", + }, + ResourceVersion: "svc-object-resourceVersion", + Generation: int64(utilrand.Int()), + CreationTimestamp: metav1.Time{Time: time.Date(00001, time.January, 1, 1, 1, 1, 1, time.UTC)}, + UID: types.UID(utilrand.String(10)), + }, + Spec: corev1.ServiceSpec{ + ClusterIP: utilrand.String(10), + ClusterIPs: []string{}, + HealthCheckNodePort: int32(utilrand.Int()), + Selector: map[string]string{"svc-spec-selector-key": "svc-spec-selector-value"}, + Ports: []corev1.ServicePort{ + { + Name: "svc-port", + Protocol: corev1.ProtocolTCP, + AppProtocol: pointer.String("svc.com/my-custom-protocol"), + Port: 9001, + NodePort: int32(utilrand.Int()), + }, + }, + Type: corev1.ServiceType("svc-spec-type"), + ExternalIPs: []string{"svc-spec-externalIps-1"}, + SessionAffinity: corev1.ServiceAffinity("svc-spec-sessionAffinity"), + LoadBalancerIP: "192.168.1.3", + LoadBalancerSourceRanges: []string{"192.168.1.1"}, + ExternalName: "svc-spec-externalName", + ExternalTrafficPolicy: corev1.ServiceExternalTrafficPolicyType("svc-spec-externalTrafficPolicy"), + PublishNotReadyAddresses: false, + SessionAffinityConfig: &corev1.SessionAffinityConfig{ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: pointer.Int32(60)}}, + IPFamilies: []corev1.IPFamily{ + corev1.IPv4Protocol, + corev1.IPv6Protocol, + }, + IPFamilyPolicy: makeIPFamilyPolicyTypePointer(corev1.IPFamilyPolicySingleStack), + AllocateLoadBalancerNodePorts: pointer.Bool(false), + LoadBalancerClass: pointer.String("svc-spec-loadBalancerClass"), + InternalTrafficPolicy: makeServiceInternalTrafficPolicyPointer(corev1.ServiceInternalTrafficPolicyCluster), + }, + Status: corev1.ServiceStatus{ + LoadBalancer: corev1.LoadBalancerStatus{ + Ingress: []corev1.LoadBalancerIngress{ + { + IP: "192.168.1.1", + Hostname: "loadbalancer-ingress-hostname", + Ports: []corev1.PortStatus{ + { + Port: 9003, + Protocol: corev1.ProtocolTCP, + }, + }, + }, + }, + }, + }, + }, + wantResource: corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "svc-name", + Namespace: "svc-namespace", + Annotations: map[string]string{ + "svc-annotation-key": "svc-object-annotation-key-value", + }, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{"svc-spec-selector-key": "svc-spec-selector-value"}, + Ports: []corev1.ServicePort{ + { + Name: "svc-port", + Protocol: corev1.ProtocolTCP, + AppProtocol: pointer.String("svc.com/my-custom-protocol"), + Port: 9001, + }, + }, + Type: corev1.ServiceType("svc-spec-type"), + ExternalIPs: []string{"svc-spec-externalIps-1"}, + SessionAffinity: corev1.ServiceAffinity("svc-spec-sessionAffinity"), + LoadBalancerIP: "192.168.1.3", + LoadBalancerSourceRanges: []string{"192.168.1.1"}, + ExternalName: "svc-spec-externalName", + ExternalTrafficPolicy: corev1.ServiceExternalTrafficPolicyType("svc-spec-externalTrafficPolicy"), + PublishNotReadyAddresses: false, + SessionAffinityConfig: &corev1.SessionAffinityConfig{ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: pointer.Int32(60)}}, + IPFamilies: []corev1.IPFamily{ + corev1.IPv4Protocol, + corev1.IPv6Protocol, + }, + IPFamilyPolicy: makeIPFamilyPolicyTypePointer(corev1.IPFamilyPolicySingleStack), + AllocateLoadBalancerNodePorts: pointer.Bool(false), + LoadBalancerClass: pointer.String("svc-spec-loadBalancerClass"), + InternalTrafficPolicy: makeServiceInternalTrafficPolicyPointer(corev1.ServiceInternalTrafficPolicyCluster), + }, + }, + }, + } + + for testName, tt := range tests { + t.Run(testName, func(t *testing.T) { + object, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&tt.resource) + if err != nil { + t.Fatalf("ToUnstructured failed: %v", err) + } + got, err := generateResourceContent(&unstructured.Unstructured{Object: object}) + if err != nil { + t.Fatalf("failed to generateResourceContent(): %v", err) + } + want, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&tt.wantResource) + if err != nil { + t.Fatalf("ToUnstructured failed: %v", err) + } + delete(want["metadata"].(map[string]interface{}), "creationTimestamp") + delete(want, "status") + + uWant := unstructured.Unstructured{Object: want} + rawWant, err := uWant.MarshalJSON() + if err != nil { + t.Fatalf("MarshalJSON failed: %v", err) + } + wantResourceContent := &fleetv1beta1.ResourceContent{ + RawExtension: runtime.RawExtension{ + Raw: rawWant, + }, + } + if diff := cmp.Diff(wantResourceContent, got); diff != "" { + t.Errorf("generateResourceContent() mismatch (-want, +got):\n%s", diff) + } + }) + } +} diff --git a/pkg/utils/common.go b/pkg/utils/common.go index 2a8d915c5..87d9d62d0 100644 --- a/pkg/utils/common.go +++ b/pkg/utils/common.go @@ -25,6 +25,7 @@ import ( workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1" fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1" + "go.goms.io/fleet/pkg/utils/controller" "go.goms.io/fleet/pkg/utils/informer" ) @@ -211,24 +212,21 @@ func ShouldPropagateObj(informerManager informer.Manager, uObj *unstructured.Uns // The secret, with type 'kubernetes.io/service-account-token', is created along with `ServiceAccount` should be // prevented from propagating. var secret corev1.Secret - err := runtime.DefaultUnstructuredConverter.FromUnstructured(uObj.Object, &secret) - if err != nil { - return false, fmt.Errorf( - "failed to convert a secret object %s in namespace %s: %w", uObj.GetName(), uObj.GetNamespace(), err) + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(uObj.Object, &secret); err != nil { + return false, controller.NewUnexpectedBehaviorError(fmt.Errorf("failed to convert a secret object %s in namespace %s: %w", uObj.GetName(), uObj.GetNamespace(), err)) } if secret.Type == corev1.SecretTypeServiceAccountToken { return false, nil } case corev1.SchemeGroupVersion.WithKind("Endpoints"): // we assume that all endpoints with the same name of a service is created by the service controller - _, err := informerManager.Lister(ServiceGVR).ByNamespace(uObj.GetNamespace()).Get(uObj.GetName()) - if err != nil { + if _, err := informerManager.Lister(ServiceGVR).ByNamespace(uObj.GetNamespace()).Get(uObj.GetName()); err != nil { if apierrors.IsNotFound(err) { // there is no service of the same name as the end point, // we assume that this endpoint is created by the user return true, nil } - return false, fmt.Errorf("failed to get the service %s in namespace %s: %w", uObj.GetName(), uObj.GetNamespace(), err) + return false, controller.NewAPIServerError(true, fmt.Errorf("failed to get the service %s in namespace %s: %w", uObj.GetName(), uObj.GetNamespace(), err)) } // we find a service of the same name as the endpoint, we assume it's created by the service return false, nil diff --git a/pkg/utils/controller/controller.go b/pkg/utils/controller/controller.go index 7338e317f..ed5fb507b 100644 --- a/pkg/utils/controller/controller.go +++ b/pkg/utils/controller/controller.go @@ -12,6 +12,7 @@ import ( "sync" "time" + apierrors "k8s.io/apimachinery/pkg/api/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -33,25 +34,56 @@ var ( // There should be something wrong with the system and cannot be recovered by itself. ErrUnexpectedBehavior = errors.New("unexpected behavior which cannot be handled by the controller") + // ErrExpectedBehavior indicates the current situation is expected, which can be recovered by itself after retries. + ErrExpectedBehavior = errors.New("expected behavior which can be recovered by itself") + // ErrAPIServerError indicates the error is returned by the API server. ErrAPIServerError = errors.New("error returned by the API server") + + // ErrUserError indicates the error is caused by the user and customer needs to take the action. + ErrUserError = errors.New("failed to process the request due to a client error") ) -// NewUnexpectedBehaviorError returns ErrUnexpectedBehavior type error. +// NewUnexpectedBehaviorError returns ErrUnexpectedBehavior type error when err is not nil. func NewUnexpectedBehaviorError(err error) error { // TODO(zhiying) emit error metrics or well defined logs if err != nil { return fmt.Errorf("%w: %v", ErrUnexpectedBehavior, err.Error()) } - return ErrUnexpectedBehavior + return nil +} + +// NewExpectedBehaviorError returns ErrExpectedBehavior type error when err is not nil. +func NewExpectedBehaviorError(err error) error { + if err != nil { + return fmt.Errorf("%w: %v", ErrExpectedBehavior, err.Error()) + } + return nil } -// NewAPIServerError returns ErrAPIServerError type error. -func NewAPIServerError(err error) error { +// NewAPIServerError returns error types when accessing data from cache or API server. +func NewAPIServerError(fromCache bool, err error) error { if err != nil { + if fromCache && isUnexpectedCacheError(err) { + return NewUnexpectedBehaviorError(err) + } return fmt.Errorf("%w: %v", ErrAPIServerError, err.Error()) } - return ErrAPIServerError + return nil +} + +func isUnexpectedCacheError(err error) bool { + // may need to add more error code based on the production + // Cache will return notFound for GET. + return !apierrors.IsNotFound(err) +} + +// NewUserError returns ErrUserError type error when err is not nil. +func NewUserError(err error) error { + if err != nil { + return fmt.Errorf("%w: %v", ErrUserError, err.Error()) + } + return nil } // Controller maintains a rate limiting queue and the items in the queue will be reconciled by a "ReconcileFunc". diff --git a/pkg/utils/controller/controller_test.go b/pkg/utils/controller/controller_test.go new file mode 100644 index 000000000..914634ae4 --- /dev/null +++ b/pkg/utils/controller/controller_test.go @@ -0,0 +1,145 @@ +package controller + +import ( + "errors" + "testing" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func TestNewUnexpectedBehaviorError(t *testing.T) { + tests := []struct { + name string + err error + wantErr error + }{ + { + name: "nil error", + err: nil, + }, + { + name: "unexpectedBehaviorError", + err: errors.New("unexpected"), + wantErr: ErrUnexpectedBehavior, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := NewUnexpectedBehaviorError(tc.err) + if tc.err == nil && got != nil { + t.Fatalf("NewUnexpectedBehaviorError(nil) = %v, want nil", got) + } + if tc.err != nil && !errors.Is(got, tc.wantErr) { + t.Fatalf("NewUnexpectedBehaviorError() = %v, want %v", got, tc.wantErr) + } + }) + } +} + +func TestNewExpectedBehaviorError(t *testing.T) { + tests := []struct { + name string + err error + wantErr error + }{ + { + name: "nil error", + err: nil, + }, + { + name: "expectedBehaviorError", + err: errors.New("expected"), + wantErr: ErrExpectedBehavior, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := NewExpectedBehaviorError(tc.err) + if tc.err == nil && got != nil { + t.Fatalf("NewExpectedBehaviorError(nil) = %v, want nil", got) + } + if tc.err != nil && !errors.Is(got, tc.wantErr) { + t.Fatalf("NewExpectedBehaviorError() = %v, want %v", got, tc.wantErr) + } + }) + } +} + +func TestNewAPIServerError(t *testing.T) { + tests := []struct { + name string + fromCache bool + err error + wantErr error + }{ + { + name: "nil error", + err: nil, + }, + { + name: "reading from cache: apiServerError", + fromCache: true, + err: apierrors.NewNotFound(schema.GroupResource{}, "invalid"), + wantErr: ErrAPIServerError, + }, + { + name: "reading from cache: unexpectedBehaviorError", + fromCache: true, + err: apierrors.NewConflict(schema.GroupResource{}, "conflict", nil), + wantErr: ErrUnexpectedBehavior, + }, + { + name: "reading from API server: apiServerError", + fromCache: false, + err: apierrors.NewNotFound(schema.GroupResource{}, "invalid"), + wantErr: ErrAPIServerError, + }, + { + name: "reading from API server: apiServerError", + fromCache: false, + err: apierrors.NewConflict(schema.GroupResource{}, "conflict", nil), + wantErr: ErrAPIServerError, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := NewAPIServerError(tc.fromCache, tc.err) + if tc.err == nil && got != nil { + t.Fatalf("NewAPIServerError(nil) = %v, want nil", got) + } + if tc.err != nil && !errors.Is(got, tc.wantErr) { + t.Fatalf("NewAPIServerError() = %v, want %v", got, tc.wantErr) + } + }) + } +} + +func TestNewUserError(t *testing.T) { + tests := []struct { + name string + err error + wantErr error + }{ + { + name: "nil error", + err: nil, + }, + { + name: "userError", + err: errors.New("user error"), + wantErr: ErrUserError, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := NewUserError(tc.err) + if tc.err == nil && got != nil { + t.Fatalf("NewUserError(nil) = %v, want nil", got) + } + if tc.err != nil && !errors.Is(got, tc.wantErr) { + t.Fatalf("NewUserError() = %v, want %v", got, tc.wantErr) + } + }) + } +}