Skip to content
16 changes: 9 additions & 7 deletions api/v1/composition.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
)

const (
enoAzureOperationIDKey = "eno.azure.io/operationID"
enoAzureOperationOrigin = "eno.azure.io/operationOrigin"
OperationIdKey string = "operationID"
OperationOrigionKey string = "operationOrigin"
CircularDependencyReason string = "CircularDependency"
WaitingOnDependentsReason string = "WaitingOnDependents"
WaitingOnDependenciesReason string = "WaitingOnDependencies"
enoAzureOperationIDKey = "eno.azure.io/operationID"
enoAzureOperationOrigin = "eno.azure.io/operationOrigin"
OperationIdKey string = "operationID"
OperationOrigionKey string = "operationOrigin"
CircularDependencyReason string = "CircularDependency"
WaitingOnDependentsReason string = "WaitingOnDependents"
WaitingOnDependenciesReason string = "WaitingOnDependencies"
WaitingOnDependencyNotFoundReason string = "DependencyNotFound"
WaitingOnDependencyNotReadyReason string = "DependencyNotReady"
)

// +kubebuilder:object:root=true
Expand Down
5 changes: 5 additions & 0 deletions internal/controllers/composition/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,11 @@ func buildSimplifiedStatus(synth *apiv1.Synthesizer, comp *apiv1.Composition) *a
}

if comp.Status.CurrentSynthesis == nil && comp.Status.InFlightSynthesis == nil {
// This means that dependencies exists and no synthesis started showing WaitingOnDependencies
if len(comp.Spec.DependsOn) > 0 {
status.Status = apiv1.WaitingOnDependenciesReason
return status
}
status.Status = "PendingSynthesis"
return status
}
Expand Down
12 changes: 11 additions & 1 deletion internal/controllers/composition/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ func TestSimplifiedStatus(t *testing.T) {
state.Comp.Status.Simplified = &apiv1.SimplifiedStatus{Error: "Previous reconciliation error"}
return state
}).
WithMutation("with dependencies", func(state *simplifiedStatusState) *simplifiedStatusState {
state.Comp.Spec.DependsOn = []apiv1.CompositionDependency{{Name: "dep-a", Namespace: "default"}}
return state
}).
WithInvariant("missing synth", func(state *simplifiedStatusState, result *apiv1.SimplifiedStatus) bool {
return state.Comp.DeletionTimestamp != nil || state.Synth != nil || result.Status == "MissingSynthesizer"
}).
Expand Down Expand Up @@ -261,6 +265,13 @@ func TestSimplifiedStatus(t *testing.T) {
state.Comp.Status.Simplified.Error == "" ||
result.Error == state.Comp.Status.Simplified.Error
}).
WithInvariant("waiting on dependencies", func(state *simplifiedStatusState, result *apiv1.SimplifiedStatus) bool {
hasDeps := len(state.Comp.Spec.DependsOn) > 0
noSynthesis := state.Comp.Status.CurrentSynthesis == nil && state.Comp.Status.InFlightSynthesis == nil
notDeleting := state.Comp.DeletionTimestamp == nil
hasSynth := state.Synth != nil
return !(hasDeps && noSynthesis && notDeleting && hasSynth) || result.Status == apiv1.WaitingOnDependenciesReason
}).
Evaluate(t)
}

Expand Down Expand Up @@ -300,7 +311,6 @@ func TestIsAddonComposition(t *testing.T) {
labels: map[string]string{AKSComponentLabel: addOnLabelValue},
expected: true,
},

}

