Merged
4 changes: 2 additions & 2 deletions cmd/memberagent/main.go
@@ -26,10 +26,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/metrics"
workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1"
workcontrollers "sigs.k8s.io/work-api/pkg/controllers"

fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/pkg/controllers/internalmembercluster"
workapi "go.goms.io/fleet/pkg/controllers/work"
fleetmetrics "go.goms.io/fleet/pkg/metrics"
"go.goms.io/fleet/pkg/utils"
//+kubebuilder:scaffold:imports
@@ -200,7 +200,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
}

// create the work controller, so we can pass it to the internal member cluster reconciler
-workController := workcontrollers.NewApplyWorkReconciler(
+workController := workapi.NewApplyWorkReconciler(
hubMgr.GetClient(),
spokeDynamicClient,
memberMgr.GetClient(),
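
As a reading aid, here is a minimal sketch of the wiring this hunk performs: the member agent builds the relocated work controller first, then hands it to the internal member cluster reconciler. The arguments after the three clients are placeholders, since this hunk truncates the call; the argument count and the concurrency value are mirrored from the integration test further down in this diff, and the variable names are illustrative.

workController := workapi.NewApplyWorkReconciler(
	hubMgr.GetClient(),    // client for the hub cluster
	spokeDynamicClient,    // dynamic client for the member cluster
	memberMgr.GetClient(), // client for the member cluster
	nil, nil,              // dependencies elided by the diff truncation
	5,                     // concurrent workers, as in the integration test below
	mcNamespace,           // hypothetical variable holding the member cluster namespace
)
imcReconciler := internalmembercluster.NewReconciler(
	hubMgr.GetClient(), memberMgr.GetClient(), workController)
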
2 changes: 1 addition & 1 deletion go.mod
@@ -6,6 +6,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.13.0
github.com/crossplane/crossplane-runtime v0.16.0
+github.com/go-logr/logr v1.2.0
github.com/google/go-cmp v0.5.8
github.com/onsi/ginkgo/v2 v2.1.4
github.com/onsi/gomega v1.19.0
@@ -40,7 +41,6 @@ require (
github.com/emicklei/go-restful v2.9.5+incompatible // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
-github.com/go-logr/logr v1.2.0 // indirect
github.com/go-logr/zapr v1.2.0 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.19.6 // indirect
6 changes: 3 additions & 3 deletions pkg/controllers/clusterresourceplacement/work_propagation.go
@@ -24,9 +24,9 @@ import (
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1"
workController "sigs.k8s.io/work-api/pkg/controllers"

fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
workapi "go.goms.io/fleet/pkg/controllers/work"
"go.goms.io/fleet/pkg/utils"
)

@@ -174,7 +174,7 @@ func (r *Reconciler) collectAllManifestsStatus(placement *fleetv1alpha1.ClusterR
return false, errors.Wrap(err, fmt.Sprintf("failed to get the work obj %s from namespace %s", workName, memberClusterNsName))
}
// check the overall condition
-appliedCond := meta.FindStatusCondition(work.Status.Conditions, workController.ConditionTypeApplied)
+appliedCond := meta.FindStatusCondition(work.Status.Conditions, workapi.ConditionTypeApplied)
if appliedCond == nil {
hasPending = true
klog.V(4).InfoS("the work is never picked up by the member cluster",
@@ -201,7 +201,7 @@
Name: manifestCondition.Identifier.Name,
Namespace: manifestCondition.Identifier.Namespace,
}
-appliedCond = meta.FindStatusCondition(manifestCondition.Conditions, workController.ConditionTypeApplied)
+appliedCond = meta.FindStatusCondition(manifestCondition.Conditions, workapi.ConditionTypeApplied)
// collect if there is an explicit fail
if appliedCond != nil && appliedCond.Status != metav1.ConditionTrue {
klog.V(3).InfoS("find a failed to apply manifest", "member cluster namespace", memberClusterNsName,
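
The logic above implements a three-way check: a missing Applied condition means the work has never been picked up by the member cluster, an Applied condition that is not True is an explicit failure, and True means the manifest landed. A self-contained sketch of that branch, using the real meta.FindStatusCondition helper from k8s.io/apimachinery and assuming ConditionTypeApplied resolves to the string "Applied" (the condition values here are made up for illustration):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// pretend this came from work.Status.Conditions
	conditions := []metav1.Condition{
		{Type: "Applied", Status: metav1.ConditionFalse, Reason: "appliedWorkFailed"},
	}
	// FindStatusCondition returns nil when the condition type is absent
	appliedCond := meta.FindStatusCondition(conditions, "Applied")
	switch {
	case appliedCond == nil:
		fmt.Println("pending: the work was never picked up by the member cluster")
	case appliedCond.Status != metav1.ConditionTrue:
		fmt.Println("failed: the manifest could not be applied")
	default:
		fmt.Println("applied successfully")
	}
}
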
6 changes: 3 additions & 3 deletions pkg/controllers/internalmembercluster/member_controller.go
@@ -21,10 +21,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
workcontrollers "sigs.k8s.io/work-api/pkg/controllers"

"go.goms.io/fleet/apis"
fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
workapi "go.goms.io/fleet/pkg/controllers/work"
"go.goms.io/fleet/pkg/metrics"
)

@@ -36,7 +36,7 @@ type Reconciler struct {
// the join/leave agent maintains the list of controllers in the member cluster
// so that it can make sure that all the agents on the member cluster have joined/left
// before updating the internal member cluster CR status
-workController *workcontrollers.ApplyWorkReconciler
+workController *workapi.ApplyWorkReconciler

recorder record.EventRecorder
}
@@ -51,7 +51,7 @@ const (
)

// NewReconciler creates a new reconciler for the internalMemberCluster CR
-func NewReconciler(hubClient client.Client, memberClient client.Client, workController *workcontrollers.ApplyWorkReconciler) *Reconciler {
+func NewReconciler(hubClient client.Client, memberClient client.Client, workController *workapi.ApplyWorkReconciler) *Reconciler {
return &Reconciler{
hubClient: hubClient,
memberClient: memberClient,
pkg/controllers/internalmembercluster/member_controller_integration_test.go
@@ -15,9 +15,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
workcontrollers "sigs.k8s.io/work-api/pkg/controllers"

"go.goms.io/fleet/apis/v1alpha1"
workapi "go.goms.io/fleet/pkg/controllers/work"
"go.goms.io/fleet/pkg/utils"
)

@@ -58,7 +58,7 @@ var _ = Describe("Test Internal Member Cluster Controller", func() {
}

By("create the internalMemberCluster reconciler")
-workController := workcontrollers.NewApplyWorkReconciler(
+workController := workapi.NewApplyWorkReconciler(
k8sClient, nil, k8sClient, nil, nil, 5, memberClusterNamespace)
r = NewReconciler(k8sClient, k8sClient, workController)
err := r.SetupWithManager(mgr)
165 changes: 165 additions & 0 deletions pkg/controllers/work/applied_work_syncer.go
@@ -0,0 +1,165 @@
/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
	"context"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/klog/v2"

	workapi "sigs.k8s.io/work-api/pkg/apis/v1alpha1"
)

// generateDiff checks the difference between what is supposed to be applied (tracked by the work CR status)
// and what was actually applied in the member cluster (tracked by the appliedWork CR).
// What is in the appliedWork but not in the work should be deleted from the member cluster,
// while what is in the work but not in the appliedWork should be added to the appliedWork status.
func (r *ApplyWorkReconciler) generateDiff(ctx context.Context, work *workapi.Work, appliedWork *workapi.AppliedWork) ([]workapi.AppliedResourceMeta, []workapi.AppliedResourceMeta, error) {
	var staleRes, newRes []workapi.AppliedResourceMeta
	// for every resource applied in the cluster, check if it is still in the work's manifest conditions;
	// we keep an applied resource in the appliedWork status even if it was not applied successfully,
	// to make sure that it is safe to delete the resource from the member cluster.
	for _, resourceMeta := range appliedWork.Status.AppliedResources {
		resStillExist := false
		for _, manifestCond := range work.Status.ManifestConditions {
			if isSameResourceIdentifier(resourceMeta.ResourceIdentifier, manifestCond.Identifier) {
				resStillExist = true
				break
			}
		}
		if !resStillExist {
			klog.V(2).InfoS("found an orphaned resource in the member cluster",
				"parent resource", work.GetName(), "orphaned resource", resourceMeta.ResourceIdentifier)
			staleRes = append(staleRes, resourceMeta)
		}
	}
	// add every resource in the work's manifest conditions that was applied successfully back to the appliedWork status
	for _, manifestCond := range work.Status.ManifestConditions {
		ac := meta.FindStatusCondition(manifestCond.Conditions, ConditionTypeApplied)
		if ac == nil {
			// should not happen
			klog.ErrorS(fmt.Errorf("resource is missing the applied condition"), "applied condition missing", "resource", manifestCond.Identifier)
			continue
		}
		// we only add the successfully applied resources to the appliedWork status
		if ac.Status == metav1.ConditionTrue {
			resRecorded := false
			// we update the identifier
			// TODO: this UID may not be the current one if the resource is deleted and recreated
			for _, resourceMeta := range appliedWork.Status.AppliedResources {
				if isSameResourceIdentifier(resourceMeta.ResourceIdentifier, manifestCond.Identifier) {
					resRecorded = true
					newRes = append(newRes, workapi.AppliedResourceMeta{
						ResourceIdentifier: manifestCond.Identifier,
						UID:                resourceMeta.UID,
					})
					break
				}
			}
			if !resRecorded {
				klog.V(2).InfoS("discovered a new manifest resource",
					"parent Work", work.GetName(), "manifest", manifestCond.Identifier)
				obj, err := r.spokeDynamicClient.Resource(schema.GroupVersionResource{
					Group:    manifestCond.Identifier.Group,
					Version:  manifestCond.Identifier.Version,
					Resource: manifestCond.Identifier.Resource,
				}).Namespace(manifestCond.Identifier.Namespace).Get(ctx, manifestCond.Identifier.Name, metav1.GetOptions{})
				switch {
				case apierrors.IsNotFound(err):
					klog.V(2).InfoS("the new manifest resource is already deleted", "parent Work", work.GetName(), "manifest", manifestCond.Identifier)
					continue
				case err != nil:
					klog.ErrorS(err, "failed to retrieve the manifest", "parent Work", work.GetName(), "manifest", manifestCond.Identifier)
					return nil, nil, err
				}
				newRes = append(newRes, workapi.AppliedResourceMeta{
					ResourceIdentifier: manifestCond.Identifier,
					UID:                obj.GetUID(),
				})
			}
		}
	}
	return newRes, staleRes, nil
}

func (r *ApplyWorkReconciler) deleteStaleManifest(ctx context.Context, staleManifests []workapi.AppliedResourceMeta, owner metav1.OwnerReference) error {
	var errs []error

	for _, staleManifest := range staleManifests {
		gvr := schema.GroupVersionResource{
			Group:    staleManifest.Group,
			Version:  staleManifest.Version,
			Resource: staleManifest.Resource,
		}
		uObj, err := r.spokeDynamicClient.Resource(gvr).Namespace(staleManifest.Namespace).
			Get(ctx, staleManifest.Name, metav1.GetOptions{})
		if err != nil {
			// it is possible that the stale manifest was already deleted but the status was not yet updated to reflect that
			if apierrors.IsNotFound(err) {
				klog.V(2).InfoS("the stale manifest is already deleted", "manifest", staleManifest, "owner", owner)
				continue
			}
			klog.ErrorS(err, "failed to get the stale manifest", "manifest", staleManifest, "owner", owner)
			errs = append(errs, err)
			continue
		}
		existingOwners := uObj.GetOwnerReferences()
		newOwners := make([]metav1.OwnerReference, 0)
		found := false
		for index, ref := range existingOwners {
			if isReferSameObject(ref, owner) {
				found = true
				newOwners = append(newOwners, existingOwners[:index]...)
				newOwners = append(newOwners, existingOwners[index+1:]...)
			}
		}
		if !found {
			klog.V(2).InfoS("the stale manifest is not owned by this work, skip", "manifest", staleManifest, "owner", owner)
			continue
		}
		if len(newOwners) == 0 {
			klog.V(2).InfoS("delete the stale manifest", "manifest", staleManifest, "owner", owner)
			err = r.spokeDynamicClient.Resource(gvr).Namespace(staleManifest.Namespace).
				Delete(ctx, staleManifest.Name, metav1.DeleteOptions{})
			if err != nil && !apierrors.IsNotFound(err) {
				klog.ErrorS(err, "failed to delete the stale manifest", "manifest", staleManifest, "owner", owner)
				errs = append(errs, err)
			}
		} else {
			klog.V(2).InfoS("remove the owner reference from the stale manifest", "manifest", staleManifest, "owner", owner)
			uObj.SetOwnerReferences(newOwners)
			_, err = r.spokeDynamicClient.Resource(gvr).Namespace(staleManifest.Namespace).Update(ctx, uObj, metav1.UpdateOptions{FieldManager: workFieldManagerName})
			if err != nil {
				klog.ErrorS(err, "failed to remove the owner reference from the manifest", "manifest", staleManifest, "owner", owner)
				errs = append(errs, err)
			}
		}
	}
	return utilerrors.NewAggregate(errs)
}

// isSameResourceIdentifier returns true if a and b identify the same object.
func isSameResourceIdentifier(a, b workapi.ResourceIdentifier) bool {
	// compare GVK, namespace, and name, but ignore the Ordinal and Resource fields
	return a.Group == b.Group && a.Version == b.Version && a.Kind == b.Kind && a.Namespace == b.Namespace && a.Name == b.Name
}
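
To make the set difference computed by generateDiff concrete, here is a self-contained sketch with simplified, hypothetical stand-ins for workapi.ResourceIdentifier and workapi.AppliedResourceMeta (the real identifier also carries Ordinal and Resource fields, which isSameResourceIdentifier deliberately ignores). Resources recorded in the appliedWork status but absent from the work's manifest conditions come back as stale; every successfully applied manifest goes into the new status, reusing the recorded UID when one exists.

package main

import "fmt"

// identifier is a simplified stand-in for workapi.ResourceIdentifier.
type identifier struct{ Group, Version, Kind, Namespace, Name string }

// appliedMeta is a simplified stand-in for workapi.AppliedResourceMeta.
type appliedMeta struct {
	id  identifier
	uid string
}

func main() {
	// what the appliedWork CR says is currently on the member cluster
	applied := []appliedMeta{
		{id: identifier{Kind: "Deployment", Namespace: "web", Name: "frontend"}, uid: "uid-1"},
		{id: identifier{Kind: "ConfigMap", Namespace: "web", Name: "old-config"}, uid: "uid-2"},
	}
	// manifests whose Applied condition is True in the work CR status
	desired := []identifier{
		{Kind: "Deployment", Namespace: "web", Name: "frontend"},
		{Kind: "Service", Namespace: "web", Name: "frontend"},
	}
	var stale, newRes []appliedMeta
	// in appliedWork but no longer in work -> stale, to be deleted or orphaned
	for _, a := range applied {
		still := false
		for _, d := range desired {
			if a.id == d { // mirrors isSameResourceIdentifier (GVK + namespace + name)
				still = true
				break
			}
		}
		if !still {
			stale = append(stale, a)
		}
	}
	// every successfully applied manifest goes into the new status; a previously
	// recorded resource keeps its UID, while a newly discovered one would be
	// looked up live in the real controller (here we fake a fresh UID)
	for _, d := range desired {
		uid := "uid-fresh"
		for _, a := range applied {
			if a.id == d {
				uid = a.uid
				break
			}
		}
		newRes = append(newRes, appliedMeta{id: d, uid: uid})
	}
	fmt.Println("stale:", stale)  // the old ConfigMap only
	fmt.Println("new:  ", newRes) // Deployment (uid-1) and Service (uid-fresh)
}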