2 changes: 1 addition & 1 deletion Makefile
@@ -1,5 +1,5 @@
REGISTRY ?= ghcr.io
KIND_IMAGE ?= kindest/node:v1.23.3
KIND_IMAGE ?= kindest/node:v1.24.6
ifndef TAG
TAG ?= $(shell git rev-parse --short=7 HEAD)
endif
2 changes: 1 addition & 1 deletion cmd/hubagent/main.go
@@ -52,7 +52,7 @@ func init() {
//+kubebuilder:scaffold:scheme
klog.InitFlags(nil)

metrics.Registry.MustRegister(fleetmetrics.JoinResultMetrics, fleetmetrics.LeaveResultMetrics)
metrics.Registry.MustRegister(fleetmetrics.JoinResultMetrics, fleetmetrics.LeaveResultMetrics, fleetmetrics.PlacementApplyFailedCount, fleetmetrics.PlacementApplySucceedCount)
}

func main() {
2 changes: 1 addition & 1 deletion cmd/memberagent/main.go
@@ -57,7 +57,7 @@ func init() {
utilruntime.Must(workv1alpha1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme

metrics.Registry.MustRegister(fleetmetrics.JoinResultMetrics, fleetmetrics.LeaveResultMetrics)
metrics.Registry.MustRegister(fleetmetrics.JoinResultMetrics, fleetmetrics.LeaveResultMetrics, fleetmetrics.WorkApplyTime)
}

func main() {
27 changes: 18 additions & 9 deletions pkg/controllers/clusterresourceplacement/placement_controller.go
@@ -22,6 +22,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/pkg/metrics"
"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/controller"
"go.goms.io/fleet/pkg/utils/informer"
@@ -59,6 +60,7 @@ type Reconciler struct {
}

func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ctrl.Result, error) {
startTime := time.Now()
name, ok := key.(string)
if !ok {
err := fmt.Errorf("get place key %+v not of type string", key)
@@ -73,9 +75,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ctrl.Result, error) {
}
placeRef := klog.KObj(placementOld)
placementNew := placementOld.DeepCopy()
// add latency log
defer func() {
klog.V(2).InfoS("ClusterResourcePlacement reconciliation loop ends", "placement", placeRef, "latency", time.Since(startTime).Milliseconds())
}()

// TODO: add finalizer logic if we need it in the future

klog.V(2).InfoS("Start to reconcile a ClusterResourcePlacement", "placement", placeRef)
// select the new clusters and record that in the placementNew status
selectedClusters, scheduleErr := r.selectClusters(placementNew)
@@ -120,7 +125,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ctrl.Result, error) {
}
klog.V(2).InfoS("Successfully persisted the intermediate scheduling result", "placement", placementOld.Name,
"totalClusters", totalCluster, "totalResources", totalResources)
// pick up the new version so placementNew can continue to update
// pick up the newly updated schedule condition so that the last schedule time will change every time we run the reconcile loop
meta.SetStatusCondition(&placementNew.Status.Conditions, *placementOld.GetCondition(string(fleetv1alpha1.ResourcePlacementConditionTypeScheduled)))
// pick up the new version so that we can update placementNew without getting it again
placementNew.SetResourceVersion(placementOld.GetResourceVersion())

// schedule works for each cluster by placing them in the cluster scoped namespace
@@ -254,17 +261,17 @@ func (r *Reconciler) updatePlacementScheduledCondition(placement *fleetv1alpha1.
placementRef := klog.KObj(placement)
schedCond := placement.GetCondition(string(fleetv1alpha1.ResourcePlacementConditionTypeScheduled))
if scheduleErr == nil {
if schedCond == nil || schedCond.Status != metav1.ConditionTrue {
klog.V(2).InfoS("successfully scheduled all selected resources to their clusters", "placement", placementRef)
r.Recorder.Event(placement, corev1.EventTypeNormal, "ResourceScheduled", "successfully scheduled all selected resources to their clusters")
}
placement.SetConditions(metav1.Condition{
Status: metav1.ConditionTrue,
Type: string(fleetv1alpha1.ResourcePlacementConditionTypeScheduled),
Reason: "ScheduleSucceeded",
Message: "Successfully scheduled resources for placement",
ObservedGeneration: placement.Generation,
})
if schedCond == nil || schedCond.Status != metav1.ConditionTrue {
klog.V(2).InfoS("successfully scheduled all selected resources to their clusters", "placement", placementRef)
r.Recorder.Event(placement, corev1.EventTypeNormal, "ResourceScheduled", "successfully scheduled all selected resources to their clusters")
}
} else {
placement.SetConditions(metav1.Condition{
Status: metav1.ConditionFalse,
@@ -293,8 +300,9 @@ func (r *Reconciler) updatePlacementAppliedCondition(placement *fleetv1alpha1.Cl
Message: "Successfully applied resources to member clusters",
ObservedGeneration: placement.Generation,
})
klog.V(2).InfoS("successfully applied all selected resources", "placement", placementRef)
if preAppliedCond == nil || preAppliedCond.Status != metav1.ConditionTrue {
klog.V(2).InfoS("successfully applied all selected resources", "placement", placementRef)
metrics.PlacementApplySucceedCount.WithLabelValues(placement.GetName()).Inc()
r.Recorder.Event(placement, corev1.EventTypeNormal, "ResourceApplied", "successfully applied all selected resources")
}
case errors.Is(applyErr, ErrStillPendingManifest):
@@ -305,8 +313,8 @@ func (r *Reconciler) updatePlacementAppliedCondition(placement *fleetv1alpha1.Cl
Message: applyErr.Error(),
ObservedGeneration: placement.Generation,
})
klog.V(2).InfoS("Some selected resources are still waiting to be applied", "placement", placementRef)
if preAppliedCond == nil || preAppliedCond.Status == metav1.ConditionTrue {
klog.V(2).InfoS("Some selected resources are still waiting to be applied", "placement", placementRef)
r.Recorder.Event(placement, corev1.EventTypeWarning, "ResourceApplyPending", "Some applied resources are now waiting to be applied to the member cluster")
}
default:
@@ -318,8 +326,9 @@ func (r *Reconciler) updatePlacementAppliedCondition(placement *fleetv1alpha1.Cl
Message: applyErr.Error(),
ObservedGeneration: placement.Generation,
})
klog.V(2).InfoS("failed to apply some selected resources", "placement", placementRef)
if preAppliedCond == nil || preAppliedCond.Status != metav1.ConditionFalse {
klog.V(2).InfoS("failed to apply some selected resources", "placement", placementRef)
metrics.PlacementApplyFailedCount.WithLabelValues(placement.GetName()).Inc()
r.Recorder.Event(placement, corev1.EventTypeWarning, "ResourceApplyFailed", "failed to apply some selected resources")
}
}
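A detail worth calling out in the two condition helpers above: the PR moves the log, event, and counter emission inside the `preAppliedCond == nil || preAppliedCond.Status != ...` checks, so they fire only when the Scheduled/Applied condition actually transitions rather than on every reconcile, while the condition itself is still refreshed each pass. A rough, self-contained sketch of that pattern (the `condition` type, `setApplied`, and `emit` below are illustrative stand-ins, not the repo's API):

```go
package main

import "fmt"

// condition is a stripped-down stand-in for metav1.Condition.
type condition struct {
	Type   string
	Status string // "True", "False", "Unknown"
}

// setApplied updates the Applied condition and emits an event/metric only on a transition.
func setApplied(conds map[string]condition, succeeded bool, emit func(msg string)) {
	prev, had := conds["Applied"]
	next := condition{Type: "Applied", Status: "False"}
	if succeeded {
		next.Status = "True"
	}
	conds["Applied"] = next // the condition itself is always refreshed

	// Emit only when the observed status actually changes, mirroring how
	// updatePlacementAppliedCondition is reordered in this PR.
	if !had || prev.Status != next.Status {
		emit(fmt.Sprintf("Applied condition transitioned to %s", next.Status))
	}
}

func main() {
	conds := map[string]condition{}
	emit := func(msg string) { fmt.Println("event:", msg) }

	setApplied(conds, true, emit)  // transition: none -> True, event fires
	setApplied(conds, true, emit)  // no transition, no event
	setApplied(conds, false, emit) // transition: True -> False, event fires
}
```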
9 changes: 4 additions & 5 deletions pkg/controllers/clusterresourceplacement/work_propagation.go
@@ -31,8 +31,7 @@ import (
)

const (
LastUpdateAnnotationKey = "work.fleet.azure.com/last-update-time"
SpecHashAnnotationKey = "work.fleet.azure.com/spec-hash-value"
specHashAnnotationKey = "work.fleet.azure.com/spec-hash-value"
)

// scheduleWork creates or updates the work object to reflect the new placement decision.
@@ -63,8 +62,8 @@ func (r *Reconciler) scheduleWork(ctx context.Context, placement *fleetv1alpha1.
utils.LabelFleetObj: utils.LabelFleetObjValue,
}
workAnnotation := map[string]string{
LastUpdateAnnotationKey: time.Now().Format(time.RFC3339),
SpecHashAnnotationKey: specHash,
utils.LastWorkUpdateTimeAnnotationKey: time.Now().Format(time.RFC3339),
specHashAnnotationKey: specHash,
}
changed := false
for _, memberClusterName := range memberClusterNames {
@@ -98,7 +97,7 @@ func (r *Reconciler) scheduleWork(ctx context.Context, placement *fleetv1alpha1.
changed = true
continue
}
existingHash := curWork.GetAnnotations()[SpecHashAnnotationKey]
existingHash := curWork.GetAnnotations()[specHashAnnotationKey]
if existingHash == specHash || reflect.DeepEqual(curWork.Spec.Workload.Manifests, workerSpec.Workload.Manifests) {
klog.V(2).InfoS("skip updating work spec as its identical",
"member cluster namespace", memberClusterNsName, "work name", workName, "number of manifests", len(manifests))
2 changes: 1 addition & 1 deletion pkg/controllers/membercluster/membercluster_controller.go
@@ -527,7 +527,7 @@ func markMemberClusterLeft(recorder record.EventRecorder, mc apis.ConditionedObj
if existingCondition == nil || existingCondition.Status != newCondition.Status {
recorder.Event(mc, corev1.EventTypeNormal, reasonMemberClusterJoined, "member cluster left")
klog.V(2).InfoS("memberCluster left", "memberCluster", klog.KObj(mc))
metrics.ReportJoinResultMetric()
metrics.ReportLeaveResultMetric()
}

mc.SetConditions(newCondition, notReadyCondition)
@@ -8,6 +8,7 @@ package memberclusterplacement
import (
"context"
"fmt"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -34,20 +35,27 @@ type Reconciler struct {
}

func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ctrl.Result, error) {
startTime := time.Now()
memberClusterName, ok := key.(string)
if !ok {
err := fmt.Errorf("got a resource key %+v not of type cluster wide key", key)
klog.ErrorS(err, "we have encountered a fatal error that can't be retried")
return ctrl.Result{}, err
}

// add latency log
defer func() {
klog.V(2).InfoS("MemberClusterPlacement reconciliation loop ends", "memberCluster", memberClusterName, "latency", time.Since(startTime).Milliseconds())
}()

klog.V(2).InfoS("Start to reconcile a member cluster to enqueue placement events", "memberCluster", memberClusterName)
mObj, err := r.InformerManager.Lister(utils.MemberClusterGVR).Get(memberClusterName)
if err != nil {
klog.ErrorS(err, "failed to get the member cluster", "memberCluster", memberClusterName)
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
}
mObj = nil //guard against unexpected informer lib behavior
}
crpList, err := r.InformerManager.Lister(utils.ClusterResourcePlacementGVR).List(labels.Everything())
if err != nil {
7 changes: 7 additions & 0 deletions pkg/controllers/resourcechange/resourcechange_controller.go
@@ -8,6 +8,7 @@ package resourcechange
import (
"context"
"fmt"
"time"

"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -50,6 +51,7 @@ type Reconciler struct {
}

func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ctrl.Result, error) {
startTime := time.Now()
clusterWideKey, ok := key.(keys.ClusterWideKey)
if !ok {
err := fmt.Errorf("got a resource key %+v not of type cluster wide key", key)
@@ -58,6 +60,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ctrl.Result, error) {
}
klog.V(2).InfoS("Reconciling object", "obj", clusterWideKey)

// add latency log
defer func() {
klog.V(2).InfoS("ResourceChange reconciliation loop ends", "obj", clusterWideKey, "latency", time.Since(startTime).Milliseconds())
}()

// the clusterObj is set to be the object that the placement direct selects,
// in the case of a deleted namespace scoped object, the clusterObj is set to be its parent namespace object.
clusterObj, isClusterScoped, err := r.getUnstructuredObject(clusterWideKey)
21 changes: 19 additions & 2 deletions pkg/controllers/work/apply_controller.go
@@ -42,8 +42,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"

workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1"

"go.goms.io/fleet/pkg/metrics"
"go.goms.io/fleet/pkg/utils"
)

const (
@@ -139,9 +141,24 @@ func (r *ApplyWorkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
BlockOwnerDeletion: pointer.Bool(false),
}

// Apply the manifests to the member cluster
// apply the manifests to the member cluster
results := r.applyManifests(ctx, work.Spec.Workload.Manifests, owner)

// collect the latency from the work update time to now.
lastUpdateTime, ok := work.GetAnnotations()[utils.LastWorkUpdateTimeAnnotationKey]
if ok {
workUpdateTime, parseErr := time.Parse(time.RFC3339, lastUpdateTime)
if parseErr != nil {
klog.ErrorS(parseErr, "failed to parse the last work update time", "work", logObjRef)
} else {
latency := time.Since(workUpdateTime)
metrics.WorkApplyTime.WithLabelValues(work.GetName()).Observe(latency.Seconds())
klog.V(2).InfoS("work is applied", "work", work.GetName(), "latency", latency.Milliseconds())
}
} else {
klog.V(2).InfoS("work has no last update time", "work", work.GetName())
}

// generate the work condition based on the manifest apply result
errs := r.generateWorkCondition(results, work)

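Taken together with the work_propagation.go change above, this gives an end-to-end latency measurement: the hub stamps the Work with the `work.fleet.azure.com/last-update-time` annotation (RFC3339) when it schedules it, and the member agent parses that stamp and observes the elapsed time in the `WorkApplyTime` histogram once the manifests are applied. A minimal sketch of that round trip, outside any controller (the `stampWork`/`observeApplyLatency` helpers and the histogram variable are illustrative names, not functions from this PR):

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// workApplyTime mirrors the shape of the WorkApplyTime histogram added in pkg/metrics.
var workApplyTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Name:    "work_apply_time_seconds",
	Help:    "Time from work update on the hub to apply on the member cluster",
	Buckets: prometheus.DefBuckets,
}, []string{"name"})

const lastUpdateTimeAnnotation = "work.fleet.azure.com/last-update-time"

// stampWork is the hub-side half: record the update time on the object's annotations.
func stampWork(annotations map[string]string) {
	annotations[lastUpdateTimeAnnotation] = time.Now().Format(time.RFC3339)
}

// observeApplyLatency is the member-side half: parse the stamp and observe the delta.
func observeApplyLatency(workName string, annotations map[string]string) error {
	stamp, ok := annotations[lastUpdateTimeAnnotation]
	if !ok {
		return fmt.Errorf("work %q has no last update time annotation", workName)
	}
	updated, err := time.Parse(time.RFC3339, stamp)
	if err != nil {
		return fmt.Errorf("parse %q: %w", stamp, err)
	}
	workApplyTime.WithLabelValues(workName).Observe(time.Since(updated).Seconds())
	return nil
}

func main() {
	prometheus.MustRegister(workApplyTime)
	ann := map[string]string{}
	stampWork(ann)                       // hub: annotate on create/update
	time.Sleep(50 * time.Millisecond)    // propagation and apply happen here
	_ = observeApplyLatency("demo", ann) // member: record the observed latency
}
```

Note that RFC3339 has one-second resolution, so very fast applies land in the lowest buckets; the histogram's fine-grained buckets below one second mostly matter if the annotation format is ever made more precise.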
14 changes: 14 additions & 0 deletions pkg/metrics/metrics.go
@@ -18,6 +18,20 @@ var (
Name: "leave_result_counter",
Help: "Number of successful Leave operations",
}, []string{"result"})
WorkApplyTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "work_apply_time_seconds",
Help: "Length of time between when a work resource is created/updated to when it is applied on the member cluster",
Buckets: []float64{0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.7, 0.9, 1.0,
1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 7, 9, 10, 15, 20, 30, 60, 120},
}, []string{"name"})
PlacementApplyFailedCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "placement_apply_failed_counter",
Help: "Number of failed to apply cluster resource placement",
}, []string{"name"})
PlacementApplySucceedCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "placement_apply_succeed_counter",
Help: "Number of successfully applied cluster resource placement",
}, []string{"name"})
)

var (
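To sanity-check the new series locally, outside the cluster, one option is to register a collector against a scratch registry, bump it, and expose it over HTTP the same way the agents do; this mirrors the port-forward plus curl step added to test/e2e/README.md further down. The port and placement name below are arbitrary, and the variable is a local copy rather than the repo's exported collector — a sketch only:

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var placementApplySucceedCount = prometheus.NewCounterVec(prometheus.CounterOpts{
	Name: "placement_apply_succeed_counter",
	Help: "Number of cluster resource placements that applied successfully",
}, []string{"name"})

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(placementApplySucceedCount)

	// Simulate one successful placement apply; one series appears per placement name.
	placementApplySucceedCount.WithLabelValues("demo-placement").Inc()

	// Verify with: curl http://127.0.0.1:8080/metrics | grep placement_apply_succeed_counter
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```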
5 changes: 4 additions & 1 deletion pkg/utils/common.go
@@ -57,8 +57,11 @@ const (
// This label aims to enable different work objects to be managed by different placement.
LabelWorkPlacementName = "work.fleet.azure.com/placement-name"

// MemberClusterFinalizer is used to make sure that we handle gc of all the member cluster resources on the hub cluster
// MemberClusterFinalizer is used to make sure that we handle gc of all the member cluster resources on the hub cluster.
MemberClusterFinalizer = "work.fleet.azure.com/membercluster-finalizer"

// LastWorkUpdateTimeAnnotationKey is used to mark the last update time on a work object.
LastWorkUpdateTimeAnnotationKey = "work.fleet.azure.com/last-update-time"
)

var (
15 changes: 15 additions & 0 deletions test/e2e/README.md
@@ -37,13 +37,28 @@ kubectl --context=kind-hub-testing apply -f test/integration/manifests/placement
check the logs of the hub cluster controller
```shell
kubectl --context=kind-hub-testing -n fleet-system get pod

NAME READY STATUS RESTARTS AGE
hub-agent-8bb6d658-6jj7n 1/1 Running 0 11m

```

check the logs of the member cluster controller
```shell
kubectl --context=kind-member-testing -n fleet-system get pod
```

check the hub metrics
```shell
kubectl --context=kind-hub-testing -n fleet-system port-forward hub-agent-xxxx-xxx 13622:8080

Forwarding from 127.0.0.1:13622 -> 8080
Forwarding from [::1]:13622 -> 8080

curl http://127.0.0.1:13622/metrics
```


5. uninstall the resources
```shell
make uninstall-helm
3 changes: 1 addition & 2 deletions test/integration/utils_test.go
@@ -32,7 +32,6 @@ import (
workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1"

fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/pkg/controllers/clusterresourceplacement"
"go.goms.io/fleet/pkg/utils"
)

@@ -283,7 +282,7 @@ func verifyPartialWorkObjects(crp *fleetv1alpha1.ClusterResourcePlacement, expec
}
}
}
lastUpdateTime, err := time.Parse(time.RFC3339, clusterWork.GetAnnotations()[clusterresourceplacement.LastUpdateAnnotationKey])
lastUpdateTime, err := time.Parse(time.RFC3339, clusterWork.GetAnnotations()[utils.LastWorkUpdateTimeAnnotationKey])
Expect(err).Should(Succeed())
return lastUpdateTime
}