Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/controllers/clusterresourceplacement/work_propagation.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func (r *Reconciler) scheduleWork(ctx context.Context, placement *fleetv1alpha1.
Kind: placement.GroupVersionKind().Kind,
Name: placement.GetName(),
UID: placement.GetUID(),
BlockOwnerDeletion: pointer.BoolPtr(true),
Controller: pointer.BoolPtr(true),
BlockOwnerDeletion: pointer.Bool(true),
Controller: pointer.Bool(true),
}
workerSpec := workv1alpha1.WorkSpec{
Workload: workv1alpha1.WorkloadTemplate{
Expand Down
77 changes: 40 additions & 37 deletions pkg/controllers/work/apply_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,26 @@ func NewApplyWorkReconciler(hubClient client.Client, spokeDynamicClient dynamic.
}
}

// applyAction represents the action we take to apply the manifest
// +enum
type applyAction string

const (
// ManifestCreatedAction indicates that we created the manifest for the first time.
ManifestCreatedAction applyAction = "ManifestCreated"

// ManifestUpdatedAction indicates that we updated the manifest.
ManifestUpdatedAction applyAction = "ManifestUpdated"

// ManifestNoChangeAction indicates that we don't need to change the manifest.
ManifestNoChangeAction applyAction = "ManifestNoChange"
)

// applyResult contains the result of a manifest being applied.
type applyResult struct {
identifier workv1alpha1.ResourceIdentifier
generation int64
updated bool
action applyAction
err error
}

Expand Down Expand Up @@ -266,19 +281,16 @@ func (r *ApplyWorkReconciler) applyManifests(ctx context.Context, manifests []wo

default:
addOwnerRef(owner, rawObj)
appliedObj, result.updated, result.err = r.applyUnstructured(ctx, gvr, rawObj)
appliedObj, result.action, result.err = r.applyUnstructured(ctx, gvr, rawObj)
result.identifier = buildResourceIdentifier(index, rawObj, gvr)
logObjRef := klog.ObjectRef{
Name: result.identifier.Name,
Namespace: result.identifier.Namespace,
}
if result.err == nil {
result.generation = appliedObj.GetGeneration()
if result.updated {
klog.V(2).InfoS("manifest upsert succeeded", "gvr", gvr, "manifest", logObjRef, "new ObservedGeneration", result.generation)
} else {
klog.V(2).InfoS("manifest upsert unwarranted", "gvr", gvr, "manifest", logObjRef)
}
klog.V(2).InfoS("apply manifest succeeded", "gvr", gvr, "manifest", logObjRef,
"apply action", result.action, "new ObservedGeneration", result.generation)
} else {
klog.ErrorS(result.err, "manifest upsert failed", "gvr", gvr, "manifest", logObjRef)
}
Expand All @@ -305,29 +317,30 @@ func (r *ApplyWorkReconciler) decodeManifest(manifest workv1alpha1.Manifest) (sc
}

// Determines if an unstructured manifest object can & should be applied. If so, it applies (creates) the resource on the cluster.
func (r *ApplyWorkReconciler) applyUnstructured(ctx context.Context, gvr schema.GroupVersionResource, manifestObj *unstructured.Unstructured) (*unstructured.Unstructured, bool, error) {
func (r *ApplyWorkReconciler) applyUnstructured(ctx context.Context, gvr schema.GroupVersionResource,
manifestObj *unstructured.Unstructured) (*unstructured.Unstructured, applyAction, error) {
manifestRef := klog.ObjectRef{
Name: manifestObj.GetName(),
Namespace: manifestObj.GetNamespace(),
}
// compute the hash without taking into consider the last applied annotation
if err := setManifestHashAnnotation(manifestObj); err != nil {
return nil, false, err
return nil, ManifestNoChangeAction, err
}

// extract the common create procedure to reuse
var createFunc = func() (*unstructured.Unstructured, bool, error) {
var createFunc = func() (*unstructured.Unstructured, applyAction, error) {
// record the raw manifest with the hash annotation in the manifest
if err := setModifiedConfigurationAnnotation(manifestObj); err != nil {
return nil, false, err
return nil, ManifestNoChangeAction, err
}
actual, err := r.spokeDynamicClient.Resource(gvr).Namespace(manifestObj.GetNamespace()).Create(
ctx, manifestObj, metav1.CreateOptions{FieldManager: workFieldManagerName})
if err == nil {
klog.V(2).InfoS("successfully created the manifest", "gvr", gvr, "manifest", manifestRef)
return actual, true, nil
return actual, ManifestCreatedAction, nil
}
return nil, false, err
return nil, ManifestNoChangeAction, err
}

// support resources with generated name
Expand All @@ -342,27 +355,27 @@ func (r *ApplyWorkReconciler) applyUnstructured(ctx context.Context, gvr schema.
case apierrors.IsNotFound(err):
return createFunc()
case err != nil:
return nil, false, err
return nil, ManifestNoChangeAction, err
}

// check if the existing manifest is managed by the work
if !isManifestManagedByWork(curObj.GetOwnerReferences()) {
err = fmt.Errorf("resource is not managed by the work controller")
klog.ErrorS(err, "skip applying a not managed manifest", "gvr", gvr, "obj", manifestRef)
return nil, false, err
return nil, ManifestNoChangeAction, err
}

// We only try to update the object if its spec hash value has changed.
if manifestObj.GetAnnotations()[manifestHashAnnotation] != curObj.GetAnnotations()[manifestHashAnnotation] {
return r.patchCurrentResource(ctx, gvr, manifestObj, curObj)
}

return curObj, false, nil
return curObj, ManifestNoChangeAction, nil
}

// patchCurrentResource uses three way merge to patch the current resource with the new manifest we get from the work.
func (r *ApplyWorkReconciler) patchCurrentResource(ctx context.Context, gvr schema.GroupVersionResource,
manifestObj, curObj *unstructured.Unstructured) (*unstructured.Unstructured, bool, error) {
manifestObj, curObj *unstructured.Unstructured) (*unstructured.Unstructured, applyAction, error) {
manifestRef := klog.ObjectRef{
Name: manifestObj.GetName(),
Namespace: manifestObj.GetNamespace(),
Expand All @@ -376,28 +389,28 @@ func (r *ApplyWorkReconciler) patchCurrentResource(ctx context.Context, gvr sche
manifestObj.SetOwnerReferences(mergeOwnerReference(curObj.GetOwnerReferences(), manifestObj.GetOwnerReferences()))
// record the raw manifest with the hash annotation in the manifest
if err := setModifiedConfigurationAnnotation(manifestObj); err != nil {
return nil, false, err
return nil, ManifestNoChangeAction, err
}
// create the three-way merge patch between the current, original and manifest similar to how kubectl apply does
patch, err := threeWayMergePatch(curObj, manifestObj)
if err != nil {
klog.ErrorS(err, "failed to generate the three way patch", "gvr", gvr, "manifest", manifestRef)
return nil, false, err
return nil, ManifestNoChangeAction, err
}
data, err := patch.Data(manifestObj)
if err != nil {
klog.ErrorS(err, "failed to generate the three way patch", "gvr", gvr, "manifest", manifestRef)
return nil, false, err
return nil, ManifestNoChangeAction, err
}
// Use client side apply the patch to the member cluster
manifestObj, patchErr := r.spokeDynamicClient.Resource(gvr).Namespace(manifestObj.GetNamespace()).
Patch(ctx, manifestObj.GetName(), patch.Type(), data, metav1.PatchOptions{FieldManager: workFieldManagerName})
if patchErr != nil {
klog.ErrorS(patchErr, "failed to patch the manifest", "gvr", gvr, "manifest", manifestRef)
return nil, false, patchErr
return nil, ManifestNoChangeAction, patchErr
}
klog.V(2).InfoS("manifest patch succeeded", "gvr", gvr, "manifest", manifestRef)
return manifestObj, true, nil
return manifestObj, ManifestUpdatedAction, nil
}

// generateWorkCondition constructs the work condition based on the apply result
Expand All @@ -409,7 +422,7 @@ func (r *ApplyWorkReconciler) generateWorkCondition(results []applyResult, work
if result.err != nil {
errs = append(errs, result.err)
}
appliedCondition := buildManifestAppliedCondition(result.err, result.updated, result.generation)
appliedCondition := buildManifestAppliedCondition(result.err, result.action, result.generation)
manifestCondition := workv1alpha1.ManifestCondition{
Identifier: result.identifier,
Conditions: []metav1.Condition{appliedCondition},
Expand Down Expand Up @@ -577,7 +590,7 @@ func buildResourceIdentifier(index int, object *unstructured.Unstructured, gvr s
}
}

func buildManifestAppliedCondition(err error, updated bool, observedGeneration int64) metav1.Condition {
func buildManifestAppliedCondition(err error, action applyAction, observedGeneration int64) metav1.Condition {
if err != nil {
return metav1.Condition{
Type: ConditionTypeApplied,
Expand All @@ -589,27 +602,17 @@ func buildManifestAppliedCondition(err error, updated bool, observedGeneration i
}
}

if updated {
return metav1.Condition{
Type: ConditionTypeApplied,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Now(),
ObservedGeneration: observedGeneration,
Reason: "appliedManifestUpdated",
Message: "appliedManifest updated",
}
}
return metav1.Condition{
Type: ConditionTypeApplied,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Now(),
ObservedGeneration: observedGeneration,
Reason: "appliedManifestComplete",
Message: "Apply manifest complete",
Reason: string(action),
Message: string(action),
}
}

// generateWorkAppliedCondition generate appied status condition for work.
// generateWorkAppliedCondition generate applied status condition for work.
// If one of the manifests is applied failed on the spoke, the applied status condition of the work is false.
func generateWorkAppliedCondition(manifestConditions []workv1alpha1.ManifestCondition, observedGeneration int64) metav1.Condition {
for _, manifestCond := range manifestConditions {
Expand Down
49 changes: 25 additions & 24 deletions pkg/controllers/work/apply_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func TestApplyUnstructured(t *testing.T) {
reconciler ApplyWorkReconciler
workObj *unstructured.Unstructured
resultSpecHash string
resultBool bool
resultAction applyAction
resultErr error
}{
"test creation succeeds when the object does not exist": {
Expand All @@ -363,7 +363,7 @@ func TestApplyUnstructured(t *testing.T) {
},
workObj: correctObj.DeepCopy(),
resultSpecHash: correctSpecHash,
resultBool: true,
resultAction: ManifestCreatedAction,
resultErr: nil,
},
"test creation succeeds when the object has a generated name": {
Expand All @@ -376,7 +376,7 @@ func TestApplyUnstructured(t *testing.T) {
},
workObj: generatedSpecObj.DeepCopy(),
resultSpecHash: generatedSpecHash,
resultBool: true,
resultAction: ManifestCreatedAction,
resultErr: nil,
},
"client error looking for object / fail": {
Expand All @@ -387,9 +387,9 @@ func TestApplyUnstructured(t *testing.T) {
restMapper: testMapper{},
recorder: utils.NewFakeRecorder(1),
},
workObj: correctObj.DeepCopy(),
resultBool: false,
resultErr: errors.New("client error"),
workObj: correctObj.DeepCopy(),
resultAction: ManifestNoChangeAction,
resultErr: errors.New("client error"),
},
"owner reference comparison failure / fail": {
reconciler: ApplyWorkReconciler{
Expand All @@ -399,9 +399,9 @@ func TestApplyUnstructured(t *testing.T) {
restMapper: testMapper{},
recorder: utils.NewFakeRecorder(1),
},
workObj: correctObj.DeepCopy(),
resultBool: false,
resultErr: errors.New("resource is not managed by the work controller"),
workObj: correctObj.DeepCopy(),
resultAction: ManifestNoChangeAction,
resultErr: errors.New("resource is not managed by the work controller"),
},
"equal spec hash of current vs work object / succeed without updates": {
reconciler: ApplyWorkReconciler{
Expand All @@ -410,17 +410,17 @@ func TestApplyUnstructured(t *testing.T) {
},
workObj: correctObj.DeepCopy(),
resultSpecHash: correctSpecHash,
resultBool: false,
resultAction: ManifestNoChangeAction,
resultErr: nil,
},
"unequal spec hash of current vs work object / client patch fail": {
reconciler: ApplyWorkReconciler{
spokeDynamicClient: patchFailClient,
recorder: utils.NewFakeRecorder(1),
},
workObj: correctObj.DeepCopy(),
resultBool: false,
resultErr: errors.New("patch failed"),
workObj: correctObj.DeepCopy(),
resultAction: ManifestNoChangeAction,
resultErr: errors.New("patch failed"),
},
"happy path - with updates": {
reconciler: ApplyWorkReconciler{
Expand All @@ -430,15 +430,15 @@ func TestApplyUnstructured(t *testing.T) {
},
workObj: correctObj,
resultSpecHash: diffSpecHash,
resultBool: true,
resultAction: ManifestUpdatedAction,
resultErr: nil,
},
}

for testName, testCase := range testCases {
t.Run(testName, func(t *testing.T) {
applyResult, applyResultBool, err := testCase.reconciler.applyUnstructured(context.Background(), testGvr, testCase.workObj)
assert.Equalf(t, testCase.resultBool, applyResultBool, "updated boolean not matching for Testcase %s", testName)
applyResult, applyAction, err := testCase.reconciler.applyUnstructured(context.Background(), testGvr, testCase.workObj)
assert.Equalf(t, testCase.resultAction, applyAction, "updated boolean not matching for Testcase %s", testName)
if testCase.resultErr != nil {
assert.Containsf(t, err.Error(), testCase.resultErr.Error(), "error not matching for Testcase %s", testName)
} else {
Expand Down Expand Up @@ -488,7 +488,7 @@ func TestApplyManifest(t *testing.T) {
reconciler ApplyWorkReconciler
manifestList []workv1alpha1.Manifest
generation int64
updated bool
action applyAction
wantGvr schema.GroupVersionResource
wantErr error
}{
Expand All @@ -501,9 +501,9 @@ func TestApplyManifest(t *testing.T) {
recorder: utils.NewFakeRecorder(1),
joined: atomic.NewBool(true),
},
manifestList: append([]workv1alpha1.Manifest{}, testManifest),
manifestList: []workv1alpha1.Manifest{testManifest},
generation: 0,
updated: true,
action: ManifestCreatedAction,
wantGvr: expectedGvr,
wantErr: nil,
},
Expand All @@ -518,7 +518,7 @@ func TestApplyManifest(t *testing.T) {
},
manifestList: append([]workv1alpha1.Manifest{}, InvalidManifest),
generation: 0,
updated: false,
action: ManifestNoChangeAction,
wantGvr: emptyGvr,
wantErr: &json.UnmarshalTypeError{
Value: "string",
Expand All @@ -536,7 +536,7 @@ func TestApplyManifest(t *testing.T) {
},
manifestList: append([]workv1alpha1.Manifest{}, MissingManifest),
generation: 0,
updated: false,
action: ManifestNoChangeAction,
wantGvr: emptyGvr,
wantErr: errors.New("failed to find group/version/resource from restmapping: test error: mapping does not exist"),
},
Expand All @@ -551,7 +551,7 @@ func TestApplyManifest(t *testing.T) {
},
manifestList: append([]workv1alpha1.Manifest{}, testManifest),
generation: 0,
updated: false,
action: ManifestNoChangeAction,
wantGvr: expectedGvr,
wantErr: errors.New(failMsg),
},
Expand All @@ -563,9 +563,10 @@ func TestApplyManifest(t *testing.T) {
for _, result := range resultList {
if testCase.wantErr != nil {
assert.Containsf(t, result.err.Error(), testCase.wantErr.Error(), "Incorrect error for Testcase %s", testName)
} else {
assert.Equalf(t, testCase.generation, result.generation, "Testcase %s: generation incorrect", testName)
assert.Equalf(t, testCase.action, result.action, "Testcase %s: Updated action incorrect", testName)
}
assert.Equalf(t, testCase.generation, result.generation, "Testcase %s: generation incorrect", testName)
assert.Equalf(t, testCase.updated, result.updated, "Testcase %s: Updated boolean incorrect", testName)
}
})
}
Expand Down
Loading