From 77075f401edab4033626c1fa8923de8a6090496d Mon Sep 17 00:00:00 2001 From: Max Partenfelder Date: Wed, 8 Oct 2025 14:01:21 +0200 Subject: [PATCH 1/2] fix: parent directory values files --- internal/templating/templating.go | 47 ++++++++++++++++++++++--------- internal/util/util.go | 2 +- 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/internal/templating/templating.go b/internal/templating/templating.go index 7f85808..3443054 100644 --- a/internal/templating/templating.go +++ b/internal/templating/templating.go @@ -64,7 +64,14 @@ func LoadValues() error { } func filterValuesFiles(valuesFiles []string, dirLimit string) []string { + // If dirLimit points to a secret file, extract its directory + if strings.HasSuffix(dirLimit, ".gitops.secret.enc.yaml") || strings.HasSuffix(dirLimit, ".gitops.secret.enc.yml") { + dirLimit = filepath.Dir(dirLimit) + log.Debugf("Converted file-based dir limit to directory: %q", dirLimit) + } + dirLimitNormalized := normalizeDirPath(dirLimit) + log.Debugf("Filtering values files with dirLimit=%q normalized=%q", dirLimit, dirLimitNormalized) if dirLimitNormalized == "" { return valuesFiles } @@ -72,8 +79,12 @@ func filterValuesFiles(valuesFiles []string, dirLimit string) []string { filtered := make([]string, 0, len(valuesFiles)) for _, valuesFile := range valuesFiles { valuesDir := normalizeDirPath(filepath.Dir(valuesFile)) - if shouldIncludeValuesFile(valuesDir, dirLimitNormalized) { + include := shouldIncludeValuesFile(valuesDir, dirLimitNormalized) + log.Debugf("Evaluating values file %q dir=%q normalizedDir=%q include=%v", valuesFile, filepath.Dir(valuesFile), valuesDir, include) + if include { filtered = append(filtered, valuesFile) + } else { + log.Debugf("Excluding values file %q due to dir limit %q", valuesFile, dirLimitNormalized) } } return filtered @@ -124,26 +135,34 @@ func (t TemplateValues) merge() { sort.SliceStable(t, func(i, j int) bool { return len(strings.Split(t[i].Path, "/")) < len(strings.Split(t[j].Path, "/")) }) + order := make([]string, len(t)) + for idx, tv := range t { + order[idx] = tv.Path + } + log.Debugf("TemplateValues merge order: %v", order) for i, templateValue := range t { - merged := false - for j, templateValue2 := range t { - if j >= i { - break - } - - if strings.HasPrefix(templateValue.Path, templateValue2.Path) { - merged = true - templateValue.MergedValues = mergeMaps(templateValue2.MergedValues, templateValue.Values) + var bestParent *TemplateValuesPath + for j := i - 1; j >= 0; j-- { + candidate := t[j] + log.Debugf("Considering ancestor candidate %q for child %q", candidate.Path, templateValue.Path) + if strings.HasPrefix(templateValue.Path, candidate.Path) { + bestParent = candidate break } } - if !merged { + + if bestParent != nil { + log.Debugf("Merging ancestor %q into %q", bestParent.Path, templateValue.Path) + templateValue.MergedValues = mergeMaps(bestParent.MergedValues, templateValue.Values) + } else { + log.Debugf("No ancestor found for %q, using own values only", templateValue.Path) templateValue.MergedValues = templateValue.Values } } } func GetValuesForPath(path string) map[interface{}]interface{} { + log.Debugf("Resolving values for secret path %q", path) if !loaded { err := LoadValues() if err != nil { @@ -154,17 +173,19 @@ func GetValuesForPath(path string) map[interface{}]interface{} { usedPath := "" maxPathLength := 0 for _, templateValue := range templateValues { + log.Debugf("Considering values prefix %q for path %q", templateValue.Path, path) if strings.HasPrefix(path, templateValue.Path) && len(templateValue.Path) > maxPathLength { maxPathLength = len(templateValue.Path) usedPath = templateValue.Path values = templateValue.MergedValues + log.Debugf("Selecting values prefix %q for path %q", templateValue.Path, path) } } if usedPath != "" { - log.Tracef("Using values from %s for path %s", usedPath, path) + log.Debugf("Using values from %s for path %s", usedPath, path) } else { - log.Tracef("No values found for path %s", path) + log.Debugf("No values found for path %s", path) } return values diff --git a/internal/util/util.go b/internal/util/util.go index df2fa7c..eee2d8c 100644 --- a/internal/util/util.go +++ b/internal/util/util.go @@ -49,7 +49,7 @@ func GetSecretFiles() ([]string, error) { } if secretFileRegex.MatchString(path) { - log.Debug("Found secret file: ", path) + log.Trace("Found secret file: ", path) relativePath, err := filepath.Rel(GetRootDir(), path) if err != nil { log.Error("An error occurred while getting the relative path of the secret file") From a0d0299bccd3ec15a91da7193d915dc5a36e78e8 Mon Sep 17 00:00:00 2001 From: Max Partenfelder Date: Wed, 8 Oct 2025 14:17:10 +0200 Subject: [PATCH 2/2] feat: adding parallel processing --- cmd/gitops/main.go | 7 ++ internal/kubernetes/kubernetes.go | 103 ++++++++++++++++++++++-------- internal/plan/plan.go | 97 +++++++++++++++++++++------- internal/secret/loader.go | 57 +++++++++++++++-- 4 files changed, 208 insertions(+), 56 deletions(-) diff --git a/cmd/gitops/main.go b/cmd/gitops/main.go index db5f80f..1fe8fbd 100644 --- a/cmd/gitops/main.go +++ b/cmd/gitops/main.go @@ -69,6 +69,13 @@ func main() { Usage: "display unchanged secrets in the plan overview", EnvVars: []string{"GITOPS_SHOW_UNCHANGED"}, }, + &cli.IntFlag{ + Name: "parallelism", + Aliases: []string{"p"}, + Value: 5, + Usage: "number of parallel operations for decrypting secrets and kubernetes operations", + EnvVars: []string{"GITOPS_PARALLELISM"}, + }, }, Commands: []*cli.Command{ { diff --git a/internal/kubernetes/kubernetes.go b/internal/kubernetes/kubernetes.go index 161356f..6ee79f0 100644 --- a/internal/kubernetes/kubernetes.go +++ b/internal/kubernetes/kubernetes.go @@ -3,6 +3,7 @@ package kubernetes import ( "fmt" "strings" + "sync" "github.com/TwiN/go-color" "github.com/google/uuid" @@ -122,6 +123,11 @@ func createKubernetesPlan(c *cli.Context) (*plan.Plan, error) { Items: []plan.PlanItem{}, } + parallelism := c.Int("parallelism") + if parallelism < 1 { + parallelism = 1 + } + bar := progressbar.NewOptions(len(localSecrets), progressbar.OptionEnableColorCodes(true), progressbar.OptionShowBytes(false), @@ -132,38 +138,79 @@ func createKubernetesPlan(c *cli.Context) (*plan.Plan, error) { progressbar.OptionSetDescription("[green][Syncing local state with cluster][reset]"), ) - for _, localSecret := range localSecrets { - bar.Add(1) - // check for local secret in state - // update ID in localSecret if secret exists in state - // update hash in state if secret exists in state - stateSecret := state.GetState().GetByPath(localSecret.Path) - if stateSecret == nil { - log.Trace("Secret ", localSecret.CombinedName(), " does not exist in state") - localSecret.ID = uuid.New().String() - stateSecret = state.GetState().Add(localSecret) - } else { - log.Trace("Secret ", localSecret.CombinedName(), " exists in state. Updating") - stateSecret.Update(localSecret) - } - - planItem := plan.PlanItem{ - LocalSecret: localSecret, - } + // Create channels for parallel processing + type planItemResult struct { + item plan.PlanItem + err error + } - remoteSecret, err := k8s.GetSecret(localSecret, localSecret.Target) - if err != nil { - if k8sErrors.IsNotFound(err) { - log.Trace("Secret ", localSecret.Name, " does not exist in Kubernetes cluster") - } else { - log.Error("Failed to get secret ", localSecret.Name, " from Kubernetes cluster") - return nil, err + secretChan := make(chan *secret.Secret, len(localSecrets)) + resultChan := make(chan planItemResult, len(localSecrets)) + + // Start worker goroutines + var workerGroup sync.WaitGroup + for i := 0; i < parallelism; i++ { + workerGroup.Add(1) + go func() { + defer workerGroup.Done() + for localSecret := range secretChan { + // check for local secret in state + // update ID in localSecret if secret exists in state + // update hash in state if secret exists in state + stateSecret := state.GetState().GetByPath(localSecret.Path) + if stateSecret == nil { + log.Trace("Secret ", localSecret.CombinedName(), " does not exist in state") + localSecret.ID = uuid.New().String() + stateSecret = state.GetState().Add(localSecret) + } else { + log.Trace("Secret ", localSecret.CombinedName(), " exists in state. Updating") + stateSecret.Update(localSecret) + } + + planItem := plan.PlanItem{ + LocalSecret: localSecret, + } + + remoteSecret, err := k8s.GetSecret(localSecret, localSecret.Target) + if err != nil { + if k8sErrors.IsNotFound(err) { + log.Trace("Secret ", localSecret.Name, " does not exist in Kubernetes cluster") + } else { + log.Error("Failed to get secret ", localSecret.Name, " from Kubernetes cluster") + resultChan <- planItemResult{err: err} + return + } + } + + planItem.RemoteSecret = remoteSecret + planItem.ComputeDiff() + resultChan <- planItemResult{item: planItem, err: nil} } + }() + } + + // Send work to workers + go func() { + for _, localSecret := range localSecrets { + secretChan <- localSecret } + close(secretChan) + }() - planItem.RemoteSecret = remoteSecret - planItem.ComputeDiff() - p.AddItem(planItem) + // Wait for all workers to finish and close result channel + go func() { + workerGroup.Wait() + close(resultChan) + }() + + // Collect results + for result := range resultChan { + bar.Add(1) + if result.err != nil { + bar.Finish() + return nil, result.err + } + p.AddItem(result.item) } bar.Finish() println("") diff --git a/internal/plan/plan.go b/internal/plan/plan.go index 6d75d07..810fcea 100644 --- a/internal/plan/plan.go +++ b/internal/plan/plan.go @@ -1,10 +1,13 @@ package plan import ( + "sync" + log "github.com/sirupsen/logrus" "github.com/mxcd/gitops-cli/internal/k8s" "github.com/mxcd/gitops-cli/internal/secret" + "github.com/mxcd/gitops-cli/internal/util" ) type Plan struct { @@ -64,33 +67,83 @@ func (p *Plan) Execute() error { } func executeKubernetesPlan(p *Plan) error { + parallelism := util.GetCliContext().Int("parallelism") + if parallelism < 1 { + parallelism = 1 + } + + // Filter items that need processing + itemsToProcess := []PlanItem{} for _, item := range p.Items { - if item.Diff.Equal { + if !item.Diff.Equal { + itemsToProcess = append(itemsToProcess, item) + } else { log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is equal, skipping...") - continue } - if item.Diff.Type == secret.SecretDiffTypeAdded { - log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is new, creating...") - err := k8s.CreateSecret(item.LocalSecret, item.LocalSecret.Target) - if err != nil { - log.Error("Failed to create secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " in cluster") - return err - } - } else if item.Diff.Type == secret.SecretDiffTypeChanged { - log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is modified, updating...") - err := k8s.UpdateSecret(item.LocalSecret, item.LocalSecret.Target) - if err != nil { - log.Error("Failed to update secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " in cluster") - return err - } - } else if item.Diff.Type == secret.SecretDiffTypeRemoved { - log.Trace("Secret ", item.RemoteSecret.Namespace, "/", item.RemoteSecret.Name, " is deleted, deleting...") - err := k8s.DeleteSecret(item.RemoteSecret, item.RemoteSecret.Target) - if err != nil { - log.Error("Failed to delete secret ", item.RemoteSecret.Namespace, "/", item.RemoteSecret.Name, " in cluster") - return err + } + + if len(itemsToProcess) == 0 { + return nil + } + + // Create channels for parallel processing + itemChan := make(chan PlanItem, len(itemsToProcess)) + errorChan := make(chan error, len(itemsToProcess)) + + // Start worker goroutines + var workerGroup sync.WaitGroup + for i := 0; i < parallelism; i++ { + workerGroup.Add(1) + go func() { + defer workerGroup.Done() + for item := range itemChan { + var err error + if item.Diff.Type == secret.SecretDiffTypeAdded { + log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is new, creating...") + err = k8s.CreateSecret(item.LocalSecret, item.LocalSecret.Target) + if err != nil { + log.Error("Failed to create secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " in cluster") + } + } else if item.Diff.Type == secret.SecretDiffTypeChanged { + log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is modified, updating...") + err = k8s.UpdateSecret(item.LocalSecret, item.LocalSecret.Target) + if err != nil { + log.Error("Failed to update secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " in cluster") + } + } else if item.Diff.Type == secret.SecretDiffTypeRemoved { + log.Trace("Secret ", item.RemoteSecret.Namespace, "/", item.RemoteSecret.Name, " is deleted, deleting...") + err = k8s.DeleteSecret(item.RemoteSecret, item.RemoteSecret.Target) + if err != nil { + log.Error("Failed to delete secret ", item.RemoteSecret.Namespace, "/", item.RemoteSecret.Name, " in cluster") + } + } + if err != nil { + errorChan <- err + } } + }() + } + + // Send work to workers + go func() { + for _, item := range itemsToProcess { + itemChan <- item + } + close(itemChan) + }() + + // Wait for all workers to finish + go func() { + workerGroup.Wait() + close(errorChan) + }() + + // Check for errors + for err := range errorChan { + if err != nil { + return err } } + return nil } diff --git a/internal/secret/loader.go b/internal/secret/loader.go index 60974cf..c6edc4b 100644 --- a/internal/secret/loader.go +++ b/internal/secret/loader.go @@ -3,6 +3,7 @@ package secret import ( "errors" "strings" + "sync" "github.com/mxcd/gitops-cli/internal/util" "github.com/schollz/progressbar/v3" @@ -40,6 +41,11 @@ func LoadLocalSecretsLimited(targetTypeFilter SecretTargetType, directoryLimit s } secretFileNames = filteredFileNames + parallelism := util.GetCliContext().Int("parallelism") + if parallelism < 1 { + parallelism = 1 + } + secrets := []*Secret{} bar := progressbar.NewOptions(len(secretFileNames), progressbar.OptionEnableColorCodes(true), @@ -50,19 +56,58 @@ func LoadLocalSecretsLimited(targetTypeFilter SecretTargetType, directoryLimit s progressbar.OptionSetPredictTime(false), progressbar.OptionSetDescription("[green][Loading local secrets][reset]"), ) - for _, secretFileName := range secretFileNames { + + // Create channels for parallel processing + type secretResult struct { + secret *Secret + err error + } + + secretChan := make(chan string, len(secretFileNames)) + resultChan := make(chan secretResult, len(secretFileNames)) + + // Start worker goroutines + var workerGroup sync.WaitGroup + for i := 0; i < parallelism; i++ { + workerGroup.Add(1) + go func() { + defer workerGroup.Done() + for secretFileName := range secretChan { + secret, err := FromPath(secretFileName) + resultChan <- secretResult{secret: secret, err: err} + } + }() + } + + // Send work to workers + go func() { + for _, secretFileName := range secretFileNames { + secretChan <- secretFileName + } + close(secretChan) + }() + + // Wait for all workers to finish and close result channel + go func() { + workerGroup.Wait() + close(resultChan) + }() + + // Collect results + for result := range resultChan { bar.Add(1) - secret, err := FromPath(secretFileName) - if err != nil { + if result.err != nil { bar.Finish() - return nil, err + return nil, result.err } + + secret := result.secret if secret.TargetType != targetTypeFilter && targetTypeFilter != SecretTargetTypeAll { - log.Trace("Skipping file due to targetType filter: ", secretFileName) + log.Trace("Skipping file due to targetType filter: ", secret.Path) continue } if clusterLimit != "" && secret.Target != clusterLimit { - log.Trace("Skipping file due to target filter: ", secretFileName) + log.Trace("Skipping file due to target filter: ", secret.Path) continue } for _, s := range secrets {