Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/hubagent/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewOptions() *Options {
LeaderElect: true,
ResourceLock: resourcelock.LeasesResourceLock,
ResourceNamespace: utils.FleetSystemNamespace,
ResourceName: "13622se4848560.hub.fleet.azure.com",
ResourceName: "136224848560.hub.fleet.azure.com",
},
ConcurrentClusterPlacementSyncs: 1,
ConcurrentResourceChangeSyncs: 1,
Expand Down
24 changes: 10 additions & 14 deletions cmd/hubagent/workload/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package workload

import (
"context"

"strings"

"k8s.io/client-go/discovery"
Expand All @@ -28,9 +27,9 @@ import (
)

const (
clusterResourcePlacementName = "cluster-resource-placement-controller"
resourceChangeName = "resource-change-controller"
memberClusterPlacementName = "memberCluster-placement-controller"
crpControllerName = "cluster-resource-placement-controller"
resourceChangeControllerName = "resource-change-controller"
mcPlacementControllerName = "memberCluster-placement-controller"
)

// SetupControllers set up the customized controllers we developed
Expand Down Expand Up @@ -65,11 +64,8 @@ func SetupControllers(ctx context.Context, mgr ctrl.Manager, config *rest.Config
return err
}

// setup namespaces we skip propagation
skippedNamespaces := make(map[string]bool)
skippedNamespaces["fleet-system"] = true
skippedNamespaces["kube-system"] = true
skippedNamespaces["kube-public"] = true
skippedNamespaces["kube-node-lease"] = true
skippedNamespaces["default"] = true
optionalSkipNS := strings.Split(opts.SkippedPropagatingNamespaces, ";")
for _, ns := range optionalSkipNS {
Expand All @@ -87,15 +83,15 @@ func SetupControllers(ctx context.Context, mgr ctrl.Manager, config *rest.Config
klog.Info("Setting up clusterResourcePlacement controller")
crpc := &clusterresourceplacement.Reconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor(clusterResourcePlacementName),
Recorder: mgr.GetEventRecorderFor(crpControllerName),
RestMapper: mgr.GetRESTMapper(),
InformerManager: dynamicInformerManager,
DisabledResourceConfig: disabledResourceConfig,
WorkPendingGracePeriod: opts.WorkPendingGracePeriod,
SkippedNamespaces: skippedNamespaces,
}

ratelimiter := options.DefaultControllerRateLimiter(opts.RateLimiterOpts)
clusterResourcePlacementController := controller.NewController(clusterResourcePlacementName, controller.NamespaceKeyFunc, crpc.Reconcile, ratelimiter)
clusterResourcePlacementController := controller.NewController(crpControllerName, controller.NamespaceKeyFunc, crpc.Reconcile, ratelimiter)
if err != nil {
klog.ErrorS(err, "unable to set up clusterResourcePlacement controller")
return err
Expand All @@ -105,13 +101,13 @@ func SetupControllers(ctx context.Context, mgr ctrl.Manager, config *rest.Config
klog.Info("Setting up resource change controller")
rcr := &resourcechange.Reconciler{
DynamicClient: dynamicClient,
Recorder: mgr.GetEventRecorderFor(resourceChangeName),
Recorder: mgr.GetEventRecorderFor(resourceChangeControllerName),
RestMapper: mgr.GetRESTMapper(),
InformerManager: dynamicInformerManager,
PlacementController: clusterResourcePlacementController,
}

resourceChangeController := controller.NewController(resourceChangeName, controller.ClusterWideKeyFunc, rcr.Reconcile, ratelimiter)
resourceChangeController := controller.NewController(resourceChangeControllerName, controller.ClusterWideKeyFunc, rcr.Reconcile, ratelimiter)
if err != nil {
klog.ErrorS(err, "unable to set up resource change controller")
return err
Expand All @@ -124,7 +120,7 @@ func SetupControllers(ctx context.Context, mgr ctrl.Manager, config *rest.Config
PlacementController: clusterResourcePlacementController,
}

memberClusterPlacementController := controller.NewController(memberClusterPlacementName, controller.NamespaceKeyFunc, mcp.Reconcile, ratelimiter)
memberClusterPlacementController := controller.NewController(mcPlacementControllerName, controller.NamespaceKeyFunc, mcp.Reconcile, ratelimiter)
if err != nil {
klog.ErrorS(err, "unable to set up resource change controller")
return err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,5 @@ require (

replace (
k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.24.2 // weird bug that the goland won't compile without this
sigs.k8s.io/work-api => github.com/Azure/k8s-work-api v0.4.1
sigs.k8s.io/work-api => github.com/Azure/k8s-work-api v0.4.2
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSY
github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/Azure/k8s-work-api v0.4.1 h1:q2KEi1yCCoS+1hm9rZ8IHaKCjiNhtd0/SXHfuNl6Ruk=
github.com/Azure/k8s-work-api v0.4.1/go.mod h1:VtsAdhZMoEP9WOEW+LmLm6NRHNyIjJ5xGOzJA64O7ew=
github.com/Azure/k8s-work-api v0.4.2 h1:Kwl8pmBfiykgWws12ud80TpU9gQNveyR7zlwMutGwGc=
github.com/Azure/k8s-work-api v0.4.2/go.mod h1:FOGJkJ+uxjWlvUgmqUlRcmr4Q2ijocrUO/aLJv827y8=
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c=
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
Expand Down
24 changes: 9 additions & 15 deletions pkg/controllers/clusterresourceplacement/placement_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/controller"
"go.goms.io/fleet/pkg/utils/informer"
"go.goms.io/fleet/pkg/utils/validator"
)

const (
Expand All @@ -41,19 +40,22 @@ var (

// Reconciler reconciles a cluster resource placement object
type Reconciler struct {
// the informer contains the cache for all the resources we need
// the informer contains the cache for all the resources we need.
InformerManager informer.Manager

// RestMapper is used to convert between gvk and gvr on known resources.
RestMapper meta.RESTMapper

// Client is used to update objects which goes to the api server directly
// Client is used to update objects which goes to the api server directly.
Client client.Client

// DisabledResourceConfig contains all the api resources that we won't select
// DisabledResourceConfig contains all the api resources that we won't select.
DisabledResourceConfig *utils.DisabledResourceConfig

WorkPendingGracePeriod metav1.Duration
Recorder record.EventRecorder
// SkippedNamespaces contains the namespaces that we should not propagate.
SkippedNamespaces map[string]bool

Recorder record.EventRecorder
}

func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ctrl.Result, error) {
Expand All @@ -74,14 +76,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ct

// TODO: add finalizer logic if we need it in the future

// TODO: move this to webhook
if err := validator.ValidateClusterResourcePlacement(placementOld); err != nil {
invalidSpec := "the spec is invalid"
klog.ErrorS(err, invalidSpec, "placement", placeRef)
r.Recorder.Event(placementOld, corev1.EventTypeWarning, invalidSpec, err.Error())
return ctrl.Result{}, nil
}

klog.V(2).InfoS("Start to reconcile a ClusterResourcePlacement", "placement", placeRef)
// select the new clusters and record that in the placementNew status
selectedClusters, scheduleErr := r.selectClusters(placementNew)
Expand All @@ -103,7 +97,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ct
// select the new resources and record the result in the placementNew status
manifests, scheduleErr := r.selectResources(ctx, placementNew)
if scheduleErr != nil {
klog.ErrorS(scheduleErr, "failed to generate the work resource for this placementOld", "placement", placeRef)
klog.ErrorS(scheduleErr, "failed to select the resources for this placement", "placement", placeRef)
r.updatePlacementScheduledCondition(placementOld, scheduleErr)
_ = r.Client.Status().Update(ctx, placementOld, client.FieldOwner(utils.PlacementFieldManagerName))
return ctrl.Result{}, scheduleErr
Expand Down
24 changes: 16 additions & 8 deletions pkg/controllers/clusterresourceplacement/resource_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
func (r *Reconciler) selectResources(ctx context.Context, placement *fleetv1alpha1.ClusterResourcePlacement) ([]workv1alpha1.Manifest, error) {
selectedObjects, err := r.gatherSelectedResource(ctx, placement)
if err != nil {
return nil, errors.Wrap(err, "Failed to gather all the selected resource")
return nil, err
}
placement.Status.SelectedResources = make([]fleetv1alpha1.ResourceIdentifier, 0)
manifests := make([]workv1alpha1.Manifest, len(selectedObjects))
Expand Down Expand Up @@ -177,11 +177,15 @@ func (r *Reconciler) fetchNamespaceResources(ctx context.Context, selector fleet

if len(selector.Name) != 0 {
// just a single namespace
return r.fetchAllResourcesInOneNamespace(ctx, selector.Name, placeName)
objs, err := r.fetchAllResourcesInOneNamespace(ctx, selector.Name, placeName)
if err != nil {
klog.ErrorS(err, "failed to fetch all the selected resource in a namespace", "namespace", selector.Name)
return nil, err
}
return objs, err
}
// go through each namespace
lister := r.InformerManager.Lister(utils.NamespaceGVR)

// go through each namespace
var labelSelector labels.Selector
var err error
if selector.LabelSelector == nil {
Expand All @@ -192,9 +196,9 @@ func (r *Reconciler) fetchNamespaceResources(ctx context.Context, selector fleet
return nil, errors.Wrap(err, "cannot convert the label selector to a selector")
}
}
namespaces, err := lister.List(labelSelector)
namespaces, err := r.InformerManager.Lister(utils.NamespaceGVR).List(labelSelector)
if err != nil {
return nil, errors.Wrap(err, "cannot list all the namespaces")
return nil, errors.Wrap(err, "cannot list all the namespaces given the label selector")
}

for _, namespace := range namespaces {
Expand All @@ -204,19 +208,23 @@ func (r *Reconciler) fetchNamespaceResources(ctx context.Context, selector fleet
}
objs, err := r.fetchAllResourcesInOneNamespace(ctx, ns.GetName(), placeName)
if err != nil {
klog.ErrorS(err, "failed to fetch all the selected resource in a namespace", "namespace", ns.GetName())
return nil, err
}
resources = append(resources, objs...)
}

return resources, nil
}

// fetchAllResourcesInOneNamespace retrieve all the objects inside a single namespace which includes the namespace itself.
func (r *Reconciler) fetchAllResourcesInOneNamespace(ctx context.Context, namespaceName string, placeName string) ([]runtime.Object, error) {
klog.V(4).InfoS("start to fetch all the resources inside a namespace", "namespace", namespaceName)
var resources []runtime.Object

if !utils.ShouldPropagateNamespace(namespaceName, r.SkippedNamespaces) {
return nil, errors.New(fmt.Sprintf("namespace %s is not allowed to propagate", namespaceName))
}

klog.V(4).InfoS("start to fetch all the resources inside a namespace", "namespace", namespaceName)
// select the namespace object itself
obj, err := r.InformerManager.Lister(utils.NamespaceGVR).Get(namespaceName)
if err != nil {
Expand Down
11 changes: 1 addition & 10 deletions pkg/resourcewatcher/change_dector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package resourcewatcher

import (
"context"
"strings"
"time"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -189,15 +188,7 @@ func (d *ChangeDetector) dynamicResourceFilter(obj interface{}) bool {
}

cwKey, _ := key.(keys.ClusterWideKey)
// special case for cluster namespace
if strings.HasPrefix(cwKey.Namespace, utils.ClusterNamespacePrefix) {
klog.V(5).InfoS("Skip watching resource in namespace", "namespace", cwKey.Namespace,
"group", cwKey.Group, "version", cwKey.Version, "kind", cwKey.Kind, "object", cwKey.Name)
return false
}

// if SkippedNamespaces is set, skip any events related to the object in these namespaces.
if _, ok := d.SkippedNamespaces[cwKey.Namespace]; ok {
if !utils.ShouldPropagateNamespace(cwKey.Namespace, d.SkippedNamespaces) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we also check IsResourceDisabled in this function as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, shall we block more networking resources types in pkg/utils/apiresources.go?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IsResourceDisabled is checked in the utils.ShouldPropagateResource

klog.V(5).InfoS("Skip watching resource in namespace", "namespace", cwKey.Namespace,
"group", cwKey.Group, "version", cwKey.Version, "kind", cwKey.Kind, "object", cwKey.Name)
return false
Expand Down
40 changes: 27 additions & 13 deletions pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"log"
"math/big"
"strings"
"time"

"github.com/pkg/errors"
Expand All @@ -30,17 +31,21 @@ import (
)

const (
FleetSystemNamespace = "fleet-system"

ClusterNamespacePrefix = "fleet-member-"
NamespaceNameFormat = ClusterNamespacePrefix + "%s"

RoleNameFormat = "fleet-role-%s"

RoleBindingNameFormat = "fleet-rolebinding-%s"
kubePrefix = "kube-"
fleetPrefix = "fleet-"
FleetSystemNamespace = fleetPrefix + "system"
NamespaceNameFormat = fleetPrefix + "member-%s"
RoleNameFormat = fleetPrefix + "role-%s"
RoleBindingNameFormat = fleetPrefix + "rolebinding-%s"
)

PlacementFieldManagerName = "cluster-placement-controller"
const (
// NetworkingGroupName is the group name of the fleet networking.
NetworkingGroupName = "networking.fleet.azure.com"
)

const (
PlacementFieldManagerName = "cluster-placement-controller"
MCControllerFieldManagerName = "member-cluster-controller"
)

Expand All @@ -56,10 +61,6 @@ const (
// PlacementFinalizer is used to make sure that we handle gc of placement resources.
PlacementFinalizer = "work.fleet.azure.com/placement-protection"
)
const (
// NetworkingGroupName is the group name of the fleet networking.
NetworkingGroupName = "networking.fleet.azure.com"
)

var (
FleetRule = rbacv1.PolicyRule{
Expand Down Expand Up @@ -222,3 +223,16 @@ func ShouldPropagateObj(informerManager informer.Manager, uObj *unstructured.Uns
}
return true, nil
}

// ShouldPropagateNamespace decides if we should propagate the resources in the namespace
func ShouldPropagateNamespace(namespace string, skippedNamespaces map[string]bool) bool {
// special case for namespace have the reserved prefix
if strings.HasPrefix(namespace, fleetPrefix) || strings.HasPrefix(namespace, kubePrefix) {
return false
}

if skippedNamespaces[namespace] {
return false
}
return true
}
24 changes: 24 additions & 0 deletions test/e2e/utils/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"github.com/onsi/gomega/format"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -246,3 +247,26 @@ func DeleteServiceAccount(cluster framework.Cluster, sa *corev1.ServiceAccount)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
})
}

// AlreadyExistMatcher matches the error to be already exist
type AlreadyExistMatcher struct {
}

// Match matches error.
func (matcher AlreadyExistMatcher) Match(actual interface{}) (success bool, err error) {
if actual == nil {
return false, nil
}
actualError := actual.(error)
return apierrors.IsAlreadyExists(actualError), nil
}

// FailureMessage builds an error message.
func (matcher AlreadyExistMatcher) FailureMessage(actual interface{}) (message string) {
return format.Message(actual, "to be already exist")
}

// NegatedFailureMessage builds an error message.
func (matcher AlreadyExistMatcher) NegatedFailureMessage(actual interface{}) (message string) {
return format.Message(actual, "not to be already exist")
}
2 changes: 1 addition & 1 deletion test/e2e/work_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
workapi "sigs.k8s.io/work-api/pkg/apis/v1alpha1"
"sigs.k8s.io/work-api/pkg/utils"

fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
fleetutil "go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/test/e2e/utils"
)

const (
Expand Down
Loading