Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/gitops/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ func main() {
Usage: "display unchanged secrets in the plan overview",
EnvVars: []string{"GITOPS_SHOW_UNCHANGED"},
},
&cli.IntFlag{
Name: "parallelism",
Aliases: []string{"p"},
Value: 5,
Usage: "number of parallel operations for decrypting secrets and kubernetes operations",
EnvVars: []string{"GITOPS_PARALLELISM"},
},
},
Commands: []*cli.Command{
{
Expand Down
103 changes: 75 additions & 28 deletions internal/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubernetes
import (
"fmt"
"strings"
"sync"

"github.com/TwiN/go-color"
"github.com/google/uuid"
Expand Down Expand Up @@ -122,6 +123,11 @@ func createKubernetesPlan(c *cli.Context) (*plan.Plan, error) {
Items: []plan.PlanItem{},
}

parallelism := c.Int("parallelism")
if parallelism < 1 {
parallelism = 1
}

bar := progressbar.NewOptions(len(localSecrets),
progressbar.OptionEnableColorCodes(true),
progressbar.OptionShowBytes(false),
Expand All @@ -132,38 +138,79 @@ func createKubernetesPlan(c *cli.Context) (*plan.Plan, error) {
progressbar.OptionSetDescription("[green][Syncing local state with cluster][reset]"),
)

for _, localSecret := range localSecrets {
bar.Add(1)
// check for local secret in state
// update ID in localSecret if secret exists in state
// update hash in state if secret exists in state
stateSecret := state.GetState().GetByPath(localSecret.Path)
if stateSecret == nil {
log.Trace("Secret ", localSecret.CombinedName(), " does not exist in state")
localSecret.ID = uuid.New().String()
stateSecret = state.GetState().Add(localSecret)
} else {
log.Trace("Secret ", localSecret.CombinedName(), " exists in state. Updating")
stateSecret.Update(localSecret)
}

planItem := plan.PlanItem{
LocalSecret: localSecret,
}
// Create channels for parallel processing
type planItemResult struct {
item plan.PlanItem
err error
}

remoteSecret, err := k8s.GetSecret(localSecret, localSecret.Target)
if err != nil {
if k8sErrors.IsNotFound(err) {
log.Trace("Secret ", localSecret.Name, " does not exist in Kubernetes cluster")
} else {
log.Error("Failed to get secret ", localSecret.Name, " from Kubernetes cluster")
return nil, err
secretChan := make(chan *secret.Secret, len(localSecrets))
resultChan := make(chan planItemResult, len(localSecrets))

// Start worker goroutines
var workerGroup sync.WaitGroup
for i := 0; i < parallelism; i++ {
workerGroup.Add(1)
go func() {
defer workerGroup.Done()
for localSecret := range secretChan {
// check for local secret in state
// update ID in localSecret if secret exists in state
// update hash in state if secret exists in state
stateSecret := state.GetState().GetByPath(localSecret.Path)
if stateSecret == nil {
log.Trace("Secret ", localSecret.CombinedName(), " does not exist in state")
localSecret.ID = uuid.New().String()
stateSecret = state.GetState().Add(localSecret)
} else {
log.Trace("Secret ", localSecret.CombinedName(), " exists in state. Updating")
stateSecret.Update(localSecret)
}

planItem := plan.PlanItem{
LocalSecret: localSecret,
}

remoteSecret, err := k8s.GetSecret(localSecret, localSecret.Target)
if err != nil {
if k8sErrors.IsNotFound(err) {
log.Trace("Secret ", localSecret.Name, " does not exist in Kubernetes cluster")
} else {
log.Error("Failed to get secret ", localSecret.Name, " from Kubernetes cluster")
resultChan <- planItemResult{err: err}
return
}
}

planItem.RemoteSecret = remoteSecret
planItem.ComputeDiff()
resultChan <- planItemResult{item: planItem, err: nil}
}
}()
}

// Send work to workers
go func() {
for _, localSecret := range localSecrets {
secretChan <- localSecret
}
close(secretChan)
}()

planItem.RemoteSecret = remoteSecret
planItem.ComputeDiff()
p.AddItem(planItem)
// Wait for all workers to finish and close result channel
go func() {
workerGroup.Wait()
close(resultChan)
}()

// Collect results
for result := range resultChan {
bar.Add(1)
if result.err != nil {
bar.Finish()
return nil, result.err
}
p.AddItem(result.item)
}
bar.Finish()
println("")
Expand Down
97 changes: 75 additions & 22 deletions internal/plan/plan.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package plan

import (
"sync"

log "github.com/sirupsen/logrus"

"github.com/mxcd/gitops-cli/internal/k8s"
"github.com/mxcd/gitops-cli/internal/secret"
"github.com/mxcd/gitops-cli/internal/util"
)

type Plan struct {
Expand Down Expand Up @@ -64,33 +67,83 @@ func (p *Plan) Execute() error {
}

func executeKubernetesPlan(p *Plan) error {
parallelism := util.GetCliContext().Int("parallelism")
if parallelism < 1 {
parallelism = 1
}

// Filter items that need processing
itemsToProcess := []PlanItem{}
for _, item := range p.Items {
if item.Diff.Equal {
if !item.Diff.Equal {
itemsToProcess = append(itemsToProcess, item)
} else {
log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is equal, skipping...")
continue
}
if item.Diff.Type == secret.SecretDiffTypeAdded {
log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is new, creating...")
err := k8s.CreateSecret(item.LocalSecret, item.LocalSecret.Target)
if err != nil {
log.Error("Failed to create secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " in cluster")
return err
}
} else if item.Diff.Type == secret.SecretDiffTypeChanged {
log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is modified, updating...")
err := k8s.UpdateSecret(item.LocalSecret, item.LocalSecret.Target)
if err != nil {
log.Error("Failed to update secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " in cluster")
return err
}
} else if item.Diff.Type == secret.SecretDiffTypeRemoved {
log.Trace("Secret ", item.RemoteSecret.Namespace, "/", item.RemoteSecret.Name, " is deleted, deleting...")
err := k8s.DeleteSecret(item.RemoteSecret, item.RemoteSecret.Target)
if err != nil {
log.Error("Failed to delete secret ", item.RemoteSecret.Namespace, "/", item.RemoteSecret.Name, " in cluster")
return err
}

if len(itemsToProcess) == 0 {
return nil
}

// Create channels for parallel processing
itemChan := make(chan PlanItem, len(itemsToProcess))
errorChan := make(chan error, len(itemsToProcess))

// Start worker goroutines
var workerGroup sync.WaitGroup
for i := 0; i < parallelism; i++ {
workerGroup.Add(1)
go func() {
defer workerGroup.Done()
for item := range itemChan {
var err error
if item.Diff.Type == secret.SecretDiffTypeAdded {
log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is new, creating...")
err = k8s.CreateSecret(item.LocalSecret, item.LocalSecret.Target)
if err != nil {
log.Error("Failed to create secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " in cluster")
}
} else if item.Diff.Type == secret.SecretDiffTypeChanged {
log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is modified, updating...")
err = k8s.UpdateSecret(item.LocalSecret, item.LocalSecret.Target)
if err != nil {
log.Error("Failed to update secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " in cluster")
}
} else if item.Diff.Type == secret.SecretDiffTypeRemoved {
log.Trace("Secret ", item.RemoteSecret.Namespace, "/", item.RemoteSecret.Name, " is deleted, deleting...")
err = k8s.DeleteSecret(item.RemoteSecret, item.RemoteSecret.Target)
if err != nil {
log.Error("Failed to delete secret ", item.RemoteSecret.Namespace, "/", item.RemoteSecret.Name, " in cluster")
}
}
if err != nil {
errorChan <- err
}
}
}()
}

// Send work to workers
go func() {
for _, item := range itemsToProcess {
itemChan <- item
}
close(itemChan)
}()

// Wait for all workers to finish
go func() {
workerGroup.Wait()
close(errorChan)
}()

// Check for errors
for err := range errorChan {
if err != nil {
return err
}
}

return nil
}
57 changes: 51 additions & 6 deletions internal/secret/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package secret
import (
"errors"
"strings"
"sync"

"github.com/mxcd/gitops-cli/internal/util"
"github.com/schollz/progressbar/v3"
Expand Down Expand Up @@ -40,6 +41,11 @@ func LoadLocalSecretsLimited(targetTypeFilter SecretTargetType, directoryLimit s
}
secretFileNames = filteredFileNames

parallelism := util.GetCliContext().Int("parallelism")
if parallelism < 1 {
parallelism = 1
}

secrets := []*Secret{}
bar := progressbar.NewOptions(len(secretFileNames),
progressbar.OptionEnableColorCodes(true),
Expand All @@ -50,19 +56,58 @@ func LoadLocalSecretsLimited(targetTypeFilter SecretTargetType, directoryLimit s
progressbar.OptionSetPredictTime(false),
progressbar.OptionSetDescription("[green][Loading local secrets][reset]"),
)
for _, secretFileName := range secretFileNames {

// Create channels for parallel processing
type secretResult struct {
secret *Secret
err error
}

secretChan := make(chan string, len(secretFileNames))
resultChan := make(chan secretResult, len(secretFileNames))

// Start worker goroutines
var workerGroup sync.WaitGroup
for i := 0; i < parallelism; i++ {
workerGroup.Add(1)
go func() {
defer workerGroup.Done()
for secretFileName := range secretChan {
secret, err := FromPath(secretFileName)
resultChan <- secretResult{secret: secret, err: err}
}
}()
}

// Send work to workers
go func() {
for _, secretFileName := range secretFileNames {
secretChan <- secretFileName
}
close(secretChan)
}()

// Wait for all workers to finish and close result channel
go func() {
workerGroup.Wait()
close(resultChan)
}()

// Collect results
for result := range resultChan {
bar.Add(1)
secret, err := FromPath(secretFileName)
if err != nil {
if result.err != nil {
bar.Finish()
return nil, err
return nil, result.err
}

secret := result.secret
if secret.TargetType != targetTypeFilter && targetTypeFilter != SecretTargetTypeAll {
log.Trace("Skipping file due to targetType filter: ", secretFileName)
log.Trace("Skipping file due to targetType filter: ", secret.Path)
continue
}
if clusterLimit != "" && secret.Target != clusterLimit {
log.Trace("Skipping file due to target filter: ", secretFileName)
log.Trace("Skipping file due to target filter: ", secret.Path)
continue
}
for _, s := range secrets {
Expand Down
Loading