for _, tt := range tests {
Expand Down
113 changes: 112 additions & 1 deletion internal/controllers/scheduling/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"encoding/json"
"fmt"
"hash/fnv"
"path"
"sort"
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -114,9 +116,42 @@ func (c *controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
// Reset the compositionHealth metric before iterating through compositions
compositionHealth.Reset()

// Build readiness index and composition map for dependency checking
readySet := buildReadySet(comps)
existSet := buildExistsSet(comps)
sortedComps, cyclicSet := topoSortCompositions(comps.Items)

// Pre-loop: set/clear circular dependency status on all compositions.
// Cyclic compositions are excluded from sortedComps by Kahn's algorithm,
// so we must handle them here before the main loop.
for _, comp := range comps.Items {
comp := comp
if comp.DeletionTimestamp != nil || len(comp.Spec.DependsOn) == 0 {
continue
}
compKey := path.Join(comp.GetNamespace(), comp.GetName())
if cyclicSet[compKey] {
logger.Error(fmt.Errorf("circular dependency detected"),
"skipping composition synthesis",
"compositionName", comp.GetName(), "compositionNamespace", comp.GetNamespace())
if _, err := c.setDependencyStatus(ctx, &comp, &apiv1.DependencyStatus{
Blocked: true,
Reason: apiv1.CircularDependencyReason,
}); err != nil {
logger.Error(err, "failed to update circular dependency status")
return ctrl.Result{}, err
}
} else if cleared, err := c.clearDependencyStatus(ctx, &comp, apiv1.CircularDependencyReason); err != nil {
logger.Error(err, "failed to clear circular dependency status")
return ctrl.Result{}, err
} else if cleared {
logger.Info("cleared stale circular dependency status", "compositionName", comp.GetName())
}
}

var inFlight int
var op *op
for _, comp := range comps.Items {
for _, comp := range sortedComps {
comp := comp
if comp.Synthesizing() {
inFlight++
Expand All @@ -131,6 +166,46 @@ func (c *controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
compositionHealth.WithLabelValues(comp.Name, comp.Namespace, comp.Spec.Synthesizer.Name).Set(0)
}

if comp.DeletionTimestamp == nil && len(comp.Spec.DependsOn) > 0 {
if !areDependenciesReady(&comp, readySet) {
// Build BlockedBy list
var blockedBy []apiv1.BlockedByRef
for _, dep := range comp.Spec.DependsOn {
key := path.Join(dep.Namespace, dep.Name)
if !readySet[key] {
reason := apiv1.WaitingOnDependencyNotReadyReason
if !existSet[key] {
reason = apiv1.WaitingOnDependencyNotFoundReason
}
logger.Info("dependency not satisfied for composition", "compositionName", comp.GetName(), "compositionNamespace", comp.GetNamespace(),
"dependencyName", dep.Name, "dependencyNamespace", dep.Namespace, "reason", reason)
blockedBy = append(blockedBy, apiv1.BlockedByRef{
Name: dep.Name,
Namespace: dep.Namespace,
Reason: reason,
})
}
}
if _, err := c.setDependencyStatus(ctx, &comp, &apiv1.DependencyStatus{
Blocked: true,
Reason: apiv1.WaitingOnDependenciesReason,
BlockedBy: blockedBy,
}); err != nil {
return ctrl.Result{}, err
}
logger.Info("not all dependent compositions are ready, skipping composition synthesis", "compositionName", comp.GetName(), "compositionNamespace", comp.GetNamespace())
continue
}

// clear the waitingonDependencies status
if cleared, err := c.clearDependencyStatus(ctx, &comp, apiv1.WaitingOnDependenciesReason); err != nil {
logger.Error(err, "failed to clear dependency status")
return ctrl.Result{}, err
} else if cleared {
logger.Info("cleared stale waiting on dependency status", "compositionName", comp.GetName(), "compositionNamespace", comp.GetNamespace())
}
}

synth, ok := synthsByName[comp.Spec.Synthesizer.Name]
if !ok {
continue
Expand Down Expand Up @@ -242,3 +317,39 @@ func getSynthOwner(synth *apiv1.Synthesizer) string {
}
return synth.GetAnnotations()["eno.azure.io/owner"]
}

// setDependencyStatus patches the composition's DependencyStatus if it doesn't already
// match the given reason. Returns true if a patch was applied
func (c *controller) setDependencyStatus(ctx context.Context, comp *apiv1.Composition, newStatus *apiv1.DependencyStatus) (bool, error) {
logger := logr.FromContextOrDiscard(ctx)
// same status, we can ignore
if comp.Status.DependencyStatus != nil && equality.Semantic.DeepEqual(*comp.Status.DependencyStatus, *newStatus) {
return false, nil // modified=false, err = nil
}

copy := comp.DeepCopy()
copy.Status.DependencyStatus = newStatus
logger.Info("Setting DependencyStatus for composition", "compositionName", comp.GetName(), "compositionNamespace", comp.GetNamespace(),
"DependencyStatusReason", newStatus.Reason, "BlockedBy", newStatus.BlockedBy)
if err := c.client.Status().Patch(ctx, copy, client.MergeFrom(comp)); err != nil {
return false, err
}
return true, nil
}

// clearDependencyStatus clears the composition's DependencyStatus if it currently has the given reason.
// Returns true if a patch was applied
func (c *controller) clearDependencyStatus(ctx context.Context, comp *apiv1.Composition, reason string) (bool, error) {
logger := logr.FromContextOrDiscard(ctx)
if comp.Status.DependencyStatus == nil || comp.Status.DependencyStatus.Reason != reason {
return false, nil
}
copy := comp.DeepCopy()
copy.Status.DependencyStatus = nil
logger.Info("clearing dependency status", "compositionName", comp.GetName(), "compositionNamespace", comp.GetNamespace(),
"prevDependencyReason", comp.Status.DependencyStatus.Reason)
if err := c.client.Status().Patch(ctx, copy, client.MergeFrom(comp)); err != nil {
return false, err
}
return true, nil
}
Loading
Loading