Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/install_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestInstallCmd(t *testing.T) {

mockKubernetesManager := kubernetes.NewMockKubernetesManager()
mockKubernetesManager.ApplyBlueprintFunc = func(blueprint *blueprintv1alpha1.Blueprint, namespace string) error { return nil }
mockKubernetesManager.WaitForKustomizationsFunc = func(message string, names ...string) error { return nil }
mockKubernetesManager.WaitForKustomizationsFunc = func(message string, blueprint *blueprintv1alpha1.Blueprint) error { return nil }

// Override ConfigHandler and ProjectRoot in runtime
mocks.Runtime.ConfigHandler = mockConfigHandler
Expand Down
4 changes: 2 additions & 2 deletions cmd/up_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func setupUpTest(t *testing.T, opts ...*SetupOptions) *UpMocks {
// Add kubernetes manager mock
mockKubernetesManager := kubernetes.NewMockKubernetesManager()
mockKubernetesManager.ApplyBlueprintFunc = func(blueprint *blueprintv1alpha1.Blueprint, namespace string) error { return nil }
mockKubernetesManager.WaitForKustomizationsFunc = func(message string, names ...string) error { return nil }
mockKubernetesManager.WaitForKustomizationsFunc = func(message string, blueprint *blueprintv1alpha1.Blueprint) error { return nil }

// Create runtime with all mocked dependencies
rt, err := runtime.NewRuntime(&runtime.Runtime{
Expand Down Expand Up @@ -407,7 +407,7 @@ func TestUpCmd(t *testing.T) {
mocks := setupUpTest(t)

// And kubernetes manager WaitForKustomizations that fails
mocks.KubernetesManager.WaitForKustomizationsFunc = func(message string, names ...string) error {
mocks.KubernetesManager.WaitForKustomizationsFunc = func(message string, blueprint *blueprintv1alpha1.Blueprint) error {
return fmt.Errorf("wait for kustomizations failed")
}

Expand Down
85 changes: 79 additions & 6 deletions pkg/provisioner/kubernetes/kubernetes_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
type KubernetesManager interface {
ApplyKustomization(kustomization kustomizev1.Kustomization) error
DeleteKustomization(name, namespace string) error
WaitForKustomizations(message string, names ...string) error
WaitForKustomizations(message string, blueprint *blueprintv1alpha1.Blueprint) error
CreateNamespace(name string) error
DeleteNamespace(name string) error
ApplyConfigMap(name, namespace string, data map[string]string) error
Expand Down Expand Up @@ -153,26 +153,38 @@ func (k *BaseKubernetesManager) DeleteKustomization(name, namespace string) erro
return fmt.Errorf("timeout waiting for kustomization %s to be deleted", name)
}

// WaitForKustomizations waits for kustomizations to be ready
func (k *BaseKubernetesManager) WaitForKustomizations(message string, names ...string) error {
// WaitForKustomizations waits for kustomizations to be ready, calculating the timeout
// from the longest dependency chain in the blueprint. Outputs a debug message describing
// the total wait timeout being used before beginning polling.
func (k *BaseKubernetesManager) WaitForKustomizations(message string, blueprint *blueprintv1alpha1.Blueprint) error {
if blueprint == nil {
return fmt.Errorf("blueprint not provided")
}

timeout := k.calculateTotalWaitTime(blueprint)
kustomizationNames := make([]string, len(blueprint.Kustomizations))
for i, kustomization := range blueprint.Kustomizations {
kustomizationNames[i] = kustomization.Name
}

spin := spinner.New(spinner.CharSets[14], 100*time.Millisecond, spinner.WithColor("green"))
spin.Suffix = " " + message
spin.Start()
defer spin.Stop()

timeout := time.After(k.kustomizationReconcileTimeout)
timeoutChan := time.After(timeout)
ticker := time.NewTicker(k.kustomizationWaitPollInterval)
defer ticker.Stop()

for {
select {
case <-timeout:
case <-timeoutChan:
spin.Stop()
fmt.Fprintf(os.Stderr, "✗%s - \033[31mFailed\033[0m\n", spin.Suffix)
return fmt.Errorf("timeout waiting for kustomizations")
case <-ticker.C:
allReady := true
for _, name := range names {
for _, name := range kustomizationNames {
gvr := schema.GroupVersionResource{
Group: "kustomize.toolkit.fluxcd.io",
Version: "v1",
Expand Down Expand Up @@ -1201,6 +1213,67 @@ func isImmutableConfigMap(obj *unstructured.Unstructured) bool {
return ok && immutable
}

// calculateTotalWaitTime calculates the total timeout for the longest dependency chain
// by summing the timeouts of all kustomizations along the path. It traverses the dependency graph
// to find the path with the maximum cumulative timeout. Returns the calculated timeout or the default
// if no kustomizations exist. Cycles are not detected and may cause stack overflow.
func (k *BaseKubernetesManager) calculateTotalWaitTime(blueprint *blueprintv1alpha1.Blueprint) time.Duration {
if len(blueprint.Kustomizations) == 0 {
return constants.DefaultKustomizationWaitTotalTimeout
}

nameToIndex := make(map[string]int)
for i, kustomization := range blueprint.Kustomizations {
nameToIndex[kustomization.Name] = i
}

var calculateChainTimeout func(componentIndex int, visited map[int]bool) time.Duration
calculateChainTimeout = func(componentIndex int, visited map[int]bool) time.Duration {
if visited[componentIndex] {
return 0
}
visited[componentIndex] = true
defer delete(visited, componentIndex)

kustomization := blueprint.Kustomizations[componentIndex]

currentTimeout := constants.DefaultFluxKustomizationTimeout
if kustomization.Timeout != nil && kustomization.Timeout.Duration != 0 {
currentTimeout = kustomization.Timeout.Duration
}

if len(kustomization.DependsOn) == 0 {
return currentTimeout
}

maxDependencyTimeout := time.Duration(0)
for _, depName := range kustomization.DependsOn {
if depIndex, exists := nameToIndex[depName]; exists {
depTimeout := calculateChainTimeout(depIndex, visited)
if depTimeout > maxDependencyTimeout {
maxDependencyTimeout = depTimeout
}
}
}

return currentTimeout + maxDependencyTimeout
}

maxTimeout := time.Duration(0)
for i := range blueprint.Kustomizations {
timeout := calculateChainTimeout(i, make(map[int]bool))
if timeout > maxTimeout {
maxTimeout = timeout
}
}

if maxTimeout == 0 {
return constants.DefaultKustomizationWaitTotalTimeout
}

return maxTimeout
}

// isNotFoundError checks if an error is a Kubernetes resource not found error
func isNotFoundError(err error) bool {
if err == nil {
Expand Down
Loading
Loading