From 1d205fe6c80be766339944162425f1ad26626d0d Mon Sep 17 00:00:00 2001 From: pa250194 Date: Tue, 19 Oct 2021 12:13:37 -0500 Subject: [PATCH 1/3] Refactor Bucket Controller - Added Bucket Provider Interface Signed-off-by: pa250194 --- controllers/bucket_controller.go | 276 +++++++++---------------------- go.mod | 2 +- pkg/gcp/gcp.go | 82 +++++++-- pkg/gcp/gcp_test.go | 140 ++++++++++++++-- pkg/minio/minio.go | 128 ++++++++++++++ pkg/minio/minio_test.go | 269 ++++++++++++++++++++++++++++++ 6 files changed, 669 insertions(+), 228 deletions(-) create mode 100644 pkg/minio/minio.go create mode 100644 pkg/minio/minio_test.go diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go index a25587d1a..028610d90 100644 --- a/controllers/bucket_controller.go +++ b/controllers/bucket_controller.go @@ -25,10 +25,6 @@ import ( "strings" "time" - "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" - "github.com/minio/minio-go/v7/pkg/s3utils" - "google.golang.org/api/option" corev1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -47,9 +43,11 @@ import ( "github.com/fluxcd/pkg/runtime/metrics" "github.com/fluxcd/pkg/runtime/predicates" "github.com/fluxcd/source-controller/pkg/gcp" + "github.com/fluxcd/source-controller/pkg/minio" sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" "github.com/fluxcd/source-controller/pkg/sourceignore" + "github.com/go-git/go-git/v5/plumbing/format/gitignore" ) // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete @@ -71,6 +69,15 @@ type BucketReconcilerOptions struct { MaxConcurrentReconciles int } +type BucketProvider interface { + BucketExists(context.Context, string) (bool, error) + ObjectExists(context.Context, string, string) (bool, error) + FGetObject(context.Context, string, string, string) error + ListObjects(context.Context, gitignore.Matcher, string, 
string) error + ObjectIsNotFound(error) bool + Close(context.Context) +} + func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error { return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{}) } @@ -177,10 +184,20 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{RequeueAfter: bucket.GetInterval().Duration}, nil } +func (r *BucketReconciler) getBucketSecret(ctx context.Context, bucket sourcev1.Bucket) (corev1.Secret, error) { + var secret corev1.Secret + secretName := types.NamespacedName{ + Namespace: bucket.GetNamespace(), + Name: bucket.Spec.SecretRef.Name, + } + if err := r.Get(ctx, secretName, &secret); err != nil { + return secret, err + } + return secret, nil +} + func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket) (sourcev1.Bucket, error) { log := ctrl.LoggerFrom(ctx) - var err error - var sourceBucket sourcev1.Bucket tempDir, err := os.MkdirTemp("", bucket.Name) if err != nil { @@ -193,17 +210,19 @@ func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket } }() - if bucket.Spec.Provider == sourcev1.GoogleBucketProvider { - sourceBucket, err = r.reconcileWithGCP(ctx, bucket, tempDir) - if err != nil { - return sourceBucket, err - } - } else { - sourceBucket, err = r.reconcileWithMinio(ctx, bucket, tempDir) + var secret corev1.Secret + + if bucket.Spec.SecretRef != nil { + secret, err = r.getBucketSecret(ctx, bucket) if err != nil { - return sourceBucket, err + return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), fmt.Errorf("credentials secret error: %w", err) } } + + if bucketResponse, err := fetchBucketContents(ctx, bucket, secret, tempDir); err != nil { + return bucketResponse, err + } + revision, err := r.checksum(tempDir) if err != nil { return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err @@ -251,108 +270,37 @@ func (r *BucketReconciler) 
reconcile(ctx context.Context, bucket sourcev1.Bucket return sourcev1.BucketReady(bucket, artifact, url, sourcev1.BucketOperationSucceedReason, message), nil } -func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) { - if err := r.gc(bucket); err != nil { - r.event(ctx, bucket, events.EventSeverityError, - fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error())) - // Return the error so we retry the failed garbage collection - return ctrl.Result{}, err - } - - // Record deleted status - r.recordReadiness(ctx, bucket) - - // Remove our finalizer from the list and update it - controllerutil.RemoveFinalizer(&bucket, sourcev1.SourceFinalizer) - if err := r.Update(ctx, &bucket); err != nil { - return ctrl.Result{}, err - } - - // Stop reconciliation as the object is being deleted - return ctrl.Result{}, nil -} - -// reconcileWithGCP handles getting objects from a Google Cloud Platform bucket -// using a gcp client -func (r *BucketReconciler) reconcileWithGCP(ctx context.Context, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) { - log := ctrl.LoggerFrom(ctx) - gcpClient, err := r.authGCP(ctx, bucket) - if err != nil { - err = fmt.Errorf("auth error: %w", err) - return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err - } - defer gcpClient.Close(log) - - ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration) - defer cancel() - - exists, err := gcpClient.BucketExists(ctxTimeout, bucket.Spec.BucketName) - if err != nil { - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err - } - if !exists { - err = fmt.Errorf("bucket '%s' not found", bucket.Spec.BucketName) - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err - } - - // Look for file with ignore rules first. 
- path := filepath.Join(tempDir, sourceignore.IgnoreFile) - if err := gcpClient.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil { - if err == gcp.ErrorObjectDoesNotExist && sourceignore.IgnoreFile != ".sourceignore" { - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err - } - } - ps, err := sourceignore.ReadIgnoreFile(path, nil) - if err != nil { - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err - } - // In-spec patterns take precedence - if bucket.Spec.Ignore != nil { - ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...) - } - matcher := sourceignore.NewMatcher(ps) - objects := gcpClient.ListObjects(ctxTimeout, bucket.Spec.BucketName, nil) - // download bucket content - for { - object, err := objects.Next() - if err == gcp.IteratorDone { - break - } +// fetchBucketContents selects a bucket provider that implements the BucketProvider interface based +// on the provider specified in the bucket spec, and fetches the bucket contents into tempDir. 
+func fetchBucketContents(ctx context.Context, bucket sourcev1.Bucket, secret corev1.Secret, tempDir string) (sourcev1.Bucket, error) { + switch bucket.Spec.Provider { + case sourcev1.GoogleBucketProvider: + gcpClient, err := gcp.NewClient(ctx, secret, bucket) if err != nil { - err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err) - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err + err = fmt.Errorf("auth error: %w", err) + return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err } - - if strings.HasSuffix(object.Name, "/") || object.Name == sourceignore.IgnoreFile { - continue + defer gcpClient.Close(ctx) + if bucketResponse, err := fetchFiles(ctx, gcpClient, bucket, tempDir); err != nil { + return bucketResponse, err } - - if matcher.Match(strings.Split(object.Name, "/"), false) { - continue + default: + minioClient, err := minio.NewClient(ctx, secret, bucket) + if err != nil { + err = fmt.Errorf("auth error: %w", err) + return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err } - - localPath := filepath.Join(tempDir, object.Name) - if err = gcpClient.FGetObject(ctxTimeout, bucket.Spec.BucketName, object.Name, localPath); err != nil { - err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err) - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err + if bucketResponse, err := fetchFiles(ctx, minioClient, bucket, tempDir); err != nil { + return bucketResponse, err } } return sourcev1.Bucket{}, nil } -// reconcileWithMinio handles getting objects from an S3 compatible bucket -// using a minio client -func (r *BucketReconciler) reconcileWithMinio(ctx context.Context, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) { - s3Client, err := r.authMinio(ctx, bucket) - if err != nil { - err = fmt.Errorf("auth error: %w", err) - 
return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err - } - +func fetchFiles(ctx 
context.Context, client BucketProvider, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) { ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration) defer cancel() - - exists, err := s3Client.BucketExists(ctxTimeout, bucket.Spec.BucketName) + exists, err := client.BucketExists(ctxTimeout, bucket.Spec.BucketName) if err != nil { return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } @@ -361,12 +309,10 @@ func (r *BucketReconciler) reconcileWithMinio(ctx context.Context, bucket source return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } - // Look for file with ignore rules first - // NB: S3 has flat filepath keys making it impossible to look - // for files in "subdirectories" without building up a tree first. + // Look for file with ignore rules first. path := filepath.Join(tempDir, sourceignore.IgnoreFile) - if err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil { - if resp, ok := err.(minio.ErrorResponse); ok && resp.Code != "NoSuchKey" { + if err := client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil { + if client.ObjectIsNotFound(err) && sourceignore.IgnoreFile != ".sourceignore" { return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } } @@ -379,107 +325,33 @@ func (r *BucketReconciler) reconcileWithMinio(ctx context.Context, bucket source ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...) 
} matcher := sourceignore.NewMatcher(ps) - - // download bucket content - for object := range s3Client.ListObjects(ctxTimeout, bucket.Spec.BucketName, minio.ListObjectsOptions{ - Recursive: true, - UseV1: s3utils.IsGoogleEndpoint(*s3Client.EndpointURL()), - }) { - if object.Err != nil { - err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, object.Err) - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err - } - - if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile { - continue - } - - if matcher.Match(strings.Split(object.Key, "/"), false) { - continue - } - - localPath := filepath.Join(tempDir, object.Key) - err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, object.Key, localPath, minio.GetObjectOptions{}) - if err != nil { - err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err) - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err - } + err = client.ListObjects(ctxTimeout, matcher, bucket.Spec.BucketName, tempDir) + if err != nil { + err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err) + return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } return sourcev1.Bucket{}, nil } -// authGCP creates a new Google Cloud Platform storage client -// to interact with the storage service. 
-func (r *BucketReconciler) authGCP(ctx context.Context, bucket sourcev1.Bucket) (*gcp.GCPClient, error) { - var client *gcp.GCPClient - var err error - if bucket.Spec.SecretRef != nil { - secretName := types.NamespacedName{ - Namespace: bucket.GetNamespace(), - Name: bucket.Spec.SecretRef.Name, - } - - var secret corev1.Secret - if err := r.Get(ctx, secretName, &secret); err != nil { - return nil, fmt.Errorf("credentials secret error: %w", err) - } - if err := gcp.ValidateSecret(secret.Data, secret.Name); err != nil { - return nil, err - } - client, err = gcp.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"])) - if err != nil { - return nil, err - } - } else { - client, err = gcp.NewClient(ctx) - if err != nil { - return nil, err - } - } - return client, nil - -} - -// authMinio creates a new Minio client to interact with S3 -// compatible storage services. -func (r *BucketReconciler) authMinio(ctx context.Context, bucket sourcev1.Bucket) (*minio.Client, error) { - opt := minio.Options{ - Region: bucket.Spec.Region, - Secure: !bucket.Spec.Insecure, +func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) { + if err := r.gc(bucket); err != nil { + r.event(ctx, bucket, events.EventSeverityError, + fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error())) + // Return the error so we retry the failed garbage collection + return ctrl.Result{}, err } - if bucket.Spec.SecretRef != nil { - secretName := types.NamespacedName{ - Namespace: bucket.GetNamespace(), - Name: bucket.Spec.SecretRef.Name, - } - - var secret corev1.Secret - if err := r.Get(ctx, secretName, &secret); err != nil { - return nil, fmt.Errorf("credentials secret error: %w", err) - } - - accesskey := "" - secretkey := "" - if k, ok := secret.Data["accesskey"]; ok { - accesskey = string(k) - } - if k, ok := secret.Data["secretkey"]; ok { - secretkey = string(k) - } - if accesskey == "" || secretkey == "" { - 
return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name) - } - opt.Creds = credentials.NewStaticV4(accesskey, secretkey, "") - } else if bucket.Spec.Provider == sourcev1.AmazonBucketProvider { - opt.Creds = credentials.NewIAM("") - } + // Record deleted status + r.recordReadiness(ctx, bucket) - if opt.Creds == nil { - return nil, fmt.Errorf("no bucket credentials found") + // Remove our finalizer from the list and update it + controllerutil.RemoveFinalizer(&bucket, sourcev1.SourceFinalizer) + if err := r.Update(ctx, &bucket); err != nil { + return ctrl.Result{}, err } - return minio.New(bucket.Spec.Endpoint, &opt) + // Stop reconciliation as the object is being deleted + return ctrl.Result{}, nil } // checksum calculates the SHA1 checksum of the given root directory. diff --git a/go.mod b/go.mod index 06f6f37d0..96eba7075 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/go-git/go-billy/v5 v5.3.1 github.com/go-git/go-git/v5 v5.4.2 github.com/go-logr/logr v1.2.2 + github.com/google/uuid v1.2.0 github.com/libgit2/git2go/v31 v31.7.6 github.com/minio/minio-go/v7 v7.0.15 github.com/onsi/ginkgo v1.16.5 @@ -94,7 +95,6 @@ require ( github.com/google/go-cmp v0.5.6 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect - github.com/google/uuid v1.2.0 // indirect github.com/googleapis/gax-go/v2 v2.1.0 // indirect github.com/googleapis/gnostic v0.5.5 // indirect github.com/gorilla/mux v1.8.0 // indirect diff --git a/pkg/gcp/gcp.go b/pkg/gcp/gcp.go index 9127fcde3..7daa6af3d 100644 --- a/pkg/gcp/gcp.go +++ b/pkg/gcp/gcp.go @@ -13,7 +13,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
*/ - package gcp import ( @@ -23,11 +22,18 @@ import ( "io" "os" "path/filepath" + "strings" gcpstorage "cloud.google.com/go/storage" + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" + "github.com/fluxcd/source-controller/pkg/sourceignore" + "github.com/go-git/go-git/v5/plumbing/format/gitignore" "github.com/go-logr/logr" + + "golang.org/x/sync/errgroup" "google.golang.org/api/iterator" "google.golang.org/api/option" + corev1 "k8s.io/api/core/v1" ) var ( @@ -50,13 +56,26 @@ type GCPClient struct { // NewClient creates a new GCP storage client. The Client will automatically look for the Google Application // Credential environment variable or look for the Google Application Credential file. -func NewClient(ctx context.Context, opts ...option.ClientOption) (*GCPClient, error) { - client, err := gcpstorage.NewClient(ctx, opts...) - if err != nil { - return nil, err +func NewClient(ctx context.Context, secret corev1.Secret, bucket sourcev1.Bucket) (*GCPClient, error) { + gcpclient := &GCPClient{} + if bucket.Spec.SecretRef != nil { + if err := ValidateSecret(secret.Data, secret.Name); err != nil { + return nil, err + } + client, err := gcpstorage.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"])) + if err != nil { + return nil, err + } + gcpclient.Client = client + } else { + client, err := gcpstorage.NewClient(ctx) + if err != nil { + return nil, err + } + gcpclient.Client = client } - return &GCPClient{Client: client}, nil + return gcpclient, nil } // ValidateSecret validates the credential secrets @@ -158,15 +177,54 @@ func (c *GCPClient) FGetObject(ctx context.Context, bucketName, objectName, loca // ListObjects lists the objects/contents of the bucket whose bucket name is provided. // the objects are returned as an Objectiterator and .Next() has to be called on them -// to loop through the Objects. 
-func (c *GCPClient) ListObjects(ctx context.Context, bucketName string, query *gcpstorage.Query) *gcpstorage.ObjectIterator { - items := c.Client.Bucket(bucketName).Objects(ctx, query) - return items +// to loop through the Objects. The Object are downloaded using a goroutine. +func (c *GCPClient) ListObjects(ctx context.Context, matcher gitignore.Matcher, bucketName, tempDir string) error { + log := logr.FromContextOrDiscard(ctx) + items := c.Client.Bucket(bucketName).Objects(ctx, nil) + g, ctxx := errgroup.WithContext(ctx) + for { + object, err := items.Next() + if err == IteratorDone { + break + } + if err != nil { + err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, err) + return err + } + if !(strings.HasSuffix(object.Name, "/") || object.Name == sourceignore.IgnoreFile || matcher.Match(strings.Split(object.Name, "/"), false)) { + g.Go(func() error { + if err := DownloadObject(ctxx, c, object, matcher, bucketName, tempDir); err != nil { + log.Error(err, fmt.Sprintf("Error downloading %s from bucket %s: ", object.Name, bucketName)) + return err + } + return nil + }) + } + } + if err := g.Wait(); err != nil { + return err + } + return nil } // Close closes the GCP Client and logs any useful errors -func (c *GCPClient) Close(log logr.Logger) { +func (c *GCPClient) Close(ctx context.Context) { + log := logr.FromContextOrDiscard(ctx) if err := c.Client.Close(); err != nil { - log.Error(err, "GCP Provider") + log.Error(err, "closing GCP client") + } +} + +// ObjectIsNotFound checks if the error provided is ErrorObjectDoesNotExist(object does not exist) +func (c *GCPClient) ObjectIsNotFound(err error) bool { + return errors.Is(err, ErrorObjectDoesNotExist) +} + +// DownloadObject gets an object and downloads the object locally. 
+func DownloadObject(ctx context.Context, cl *GCPClient, obj *gcpstorage.ObjectAttrs, matcher gitignore.Matcher, bucketName, tempDir string) error { + localPath := filepath.Join(tempDir, obj.Name) + if err := cl.FGetObject(ctx, bucketName, obj.Name, localPath); err != nil { + return err } + return nil } diff --git a/pkg/gcp/gcp_test.go b/pkg/gcp/gcp_test.go index 7f431a44d..93497b03a 100644 --- a/pkg/gcp/gcp_test.go +++ b/pkg/gcp/gcp_test.go @@ -32,10 +32,15 @@ import ( "time" gcpstorage "cloud.google.com/go/storage" + "github.com/fluxcd/pkg/apis/meta" + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" "github.com/fluxcd/source-controller/pkg/gcp" + "github.com/fluxcd/source-controller/pkg/sourceignore" "google.golang.org/api/googleapi" raw "google.golang.org/api/storage/v1" "gotest.tools/assert" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "google.golang.org/api/option" ) @@ -43,6 +48,7 @@ import ( const ( bucketName string = "test-bucket" objectName string = "test.yaml" + region = "us-east-1" ) var ( @@ -50,6 +56,55 @@ var ( client *gcpstorage.Client close func() err error + secret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "gcp-secret", + Namespace: "default", + }, + Data: map[string][]byte{ + "serviceaccount": 
[]byte("ewogICAgInR5cGUiOiAic2VydmljZV9hY2NvdW50IiwKICAgICJwcm9qZWN0X2lkIjogInBvZGluZm8iLAogICAgInByaXZhdGVfa2V5X2lkIjogIjI4cXdnaDNnZGY1aGozZ2I1ZmozZ3N1NXlmZ2gzNGY0NTMyNDU2OGh5MiIsCiAgICAicHJpdmF0ZV9rZXkiOiAiLS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tXG5Id2V0aGd5MTIzaHVnZ2hoaGJkY3U2MzU2ZGd5amhzdmd2R0ZESFlnY2RqYnZjZGhic3g2M2Ncbjc2dGd5Y2ZlaHVoVkdURllmdzZ0N3lkZ3lWZ3lkaGV5aHVnZ3ljdWhland5NnQzNWZ0aHl1aGVndmNldGZcblRGVUhHVHlnZ2h1Ymh4ZTY1eWd0NnRneWVkZ3kzMjZodWN5dnN1aGJoY3Zjc2poY3NqaGNzdmdkdEhGQ0dpXG5IY3llNnR5eWczZ2Z5dWhjaGNzYmh5Z2NpamRiaHl5VEY2NnR1aGNldnVoZGNiaHVoaHZmdGN1aGJoM3VoN3Q2eVxuZ2d2ZnRVSGJoNnQ1cmZ0aGh1R1ZSdGZqaGJmY3JkNXI2N3l1aHV2Z0ZUWWpndnRmeWdoYmZjZHJoeWpoYmZjdGZkZnlodmZnXG50Z3ZnZ3RmeWdodmZ0NnR1Z3ZURjVyNjZ0dWpoZ3ZmcnR5aGhnZmN0Nnk3eXRmcjVjdHZnaGJoaHZ0Z2hoanZjdHRmeWNmXG5mZnhmZ2hqYnZnY2d5dDY3dWpiZ3ZjdGZ5aFZDN3VodmdjeWp2aGhqdnl1amNcbmNnZ2hndmdjZmhnZzc2NTQ1NHRjZnRoaGdmdHloaHZ2eXZ2ZmZnZnJ5eXU3N3JlcmVkc3dmdGhoZ2ZjZnR5Y2ZkcnR0ZmhmL1xuLS0tLS1FTkQgUFJJVkFURSBLRVktLS0tLVxuIiwKICAgICJjbGllbnRfZW1haWwiOiAidGVzdEBwb2RpbmZvLmlhbS5nc2VydmljZWFjY291bnQuY29tIiwKICAgICJjbGllbnRfaWQiOiAiMzI2NTc2MzQ2Nzg3NjI1MzY3NDYiLAogICAgImF1dGhfdXJpIjogImh0dHBzOi8vYWNjb3VudHMuZ29vZ2xlLmNvbS9vL29hdXRoMi9hdXRoIiwKICAgICJ0b2tlbl91cmkiOiAiaHR0cHM6Ly9vYXV0aDIuZ29vZ2xlYXBpcy5jb20vdG9rZW4iLAogICAgImF1dGhfcHJvdmlkZXJfeDUwOV9jZXJ0X3VybCI6ICJodHRwczovL3d3dy5nb29nbGVhcGlzLmNvbS9vYXV0aDIvdjEvY2VydHMiLAogICAgImNsaWVudF94NTA5X2NlcnRfdXJsIjogImh0dHBzOi8vd3d3Lmdvb2dsZWFwaXMuY29tL3JvYm90L3YxL21ldGFkYXRhL3g1MDkvdGVzdCU0MHBvZGluZm8uaWFtLmdzZXJ2aWNlYWNjb3VudC5jb20iCn0="), + }, + Type: "Opaque", + } + badSecret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "gcp-secret", + Namespace: "default", + }, + Data: map[string][]byte{ + "username": []byte("test-user"), + }, + Type: "Opaque", + } + bucket = sourcev1.Bucket{ + ObjectMeta: v1.ObjectMeta{ + Name: "gcp-test-bucket", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Endpoint: "storage.googleapis.com", + Region: region, + 
Provider: "gcp", + Insecure: true, + SecretRef: &meta.LocalObjectReference{ + Name: secret.Name, + }, + }, + } + bucketNoSecretRef = sourcev1.Bucket{ + ObjectMeta: v1.ObjectMeta{ + Name: "gcp-test-bucket", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Endpoint: "storage.googleapis.com", + Region: region, + Provider: "gcp", + Insecure: true, + }, + } ) func TestMain(m *testing.M) { @@ -109,10 +164,11 @@ func TestMain(m *testing.M) { os.Exit(run) } -func TestNewClient(t *testing.T) { - gcpClient, err := gcp.NewClient(context.Background(), option.WithHTTPClient(hc)) - assert.NilError(t, err) - assert.Assert(t, gcpClient != nil) +func TestNewClientWithSecretErr(t *testing.T) { + gcpClient, err := gcp.NewClient(context.Background(), secret, bucket) + t.Log(err) + assert.Error(t, err, "dialing: invalid character 'e' looking for beginning of value") + assert.Assert(t, gcpClient == nil) } func TestBucketExists(t *testing.T) { @@ -160,15 +216,33 @@ func TestListObjects(t *testing.T) { gcpClient := &gcp.GCPClient{ Client: client, } - objectIterator := gcpClient.ListObjects(context.Background(), bucketName, nil) - for { - _, err := objectIterator.Next() - if err == gcp.IteratorDone { - break - } - assert.NilError(t, err) + tempDir, err := os.MkdirTemp("", bucketName) + defer os.RemoveAll(tempDir) + assert.NilError(t, err) + path := filepath.Join(tempDir, sourceignore.IgnoreFile) + ps, err := sourceignore.ReadIgnoreFile(path, nil) + assert.NilError(t, err) + matcher := sourceignore.NewMatcher(ps) + + err = gcpClient.ListObjects(context.Background(), matcher, bucketName, tempDir) + assert.NilError(t, err) +} + +func TestListObjectsErr(t *testing.T) { + gcpClient := &gcp.GCPClient{ + Client: client, } - assert.Assert(t, objectIterator != nil) + badBucketName := "bad-bucket" + tempDir, err := os.MkdirTemp("", badBucketName) + defer os.RemoveAll(tempDir) + assert.NilError(t, err) + path := filepath.Join(tempDir, sourceignore.IgnoreFile) + 
ps, err := sourceignore.ReadIgnoreFile(path, nil) + assert.NilError(t, err) + matcher := sourceignore.NewMatcher(ps) + + err = gcpClient.ListObjects(context.Background(), matcher, badBucketName, tempDir) + assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: storage: bucket doesn't exist", badBucketName)) } func TestFGetObject(t *testing.T) { @@ -202,8 +276,8 @@ func TestFGetObjectNotExists(t *testing.T) { func TestFGetObjectDirectoryIsFileName(t *testing.T) { tempDir, err := os.MkdirTemp("", bucketName) - defer os.RemoveAll(tempDir) assert.NilError(t, err) + defer os.RemoveAll(tempDir) gcpClient := &gcp.GCPClient{ Client: client, } @@ -213,6 +287,46 @@ func TestFGetObjectDirectoryIsFileName(t *testing.T) { } } +func TestDownloadObject(t *testing.T) { + gcpClient := &gcp.GCPClient{ + Client: client, + } + tempDir, err := os.MkdirTemp("", bucketName) + assert.NilError(t, err) + defer os.RemoveAll(tempDir) + path := filepath.Join(tempDir, sourceignore.IgnoreFile) + ps, err := sourceignore.ReadIgnoreFile(path, nil) + assert.NilError(t, err) + matcher := sourceignore.NewMatcher(ps) + err = gcp.DownloadObject(context.Background(), gcpClient, &gcpstorage.ObjectAttrs{ + Bucket: bucketName, + Name: objectName, + ContentType: "text/x-yaml", + Size: 1 << 20, + }, matcher, bucketName, tempDir) + assert.NilError(t, err) +} + +func TestDownloadObjectErr(t *testing.T) { + gcpClient := &gcp.GCPClient{ + Client: client, + } + tempDir, err := os.MkdirTemp("", bucketName) + assert.NilError(t, err) + defer os.RemoveAll(tempDir) + path := filepath.Join(tempDir, sourceignore.IgnoreFile) + ps, err := sourceignore.ReadIgnoreFile(path, nil) + assert.NilError(t, err) + matcher := sourceignore.NewMatcher(ps) + err = gcp.DownloadObject(context.Background(), gcpClient, &gcpstorage.ObjectAttrs{ + Bucket: bucketName, + Name: "test1.yaml", + ContentType: "text/x-yaml", + Size: 1 << 20, + }, matcher, bucketName, tempDir) + assert.Error(t, err, "storage: object doesn't 
exist") +} + func TestValidateSecret(t *testing.T) { t.Parallel() testCases := []struct { diff --git a/pkg/minio/minio.go b/pkg/minio/minio.go new file mode 100644 index 000000000..64b11cce4 --- /dev/null +++ b/pkg/minio/minio.go @@ -0,0 +1,128 @@ +/* +Copyright 2021 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package minio + +import ( + "context" + "fmt" + "path/filepath" + "strings" + + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" + "github.com/fluxcd/source-controller/pkg/sourceignore" + "github.com/go-git/go-git/v5/plumbing/format/gitignore" + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/minio/minio-go/v7/pkg/s3utils" + corev1 "k8s.io/api/core/v1" +) + +type MinioClient struct { + // client for interacting with S3 compatible + // Storage APIs. + *minio.Client +} + +// NewClient creates a new Minio storage client. 
+func NewClient(ctx context.Context, secret corev1.Secret, bucket sourcev1.Bucket) (*MinioClient, error) { + opt := minio.Options{ + Region: bucket.Spec.Region, + Secure: !bucket.Spec.Insecure, + } + + if bucket.Spec.SecretRef != nil { + accesskey := "" + secretkey := "" + if k, ok := secret.Data["accesskey"]; ok { + accesskey = string(k) + } + if k, ok := secret.Data["secretkey"]; ok { + secretkey = string(k) + } + if accesskey == "" || secretkey == "" { + return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name) + } + opt.Creds = credentials.NewStaticV4(accesskey, secretkey, "") + } else if bucket.Spec.Provider == sourcev1.AmazonBucketProvider { + opt.Creds = credentials.NewIAM("") + } + + if opt.Creds == nil { + return nil, fmt.Errorf("no bucket credentials found") + } + + client, err := minio.New(bucket.Spec.Endpoint, &opt) + if err != nil { + return nil, err + } + + return &MinioClient{Client: client}, nil +} + +// ObjectExists checks if the object with the provided name exists. +func (c *MinioClient) ObjectExists(ctx context.Context, bucketName, objectName string) (bool, error) { + _, err := c.Client.StatObject(ctx, bucketName, objectName, minio.StatObjectOptions{}) + if err != nil { + return false, err + } + return true, nil +} + +// FGetObject gets the object from the bucket and downloads the object locally. +func (c *MinioClient) FGetObject(ctx context.Context, bucketName, objectName, localPath string) error { + return c.Client.FGetObject(ctx, bucketName, objectName, localPath, minio.GetObjectOptions{}) +} + +// ListObjects lists all the objects in a bucket and downloads the objects. 
+func (c *MinioClient) ListObjects(ctx context.Context, matcher gitignore.Matcher, bucketName, tempDir string) error { + for object := range c.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{ + Recursive: true, + UseV1: s3utils.IsGoogleEndpoint(*c.Client.EndpointURL()), + }) { + if object.Err != nil { + err := fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, object.Err) + return err + } + + if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile { + continue + } + + if matcher.Match(strings.Split(object.Key, "/"), false) { + continue + } + + localPath := filepath.Join(tempDir, object.Key) + err := c.FGetObject(ctx, bucketName, object.Key, localPath) + if err != nil { + err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucketName, err) + return err + } + } + return nil +} + +// Close is a no-op that exists to satisfy the bucket provider interface +func (c *MinioClient) Close(ctx context.Context) { + //minio client does not provide a close method +} + +// ObjectIsNotFound checks if the error provided is NoSuchKey(object does not exist) +func (c *MinioClient) ObjectIsNotFound(err error) bool { + resp, ok := err.(minio.ErrorResponse) + return ok && resp.Code == "NoSuchKey" +} diff --git a/pkg/minio/minio_test.go b/pkg/minio/minio_test.go new file mode 100644 index 000000000..ae2c2f657 --- /dev/null +++ b/pkg/minio/minio_test.go @@ -0,0 +1,269 @@ +/* +Copyright 2021 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ +package minio_test + +import ( + "context" + "fmt" + "log" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/fluxcd/pkg/apis/meta" + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" + "github.com/fluxcd/source-controller/pkg/minio" + "github.com/fluxcd/source-controller/pkg/sourceignore" + + "github.com/google/uuid" + miniov7 "github.com/minio/minio-go/v7" + "gotest.tools/assert" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + objectName string = "test.yaml" + region string = "us-east-1" +) + +var ( + minioclient *minio.MinioClient + bucketName = "test-bucket-minio" + uuid.New().String() + secret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-secret", + Namespace: "default", + }, + Data: map[string][]byte{ + "accesskey": []byte("Q3AM3UQ867SPQQA43P2F"), + "secretkey": []byte("zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"), + }, + Type: "Opaque", + } + bucket = sourcev1.Bucket{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-test-bucket", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Endpoint: "play.min.io", + Region: region, + Provider: "generic", + Insecure: true, + SecretRef: &meta.LocalObjectReference{ + Name: secret.Name, + }, + }, + } + emptySecret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-secret", + Namespace: "default", + }, + Data: map[string][]byte{}, + Type: "Opaque", + } + bucketNoSecretRef = sourcev1.Bucket{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-test-bucket", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Endpoint: "play.min.io", + Region: region, + Provider: "generic", + Insecure: true, + }, + } + bucketAwsProvider = sourcev1.Bucket{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-test-bucket", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Endpoint: "play.min.io", + Region: region, + Provider: "aws", + Insecure: true, + }, + } +) + +func 
TestMain(m *testing.M) { + var err error + ctx := context.Background() + minioclient, err = minio.NewClient(ctx, secret, bucket) + if err != nil { + log.Fatal(err) + } + createBucket(ctx) + addObjectToBucket(ctx) + run := m.Run() + removeObjectFromBucket(ctx) + deleteBucket(ctx) + os.Exit(run) +} + +func TestNewClient(t *testing.T) { + ctx := context.Background() + minioClient, err := minio.NewClient(ctx, secret, bucket) + assert.NilError(t, err) + assert.Assert(t, minioClient != nil) +} + +func TestNewClientEmptySecret(t *testing.T) { + ctx := context.Background() + minioClient, err := minio.NewClient(ctx, emptySecret, bucket) + assert.Error(t, err, fmt.Sprintf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", emptySecret.Name)) + assert.Assert(t, minioClient == nil) +} + +func TestNewClientNoSecretRef(t *testing.T) { + ctx := context.Background() + minioClient, err := minio.NewClient(ctx, corev1.Secret{}, bucketNoSecretRef) + assert.Error(t, err, "no bucket credentials found") + assert.Assert(t, minioClient == nil) +} + +func TestNewClientAwsProvider(t *testing.T) { + ctx := context.Background() + minioClient, err := minio.NewClient(ctx, corev1.Secret{}, bucketAwsProvider) + assert.NilError(t, err) + assert.Assert(t, minioClient != nil) +} + +func TestBucketExists(t *testing.T) { + ctx := context.Background() + exists, err := minioclient.BucketExists(ctx, bucketName) + assert.NilError(t, err) + assert.Assert(t, exists) +} + +func TestBucketNotExists(t *testing.T) { + ctx := context.Background() + exists, err := minioclient.BucketExists(ctx, "notexistsbucket") + assert.NilError(t, err) + assert.Assert(t, !exists) +} + +func TestObjectExists(t *testing.T) { + ctx := context.Background() + exists, err := minioclient.ObjectExists(ctx, bucketName, objectName) + assert.NilError(t, err) + assert.Assert(t, exists) +} + +func TestObjectNotExists(t *testing.T) { + ctx := context.Background() + exists, err := minioclient.ObjectExists(ctx, bucketName, 
"notexists.yaml") + assert.Error(t, err, "The specified key does not exist.") + assert.Assert(t, !exists) +} + +func TestFGetObject(t *testing.T) { + ctx := context.Background() + tempDir, err := os.MkdirTemp("", bucketName) + assert.NilError(t, err) + defer os.RemoveAll(tempDir) + path := filepath.Join(tempDir, sourceignore.IgnoreFile) + err = minioclient.FGetObject(ctx, bucketName, objectName, path) + assert.NilError(t, err) +} + +func TestListObjects(t *testing.T) { + ctx := context.Background() + tempDir, err := os.MkdirTemp("", bucketName) + assert.NilError(t, err) + defer os.RemoveAll(tempDir) + path := filepath.Join(tempDir, sourceignore.IgnoreFile) + ps, err := sourceignore.ReadIgnoreFile(path, nil) + assert.NilError(t, err) + matcher := sourceignore.NewMatcher(ps) + err = minioclient.ListObjects(ctx, matcher, bucketName, tempDir) + assert.NilError(t, err) +} + +func TestListObjectsErr(t *testing.T) { + ctx := context.Background() + badBucketName := "bad-bucket" + tempDir, err := os.MkdirTemp("", bucketName) + assert.NilError(t, err) + defer os.RemoveAll(tempDir) + path := filepath.Join(tempDir, sourceignore.IgnoreFile) + ps, err := sourceignore.ReadIgnoreFile(path, nil) + assert.NilError(t, err) + matcher := sourceignore.NewMatcher(ps) + err = minioclient.ListObjects(ctx, matcher, badBucketName, tempDir) + assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: The specified bucket does not exist", badBucketName)) +} + +func createBucket(ctx context.Context) { + if err := minioclient.Client.MakeBucket(ctx, bucketName, miniov7.MakeBucketOptions{Region: region}); err != nil { + exists, errBucketExists := minioclient.BucketExists(ctx, bucketName) + if errBucketExists == nil && exists { + deleteBucket(ctx) + } else { + log.Fatalln(err) + } + } +} + +func deleteBucket(ctx context.Context) { + if err := minioclient.Client.RemoveBucket(ctx, bucketName); err != nil { + log.Println(err) + } +} + +func addObjectToBucket(ctx context.Context) { + 
fileReader := strings.NewReader(getObjectFile()) + fileSize := fileReader.Size() + _, err := minioclient.Client.PutObject(ctx, bucketName, objectName, fileReader, fileSize, miniov7.PutObjectOptions{ + ContentType: "text/x-yaml", + }) + if err != nil { + log.Println(err) + } +} + +func removeObjectFromBucket(ctx context.Context) { + if err := minioclient.Client.RemoveObject(ctx, bucketName, objectName, miniov7.RemoveObjectOptions{ + GovernanceBypass: true, + }); err != nil { + log.Println(err) + } +} + +func getObjectFile() string { + return ` + apiVersion: source.toolkit.fluxcd.io/v1beta1 + kind: Bucket + metadata: + name: podinfo + namespace: default + spec: + interval: 5m + provider: aws + bucketName: podinfo + endpoint: s3.amazonaws.com + region: us-east-1 + timeout: 30s + ` +} From 0104b2726fafae1c8bd6933eed6098c8a993edd6 Mon Sep 17 00:00:00 2001 From: Michael Bridgen Date: Tue, 2 Nov 2021 17:33:38 +0000 Subject: [PATCH 2/3] Factor out fetching objects The algorithm for conditionally downloading object files is the same, whether you are using GCP storage or an S3/Minio-compatible bucket. The only thing that differs is how the respective clients handle enumerating through the objects in the bucket; by implementing just that in each provider, I can have the select-and-fetch code in once place. This deliberately omits the parallelised fetching that the GCP client had, for the sake of lining the clients up. It can be reintroduced (in the factored out code) later. 
Signed-off-by: Michael Bridgen --- controllers/bucket_controller.go | 34 +++++++++++---- pkg/gcp/gcp.go | 31 ++------------ pkg/gcp/gcp_test.go | 71 +++++--------------------------- pkg/minio/minio.go | 19 +-------- pkg/minio/minio_test.go | 29 +++++-------- 5 files changed, 52 insertions(+), 132 deletions(-) diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go index 028610d90..27ac62e63 100644 --- a/controllers/bucket_controller.go +++ b/controllers/bucket_controller.go @@ -47,7 +47,6 @@ import ( sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" "github.com/fluxcd/source-controller/pkg/sourceignore" - "github.com/go-git/go-git/v5/plumbing/format/gitignore" ) // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete @@ -73,8 +72,8 @@ type BucketProvider interface { BucketExists(context.Context, string) (bool, error) ObjectExists(context.Context, string, string) (bool, error) FGetObject(context.Context, string, string, string) error - ListObjects(context.Context, gitignore.Matcher, string, string) error ObjectIsNotFound(error) bool + VisitObjects(context.Context, string, func(string) error) error Close(context.Context) } @@ -310,13 +309,13 @@ func fetchFiles(ctx context.Context, client BucketProvider, bucket sourcev1.Buck } // Look for file with ignore rules first. - path := filepath.Join(tempDir, sourceignore.IgnoreFile) - if err := client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil { - if client.ObjectIsNotFound(err) && sourceignore.IgnoreFile != ".sourceignore" { + ignorefile := filepath.Join(tempDir, sourceignore.IgnoreFile) + if err := client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, ignorefile); err != nil { + if client.ObjectIsNotFound(err) && sourceignore.IgnoreFile != ".sourceignore" { // FIXME? 
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } } - ps, err := sourceignore.ReadIgnoreFile(path, nil) + ps, err := sourceignore.ReadIgnoreFile(ignorefile, nil) if err != nil { return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } @@ -325,12 +324,29 @@ func fetchFiles(ctx context.Context, client BucketProvider, bucket sourcev1.Buck ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...) } matcher := sourceignore.NewMatcher(ps) - err = client.ListObjects(ctxTimeout, matcher, bucket.Spec.BucketName, tempDir) + + err = client.VisitObjects(ctxTimeout, bucket.Spec.BucketName, func(path string) error { + if strings.HasSuffix(path, "/") || path == sourceignore.IgnoreFile { + return nil + } + + if matcher.Match(strings.Split(path, "/"), false) { + return nil + } + + localPath := filepath.Join(tempDir, path) + err := client.FGetObject(ctx, bucket.Spec.BucketName, path, localPath) + if err != nil { + err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err) + return err + } + return nil + }) if err != nil { - err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err) + err = fmt.Errorf("fetching objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err) return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } - return sourcev1.Bucket{}, nil + return bucket, nil } func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) { diff --git a/pkg/gcp/gcp.go b/pkg/gcp/gcp.go index 7daa6af3d..28a7abe14 100644 --- a/pkg/gcp/gcp.go +++ b/pkg/gcp/gcp.go @@ -22,15 +22,10 @@ import ( "io" "os" "path/filepath" - "strings" gcpstorage "cloud.google.com/go/storage" sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" - "github.com/fluxcd/source-controller/pkg/sourceignore" - 
"github.com/go-git/go-git/v5/plumbing/format/gitignore" "github.com/go-logr/logr" - - "golang.org/x/sync/errgroup" "google.golang.org/api/iterator" "google.golang.org/api/option" corev1 "k8s.io/api/core/v1" @@ -178,10 +173,8 @@ func (c *GCPClient) FGetObject(ctx context.Context, bucketName, objectName, loca // ListObjects lists the objects/contents of the bucket whose bucket name is provided. // the objects are returned as an Objectiterator and .Next() has to be called on them // to loop through the Objects. The Object are downloaded using a goroutine. -func (c *GCPClient) ListObjects(ctx context.Context, matcher gitignore.Matcher, bucketName, tempDir string) error { - log := logr.FromContextOrDiscard(ctx) +func (c *GCPClient) VisitObjects(ctx context.Context, bucketName string, visit func(string) error) error { items := c.Client.Bucket(bucketName).Objects(ctx, nil) - g, ctxx := errgroup.WithContext(ctx) for { object, err := items.Next() if err == IteratorDone { @@ -191,19 +184,10 @@ func (c *GCPClient) ListObjects(ctx context.Context, matcher gitignore.Matcher, err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, err) return err } - if !(strings.HasSuffix(object.Name, "/") || object.Name == sourceignore.IgnoreFile || matcher.Match(strings.Split(object.Name, "/"), false)) { - g.Go(func() error { - if err := DownloadObject(ctxx, c, object, matcher, bucketName, tempDir); err != nil { - log.Error(err, fmt.Sprintf("Error downloading %s from bucket %s: ", object.Name, bucketName)) - return err - } - return nil - }) + if err = visit(object.Name); err != nil { + return err } } - if err := g.Wait(); err != nil { - return err - } return nil } @@ -219,12 +203,3 @@ func (c *GCPClient) Close(ctx context.Context) { func (c *GCPClient) ObjectIsNotFound(err error) bool { return errors.Is(err, ErrorObjectDoesNotExist) } - -// DownloadObject gets an object and downloads the object locally. 
-func DownloadObject(ctx context.Context, cl *GCPClient, obj *gcpstorage.ObjectAttrs, matcher gitignore.Matcher, bucketName, tempDir string) error { - localPath := filepath.Join(tempDir, obj.Name) - if err := cl.FGetObject(ctx, bucketName, obj.Name, localPath); err != nil { - return err - } - return nil -} diff --git a/pkg/gcp/gcp_test.go b/pkg/gcp/gcp_test.go index 93497b03a..f2e4235a4 100644 --- a/pkg/gcp/gcp_test.go +++ b/pkg/gcp/gcp_test.go @@ -35,7 +35,6 @@ import ( "github.com/fluxcd/pkg/apis/meta" sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" "github.com/fluxcd/source-controller/pkg/gcp" - "github.com/fluxcd/source-controller/pkg/sourceignore" "google.golang.org/api/googleapi" raw "google.golang.org/api/storage/v1" "gotest.tools/assert" @@ -212,36 +211,28 @@ func TestObjectNotExists(t *testing.T) { assert.Assert(t, !exists) } -func TestListObjects(t *testing.T) { +func TestVisitObjects(t *testing.T) { gcpClient := &gcp.GCPClient{ Client: client, } - tempDir, err := os.MkdirTemp("", bucketName) - defer os.RemoveAll(tempDir) - assert.NilError(t, err) - path := filepath.Join(tempDir, sourceignore.IgnoreFile) - ps, err := sourceignore.ReadIgnoreFile(path, nil) - assert.NilError(t, err) - matcher := sourceignore.NewMatcher(ps) - err = gcpClient.ListObjects(context.Background(), matcher, bucketName, tempDir) + objs := []string{} + err := gcpClient.VisitObjects(context.Background(), bucketName, func(path string) error { + objs = append(objs, path) + return nil + }) assert.NilError(t, err) + assert.DeepEqual(t, objs, []string{}) } -func TestListObjectsErr(t *testing.T) { +func TestVisitObjectsErr(t *testing.T) { gcpClient := &gcp.GCPClient{ Client: client, } badBucketName := "bad-bucket" - tempDir, err := os.MkdirTemp("", badBucketName) - defer os.RemoveAll(tempDir) - assert.NilError(t, err) - path := filepath.Join(tempDir, sourceignore.IgnoreFile) - ps, err := sourceignore.ReadIgnoreFile(path, nil) - assert.NilError(t, err) - matcher := 
sourceignore.NewMatcher(ps) - - err = gcpClient.ListObjects(context.Background(), matcher, badBucketName, tempDir) + err := gcpClient.VisitObjects(context.Background(), badBucketName, func(path string) error { + return nil + }) assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: storage: bucket doesn't exist", badBucketName)) } @@ -287,46 +278,6 @@ func TestFGetObjectDirectoryIsFileName(t *testing.T) { } } -func TestDownloadObject(t *testing.T) { - gcpClient := &gcp.GCPClient{ - Client: client, - } - tempDir, err := os.MkdirTemp("", bucketName) - assert.NilError(t, err) - defer os.RemoveAll(tempDir) - path := filepath.Join(tempDir, sourceignore.IgnoreFile) - ps, err := sourceignore.ReadIgnoreFile(path, nil) - assert.NilError(t, err) - matcher := sourceignore.NewMatcher(ps) - err = gcp.DownloadObject(context.Background(), gcpClient, &gcpstorage.ObjectAttrs{ - Bucket: bucketName, - Name: objectName, - ContentType: "text/x-yaml", - Size: 1 << 20, - }, matcher, bucketName, tempDir) - assert.NilError(t, err) -} - -func TestDownloadObjectErr(t *testing.T) { - gcpClient := &gcp.GCPClient{ - Client: client, - } - tempDir, err := os.MkdirTemp("", bucketName) - assert.NilError(t, err) - defer os.RemoveAll(tempDir) - path := filepath.Join(tempDir, sourceignore.IgnoreFile) - ps, err := sourceignore.ReadIgnoreFile(path, nil) - assert.NilError(t, err) - matcher := sourceignore.NewMatcher(ps) - err = gcp.DownloadObject(context.Background(), gcpClient, &gcpstorage.ObjectAttrs{ - Bucket: bucketName, - Name: "test1.yaml", - ContentType: "text/x-yaml", - Size: 1 << 20, - }, matcher, bucketName, tempDir) - assert.Error(t, err, "storage: object doesn't exist") -} - func TestValidateSecret(t *testing.T) { t.Parallel() testCases := []struct { diff --git a/pkg/minio/minio.go b/pkg/minio/minio.go index 64b11cce4..df4ab64ed 100644 --- a/pkg/minio/minio.go +++ b/pkg/minio/minio.go @@ -19,12 +19,8 @@ package minio import ( "context" "fmt" - "path/filepath" - "strings" 
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" - "github.com/fluxcd/source-controller/pkg/sourceignore" - "github.com/go-git/go-git/v5/plumbing/format/gitignore" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/s3utils" @@ -88,7 +84,7 @@ func (c *MinioClient) FGetObject(ctx context.Context, bucketName, objectName, lo } // ListObjects lists all the objects in a bucket and downloads the objects. -func (c *MinioClient) ListObjects(ctx context.Context, matcher gitignore.Matcher, bucketName, tempDir string) error { +func (c *MinioClient) VisitObjects(ctx context.Context, bucketName string, visit func(string) error) error { for object := range c.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{ Recursive: true, UseV1: s3utils.IsGoogleEndpoint(*c.Client.EndpointURL()), @@ -98,18 +94,7 @@ func (c *MinioClient) ListObjects(ctx context.Context, matcher gitignore.Matcher return err } - if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile { - continue - } - - if matcher.Match(strings.Split(object.Key, "/"), false) { - continue - } - - localPath := filepath.Join(tempDir, object.Key) - err := c.FGetObject(ctx, bucketName, object.Key, localPath) - if err != nil { - err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucketName, err) + if err := visit(object.Key); err != nil { return err } } diff --git a/pkg/minio/minio_test.go b/pkg/minio/minio_test.go index ae2c2f657..aac36a019 100644 --- a/pkg/minio/minio_test.go +++ b/pkg/minio/minio_test.go @@ -188,30 +188,23 @@ func TestFGetObject(t *testing.T) { assert.NilError(t, err) } -func TestListObjects(t *testing.T) { +func TestVisitObjects(t *testing.T) { ctx := context.Background() - tempDir, err := os.MkdirTemp("", bucketName) - assert.NilError(t, err) - defer os.RemoveAll(tempDir) - path := filepath.Join(tempDir, sourceignore.IgnoreFile) - ps, err := sourceignore.ReadIgnoreFile(path, nil) - 
assert.NilError(t, err) - matcher := sourceignore.NewMatcher(ps) - err = minioclient.ListObjects(ctx, matcher, bucketName, tempDir) + objs := []string{} + err := minioclient.VisitObjects(ctx, bucketName, func(path string) error { + objs = append(objs, path) + return nil + }) assert.NilError(t, err) + assert.DeepEqual(t, objs, []string{objectName}) } -func TestListObjectsErr(t *testing.T) { +func TestVisitObjectsErr(t *testing.T) { ctx := context.Background() badBucketName := "bad-bucket" - tempDir, err := os.MkdirTemp("", bucketName) - assert.NilError(t, err) - defer os.RemoveAll(tempDir) - path := filepath.Join(tempDir, sourceignore.IgnoreFile) - ps, err := sourceignore.ReadIgnoreFile(path, nil) - assert.NilError(t, err) - matcher := sourceignore.NewMatcher(ps) - err = minioclient.ListObjects(ctx, matcher, badBucketName, tempDir) + err := minioclient.VisitObjects(ctx, badBucketName, func(string) error { + return nil + }) assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: The specified bucket does not exist", badBucketName)) } From 53c2a15b0a996eabfaecd3f475fddc8bd82118e5 Mon Sep 17 00:00:00 2001 From: Michael Bridgen Date: Thu, 11 Nov 2021 17:09:34 +0000 Subject: [PATCH 3/3] Reinstate fetching bucket contents in parallel This commit reintroduces the use of goroutines for fetching objects, but in the caller of the client interface rather than in a particular client implementation. 
Signed-off-by: Michael Bridgen --- controllers/bucket_controller.go | 43 ++++++-- controllers/bucket_fetch_test.go | 162 +++++++++++++++++++++++++++++++ 2 files changed, 198 insertions(+), 7 deletions(-) create mode 100644 controllers/bucket_fetch_test.go diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go index 27ac62e63..0c12c62d9 100644 --- a/controllers/bucket_controller.go +++ b/controllers/bucket_controller.go @@ -25,6 +25,7 @@ import ( "strings" "time" + "golang.org/x/sync/errgroup" corev1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -49,6 +50,18 @@ import ( "github.com/fluxcd/source-controller/pkg/sourceignore" ) +// maxConcurrentFetches is the upper bound on the goroutines used to +// fetch bucket objects. It's important to have a bound, to avoid +// using arbitrary amounts of memory; the actual number is chosen +// according to the queueing rule of thumb with some conservative +// parameters: +// s > Nr / T +// N (number of requestors, i.e., objects to fetch) = 10000 +// r (service time -- fetch duration) = 0.01s (~ a megabyte file over 1Gb/s) +// T (total time available) = 1s +// -> s > 100 +const maxConcurrentFetches = 100 + // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status,verbs=get;update;patch // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/finalizers,verbs=get;create;update;patch;delete @@ -325,6 +338,14 @@ func fetchFiles(ctx context.Context, client BucketProvider, bucket sourcev1.Buck } matcher := sourceignore.NewMatcher(ps) + // Download in parallel, but bound the concurrency. According to + // AWS and GCP docs, rate limits are either soft or don't exist: + // - https://cloud.google.com/storage/quotas + // - https://docs.aws.amazon.com/general/latest/gr/s3.html + // .. 
so, the limiting factor is this process keeping a small footprint.
+	semaphore := make(chan struct{}, maxConcurrentFetches)
+	group, ctx := errgroup.WithContext(ctxTimeout)
+
 	err = client.VisitObjects(ctxTimeout, bucket.Spec.BucketName, func(path string) error {
 		if strings.HasSuffix(path, "/") || path == sourceignore.IgnoreFile {
 			return nil
@@ -334,15 +355,27 @@ func fetchFiles(ctx context.Context, client BucketProvider, bucket sourcev1.Buck
 			return nil
 		}
 
-		localPath := filepath.Join(tempDir, path)
-		err := client.FGetObject(ctx, bucket.Spec.BucketName, path, localPath)
-		if err != nil {
-			err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
-			return err
-		}
+		// block until there's capacity
+		semaphore <- struct{}{}
+		group.Go(func() error {
+			defer func() { <-semaphore }()
+			localPath := filepath.Join(tempDir, path)
+			err := client.FGetObject(ctx, bucket.Spec.BucketName, path, localPath)
+			if err != nil {
+				err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
+				return err
+			}
+			return nil
+		})
 		return nil
 	})
+
+	// Always wait for the in-flight fetches to finish. VisitObjects can fail
+	// while listing; that error takes precedence over a download error.
+	if fetchErr := group.Wait(); err == nil {
+		err = fetchErr
+	}
 	if err != nil {
 		err = fmt.Errorf("fetching objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
 		return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
 	}
diff --git a/controllers/bucket_fetch_test.go b/controllers/bucket_fetch_test.go
new file mode 100644
index 000000000..c7b50f3aa
--- /dev/null
+++ b/controllers/bucket_fetch_test.go
@@ -0,0 +1,162 @@
+/*
+Copyright 2021 The Flux authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" +) + +type mockBucketClient struct { + bucketName string + objects map[string]string +} + +var mockNotFound = fmt.Errorf("not found") + +func (m mockBucketClient) BucketExists(c context.Context, name string) (bool, error) { + return name == m.bucketName, nil +} + +func (m mockBucketClient) ObjectExists(c context.Context, bucket, obj string) (bool, error) { + if bucket != m.bucketName { + return false, fmt.Errorf("bucket does not exist") + } + _, ok := m.objects[obj] + return ok, nil +} + +func (m mockBucketClient) FGetObject(c context.Context, bucket, obj, path string) error { + if bucket != m.bucketName { + return fmt.Errorf("bucket does not exist") + } + // tiny bit of protocol, for convenience: if asked for an object "error", then return an error. 
+ if obj == "error" { + return fmt.Errorf("I was asked to report an error") + } + object, ok := m.objects[obj] + if !ok { + return mockNotFound + } + return os.WriteFile(path, []byte(object), os.FileMode(0660)) +} + +func (m mockBucketClient) ObjectIsNotFound(e error) bool { + return e == mockNotFound +} + +func (m mockBucketClient) VisitObjects(c context.Context, bucket string, f func(string) error) error { + for path := range m.objects { + if err := f(path); err != nil { + return err + } + } + return nil +} + +func (m mockBucketClient) Close(c context.Context) { + return +} + +// Since the algorithm for fetching files uses concurrency and has some complications around error +// reporting, it's worth testing by itself. +func TestFetchFiles(t *testing.T) { + files := map[string]string{ + "foo.yaml": "foo: 1", + "bar.yaml": "bar: 2", + "baz.yaml": "baz: 3", + } + bucketName := "all-my-config" + + bucket := sourcev1.Bucket{ + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Timeout: &metav1.Duration{Duration: 1 * time.Hour}, + }, + } + client := mockBucketClient{ + objects: files, + bucketName: bucketName, + } + + t.Run("fetch files happy path", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + _, err = fetchFiles(context.TODO(), client, bucket, tmp) + if err != nil { + t.Fatal(err) + } + + for path := range files { + p := filepath.Join(tmp, path) + _, err := os.Stat(p) + if err != nil { + t.Error(err) + } + } + }) + + t.Run("an error while fetching returns an error for the whole procedure", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + files["error"] = "this one causes an error" + _, err = fetchFiles(context.TODO(), client, bucket, tmp) + if err == nil { + t.Fatal("expected error but got nil") + } + }) + + t.Run("can fetch more than maxConcurrentFetches", func(t *testing.T) { + // this will 
fail if, for example, the semaphore is not used correctly and blocks + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + lotsOfFiles := map[string]string{} + for i := 0; i < 2*maxConcurrentFetches; i++ { + f := fmt.Sprintf("file-%d", i) + lotsOfFiles[f] = f + } + lotsOfFilesClient := mockBucketClient{ + bucketName: bucketName, + objects: lotsOfFiles, + } + + _, err = fetchFiles(context.TODO(), lotsOfFilesClient, bucket, tmp) + if err != nil { + t.Fatal(err) + } + }) +}