diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go index a25587d1a..0c12c62d9 100644 --- a/controllers/bucket_controller.go +++ b/controllers/bucket_controller.go @@ -25,10 +25,7 @@ import ( "strings" "time" - "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" - "github.com/minio/minio-go/v7/pkg/s3utils" - "google.golang.org/api/option" + "golang.org/x/sync/errgroup" corev1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -47,11 +44,24 @@ import ( "github.com/fluxcd/pkg/runtime/metrics" "github.com/fluxcd/pkg/runtime/predicates" "github.com/fluxcd/source-controller/pkg/gcp" + "github.com/fluxcd/source-controller/pkg/minio" sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" "github.com/fluxcd/source-controller/pkg/sourceignore" ) +// maxConcurrentFetches is the upper bound on the goroutines used to +// fetch bucket objects. It's important to have a bound, to avoid +// using arbitrary amounts of memory; the actual number is chosen +// according to the queueing rule of thumb with some conservative +// parameters: +// s > Nr / T +// N (number of requestors, i.e., objects to fetch) = 10000 +// r (service time -- fetch duration) = 0.01s (~ a megabyte file over 1Gb/s) +// T (total time available) = 1s +// -> s > 100 +const maxConcurrentFetches = 100 + // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status,verbs=get;update;patch // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/finalizers,verbs=get;create;update;patch;delete @@ -71,6 +81,15 @@ type BucketReconcilerOptions struct { MaxConcurrentReconciles int } +type BucketProvider interface { + BucketExists(context.Context, string) (bool, error) + ObjectExists(context.Context, string, string) (bool, error) + FGetObject(context.Context, string, string, string) error + ObjectIsNotFound(error) bool + VisitObjects(context.Context, string, func(string) error) error + Close(context.Context) +} + func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error { return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{}) } @@ -177,10 +196,20 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{RequeueAfter: bucket.GetInterval().Duration}, nil } +func (r *BucketReconciler) getBucketSecret(ctx context.Context, bucket sourcev1.Bucket) (corev1.Secret, error) { + var secret corev1.Secret + secretName := types.NamespacedName{ + Namespace: bucket.GetNamespace(), + Name: bucket.Spec.SecretRef.Name, + } + if err := r.Get(ctx, secretName, &secret); err != nil { + return secret, err + } + return secret, nil +} + func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket) (sourcev1.Bucket, error) { log := ctrl.LoggerFrom(ctx) - var err error - var sourceBucket sourcev1.Bucket tempDir, err := os.MkdirTemp("", bucket.Name) if err != nil { @@ -193,17 +222,19 @@ func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket } }() - if bucket.Spec.Provider == sourcev1.GoogleBucketProvider { - sourceBucket, err = r.reconcileWithGCP(ctx, bucket, tempDir) - if err != nil { - return sourceBucket, err - } - } else { - sourceBucket, err = r.reconcileWithMinio(ctx, bucket, tempDir) + var secret corev1.Secret + + if bucket.Spec.SecretRef != nil { + secret, err = r.getBucketSecret(ctx, bucket) if err != nil { - return sourceBucket, err + return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), fmt.Errorf("credentials secret error: %w", err) } } + + if bucketResponse, err := fetchBucketContents(ctx, bucket, secret, tempDir); err != nil { + return bucketResponse, err + } + revision, err := r.checksum(tempDir) if err != nil { return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err @@ -251,108 +282,37 @@ func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket return sourcev1.BucketReady(bucket, artifact, url, sourcev1.BucketOperationSucceedReason, message), nil } -func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) { - if err := r.gc(bucket); err != nil { - r.event(ctx, bucket, events.EventSeverityError, - fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error())) - // Return the error so we retry the failed garbage collection - return ctrl.Result{}, err - } - - // Record deleted status - r.recordReadiness(ctx, bucket) - - // Remove our finalizer from the list and update it - controllerutil.RemoveFinalizer(&bucket, sourcev1.SourceFinalizer) - if err := r.Update(ctx, &bucket); err != nil { - return ctrl.Result{}, err - } - - // Stop reconciliation as the object is being deleted - return ctrl.Result{}, nil -} - -// reconcileWithGCP handles getting objects from a Google Cloud Platform bucket -// using a gcp client -func (r *BucketReconciler) reconcileWithGCP(ctx context.Context, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) { - log := ctrl.LoggerFrom(ctx) - gcpClient, err := r.authGCP(ctx, bucket) - if err != nil { - err = fmt.Errorf("auth error: %w", err) - return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err - } - defer gcpClient.Close(log) - - ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration) - defer cancel() - - exists, err := gcpClient.BucketExists(ctxTimeout, bucket.Spec.BucketName) - if err != nil { - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err - } - if !exists { - err = fmt.Errorf("bucket '%s' not found", bucket.Spec.BucketName) - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err - } - - // Look for file with ignore rules first. - path := filepath.Join(tempDir, sourceignore.IgnoreFile) - if err := gcpClient.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil { - if err == gcp.ErrorObjectDoesNotExist && sourceignore.IgnoreFile != ".sourceignore" { - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err - } - } - ps, err := sourceignore.ReadIgnoreFile(path, nil) - if err != nil { - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err - } - // In-spec patterns take precedence - if bucket.Spec.Ignore != nil { - ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...) - } - matcher := sourceignore.NewMatcher(ps) - objects := gcpClient.ListObjects(ctxTimeout, bucket.Spec.BucketName, nil) - // download bucket content - for { - object, err := objects.Next() - if err == gcp.IteratorDone { - break - } +// fetchBucketContents selects a bucket provider that implement the bucket provider interface based on +// on the specified provider in the bucket spec. +func fetchBucketContents(ctx context.Context, bucket sourcev1.Bucket, secret corev1.Secret, tempDir string) (sourcev1.Bucket, error) { + switch bucket.Spec.Provider { + case sourcev1.GoogleBucketProvider: + gcpClient, err := gcp.NewClient(ctx, secret, bucket) if err != nil { - err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err) - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err + err = fmt.Errorf("auth error: %w", err) + return sourcev1.Bucket{}, err } - - if strings.HasSuffix(object.Name, "/") || object.Name == sourceignore.IgnoreFile { - continue + defer gcpClient.Close(ctx) + if bucketResponse, err := fetchFiles(ctx, gcpClient, bucket, tempDir); err != nil { + return bucketResponse, err } - - if matcher.Match(strings.Split(object.Name, "/"), false) { - continue + default: + minioClient, err := minio.NewClient(ctx, secret, bucket) + if err != nil { + err = fmt.Errorf("auth error: %w", err) + return sourcev1.Bucket{}, err } - - localPath := filepath.Join(tempDir, object.Name) - if err = gcpClient.FGetObject(ctxTimeout, bucket.Spec.BucketName, object.Name, localPath); err != nil { - err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err) - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err + if bucketResponse, err := fetchFiles(ctx, minioClient, bucket, tempDir); err != nil { + return bucketResponse, err } } return sourcev1.Bucket{}, nil } -// reconcileWithMinio handles getting objects from an S3 compatible bucket -// using a minio client -func (r *BucketReconciler) reconcileWithMinio(ctx context.Context, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) { - s3Client, err := r.authMinio(ctx, bucket) - if err != nil { - err = fmt.Errorf("auth error: %w", err) - return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err - } - +func fetchFiles(ctx context.Context, client BucketProvider, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) { ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration) defer cancel() - - exists, err := s3Client.BucketExists(ctxTimeout, bucket.Spec.BucketName) + exists, err := client.BucketExists(ctxTimeout, bucket.Spec.BucketName) if err != nil { return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } @@ -361,16 +321,14 @@ func (r *BucketReconciler) reconcileWithMinio(ctx context.Context, bucket source return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } - // Look for file with ignore rules first - // NB: S3 has flat filepath keys making it impossible to look - // for files in "subdirectories" without building up a tree first. - path := filepath.Join(tempDir, sourceignore.IgnoreFile) - if err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil { - if resp, ok := err.(minio.ErrorResponse); ok && resp.Code != "NoSuchKey" { + // Look for file with ignore rules first. + ignorefile := filepath.Join(tempDir, sourceignore.IgnoreFile) + if err := client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, ignorefile); err != nil { + if client.ObjectIsNotFound(err) && sourceignore.IgnoreFile != ".sourceignore" { // FIXME? return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } } - ps, err := sourceignore.ReadIgnoreFile(path, nil) + ps, err := sourceignore.ReadIgnoreFile(ignorefile, nil) if err != nil { return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } @@ -380,106 +338,65 @@ func (r *BucketReconciler) reconcileWithMinio(ctx context.Context, bucket source } matcher := sourceignore.NewMatcher(ps) - // download bucket content - for object := range s3Client.ListObjects(ctxTimeout, bucket.Spec.BucketName, minio.ListObjectsOptions{ - Recursive: true, - UseV1: s3utils.IsGoogleEndpoint(*s3Client.EndpointURL()), - }) { - if object.Err != nil { - err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, object.Err) - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err - } + // Download in parallel, but bound the concurrency. According to + // AWS and GCP docs, rate limits are either soft or don't exist: + // - https://cloud.google.com/storage/quotas + // - https://docs.aws.amazon.com/general/latest/gr/s3.html + // .. so, the limiting factor is this process keeping a small footprint. + semaphore := make(chan struct{}, maxConcurrentFetches) + group, ctx := errgroup.WithContext(ctx) - if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile { - continue + err = client.VisitObjects(ctxTimeout, bucket.Spec.BucketName, func(path string) error { + if strings.HasSuffix(path, "/") || path == sourceignore.IgnoreFile { + return nil } - if matcher.Match(strings.Split(object.Key, "/"), false) { - continue + if matcher.Match(strings.Split(path, "/"), false) { + return nil } - localPath := filepath.Join(tempDir, object.Key) - err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, object.Key, localPath, minio.GetObjectOptions{}) - if err != nil { - err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err) - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err - } - } - return sourcev1.Bucket{}, nil -} - -// authGCP creates a new Google Cloud Platform storage client -// to interact with the storage service. -func (r *BucketReconciler) authGCP(ctx context.Context, bucket sourcev1.Bucket) (*gcp.GCPClient, error) { - var client *gcp.GCPClient - var err error - if bucket.Spec.SecretRef != nil { - secretName := types.NamespacedName{ - Namespace: bucket.GetNamespace(), - Name: bucket.Spec.SecretRef.Name, - } + // block until there's capacity + semaphore <- struct{}{} + group.Go(func() error { + defer func() { <-semaphore }() + localPath := filepath.Join(tempDir, path) + err := client.FGetObject(ctx, bucket.Spec.BucketName, path, localPath) + if err != nil { + err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err) + return err + } + return nil + }) + return nil + }) - var secret corev1.Secret - if err := r.Get(ctx, secretName, &secret); err != nil { - return nil, fmt.Errorf("credentials secret error: %w", err) - } - if err := gcp.ValidateSecret(secret.Data, secret.Name); err != nil { - return nil, err - } - client, err = gcp.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"])) - if err != nil { - return nil, err - } - } else { - client, err = gcp.NewClient(ctx) - if err != nil { - return nil, err - } + // VisitObjects won't return an error, but the errgroup might. + if err = group.Wait(); err != nil { + err = fmt.Errorf("fetching objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err) + return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } - return client, nil - + return bucket, nil } -// authMinio creates a new Minio client to interact with S3 -// compatible storage services. -func (r *BucketReconciler) authMinio(ctx context.Context, bucket sourcev1.Bucket) (*minio.Client, error) { - opt := minio.Options{ - Region: bucket.Spec.Region, - Secure: !bucket.Spec.Insecure, +func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) { + if err := r.gc(bucket); err != nil { + r.event(ctx, bucket, events.EventSeverityError, + fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error())) + // Return the error so we retry the failed garbage collection + return ctrl.Result{}, err } - if bucket.Spec.SecretRef != nil { - secretName := types.NamespacedName{ - Namespace: bucket.GetNamespace(), - Name: bucket.Spec.SecretRef.Name, - } - - var secret corev1.Secret - if err := r.Get(ctx, secretName, &secret); err != nil { - return nil, fmt.Errorf("credentials secret error: %w", err) - } - - accesskey := "" - secretkey := "" - if k, ok := secret.Data["accesskey"]; ok { - accesskey = string(k) - } - if k, ok := secret.Data["secretkey"]; ok { - secretkey = string(k) - } - if accesskey == "" || secretkey == "" { - return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name) - } - opt.Creds = credentials.NewStaticV4(accesskey, secretkey, "") - } else if bucket.Spec.Provider == sourcev1.AmazonBucketProvider { - opt.Creds = credentials.NewIAM("") - } + // Record deleted status + r.recordReadiness(ctx, bucket) - if opt.Creds == nil { - return nil, fmt.Errorf("no bucket credentials found") + // Remove our finalizer from the list and update it + controllerutil.RemoveFinalizer(&bucket, sourcev1.SourceFinalizer) + if err := r.Update(ctx, &bucket); err != nil { + return ctrl.Result{}, err } - return minio.New(bucket.Spec.Endpoint, &opt) + // Stop reconciliation as the object is being deleted + return ctrl.Result{}, nil } // checksum calculates the SHA1 checksum of the given root directory. diff --git a/controllers/bucket_fetch_test.go b/controllers/bucket_fetch_test.go new file mode 100644 index 000000000..c7b50f3aa --- /dev/null +++ b/controllers/bucket_fetch_test.go @@ -0,0 +1,162 @@ +/* +Copyright 2021 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" +) + +type mockBucketClient struct { + bucketName string + objects map[string]string +} + +var mockNotFound = fmt.Errorf("not found") + +func (m mockBucketClient) BucketExists(c context.Context, name string) (bool, error) { + return name == m.bucketName, nil +} + +func (m mockBucketClient) ObjectExists(c context.Context, bucket, obj string) (bool, error) { + if bucket != m.bucketName { + return false, fmt.Errorf("bucket does not exist") + } + _, ok := m.objects[obj] + return ok, nil +} + +func (m mockBucketClient) FGetObject(c context.Context, bucket, obj, path string) error { + if bucket != m.bucketName { + return fmt.Errorf("bucket does not exist") + } + // tiny bit of protocol, for convenience: if asked for an object "error", then return an error. + if obj == "error" { + return fmt.Errorf("I was asked to report an error") + } + object, ok := m.objects[obj] + if !ok { + return mockNotFound + } + return os.WriteFile(path, []byte(object), os.FileMode(0660)) +} + +func (m mockBucketClient) ObjectIsNotFound(e error) bool { + return e == mockNotFound +} + +func (m mockBucketClient) VisitObjects(c context.Context, bucket string, f func(string) error) error { + for path := range m.objects { + if err := f(path); err != nil { + return err + } + } + return nil +} + +func (m mockBucketClient) Close(c context.Context) { + return +} + +// Since the algorithm for fetching files uses concurrency and has some complications around error +// reporting, it's worth testing by itself. +func TestFetchFiles(t *testing.T) { + files := map[string]string{ + "foo.yaml": "foo: 1", + "bar.yaml": "bar: 2", + "baz.yaml": "baz: 3", + } + bucketName := "all-my-config" + + bucket := sourcev1.Bucket{ + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Timeout: &metav1.Duration{Duration: 1 * time.Hour}, + }, + } + client := mockBucketClient{ + objects: files, + bucketName: bucketName, + } + + t.Run("fetch files happy path", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + _, err = fetchFiles(context.TODO(), client, bucket, tmp) + if err != nil { + t.Fatal(err) + } + + for path := range files { + p := filepath.Join(tmp, path) + _, err := os.Stat(p) + if err != nil { + t.Error(err) + } + } + }) + + t.Run("an error while fetching returns an error for the whole procedure", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + files["error"] = "this one causes an error" + _, err = fetchFiles(context.TODO(), client, bucket, tmp) + if err == nil { + t.Fatal("expected error but got nil") + } + }) + + t.Run("can fetch more than maxConcurrentFetches", func(t *testing.T) { + // this will fail if, for example, the semaphore is not used correctly and blocks + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + lotsOfFiles := map[string]string{} + for i := 0; i < 2*maxConcurrentFetches; i++ { + f := fmt.Sprintf("file-%d", i) + lotsOfFiles[f] = f + } + lotsOfFilesClient := mockBucketClient{ + bucketName: bucketName, + objects: lotsOfFiles, + } + + _, err = fetchFiles(context.TODO(), lotsOfFilesClient, bucket, tmp) + if err != nil { + t.Fatal(err) + } + }) +} diff --git a/go.mod b/go.mod index 06f6f37d0..96eba7075 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/go-git/go-billy/v5 v5.3.1 github.com/go-git/go-git/v5 v5.4.2 github.com/go-logr/logr v1.2.2 + github.com/google/uuid v1.2.0 github.com/libgit2/git2go/v31 v31.7.6 github.com/minio/minio-go/v7 v7.0.15 github.com/onsi/ginkgo v1.16.5 @@ -94,7 +95,6 @@ require ( github.com/google/go-cmp v0.5.6 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect - github.com/google/uuid v1.2.0 // indirect github.com/googleapis/gax-go/v2 v2.1.0 // indirect github.com/googleapis/gnostic v0.5.5 // indirect github.com/gorilla/mux v1.8.0 // indirect diff --git a/pkg/gcp/gcp.go b/pkg/gcp/gcp.go index 9127fcde3..28a7abe14 100644 --- a/pkg/gcp/gcp.go +++ b/pkg/gcp/gcp.go @@ -13,7 +13,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ - package gcp import ( @@ -25,9 +24,11 @@ import ( "path/filepath" gcpstorage "cloud.google.com/go/storage" + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" "github.com/go-logr/logr" "google.golang.org/api/iterator" "google.golang.org/api/option" + corev1 "k8s.io/api/core/v1" ) var ( @@ -50,13 +51,26 @@ type GCPClient struct { // NewClient creates a new GCP storage client. The Client will automatically look for the Google Application // Credential environment variable or look for the Google Application Credential file. -func NewClient(ctx context.Context, opts ...option.ClientOption) (*GCPClient, error) { - client, err := gcpstorage.NewClient(ctx, opts...) - if err != nil { - return nil, err +func NewClient(ctx context.Context, secret corev1.Secret, bucket sourcev1.Bucket) (*GCPClient, error) { + gcpclient := &GCPClient{} + if bucket.Spec.SecretRef != nil { + if err := ValidateSecret(secret.Data, secret.Name); err != nil { + return nil, err + } + client, err := gcpstorage.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"])) + if err != nil { + return nil, err + } + gcpclient.Client = client + } else { + client, err := gcpstorage.NewClient(ctx) + if err != nil { + return nil, err + } + gcpclient.Client = client } - return &GCPClient{Client: client}, nil + return gcpclient, nil } // ValidateSecret validates the credential secrets @@ -158,15 +172,34 @@ func (c *GCPClient) FGetObject(ctx context.Context, bucketName, objectName, loca // ListObjects lists the objects/contents of the bucket whose bucket name is provided. // the objects are returned as an Objectiterator and .Next() has to be called on them -// to loop through the Objects. -func (c *GCPClient) ListObjects(ctx context.Context, bucketName string, query *gcpstorage.Query) *gcpstorage.ObjectIterator { - items := c.Client.Bucket(bucketName).Objects(ctx, query) - return items +// to loop through the Objects. The Object are downloaded using a goroutine. +func (c *GCPClient) VisitObjects(ctx context.Context, bucketName string, visit func(string) error) error { + items := c.Client.Bucket(bucketName).Objects(ctx, nil) + for { + object, err := items.Next() + if err == IteratorDone { + break + } + if err != nil { + err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, err) + return err + } + if err = visit(object.Name); err != nil { + return err + } + } + return nil } // Close closes the GCP Client and logs any useful errors -func (c *GCPClient) Close(log logr.Logger) { +func (c *GCPClient) Close(ctx context.Context) { + log := logr.FromContextOrDiscard(ctx) if err := c.Client.Close(); err != nil { - log.Error(err, "GCP Provider") + log.Error(err, "closing GCP client") } } + +// ObjectIsNotFound checks if the error provided is ErrorObjectDoesNotExist(object does not exist) +func (c *GCPClient) ObjectIsNotFound(err error) bool { + return errors.Is(err, ErrorObjectDoesNotExist) +} diff --git a/pkg/gcp/gcp_test.go b/pkg/gcp/gcp_test.go index 7f431a44d..f2e4235a4 100644 --- a/pkg/gcp/gcp_test.go +++ b/pkg/gcp/gcp_test.go @@ -32,10 +32,14 @@ import ( "time" gcpstorage "cloud.google.com/go/storage" + "github.com/fluxcd/pkg/apis/meta" + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" "github.com/fluxcd/source-controller/pkg/gcp" "google.golang.org/api/googleapi" raw "google.golang.org/api/storage/v1" "gotest.tools/assert" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "google.golang.org/api/option" ) @@ -43,6 +47,7 @@ import ( const ( bucketName string = "test-bucket" objectName string = "test.yaml" + region = "us-east-1" ) var ( @@ -50,6 +55,55 @@ var ( client *gcpstorage.Client close func() err error + secret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "gcp-secret", + Namespace: "default", + }, + Data: map[string][]byte{ + "serviceaccount": []byte("ewogICAgInR5cGUiOiAic2VydmljZV9hY2NvdW50IiwKICAgICJwcm9qZWN0X2lkIjogInBvZGluZm8iLAogICAgInByaXZhdGVfa2V5X2lkIjogIjI4cXdnaDNnZGY1aGozZ2I1ZmozZ3N1NXlmZ2gzNGY0NTMyNDU2OGh5MiIsCiAgICAicHJpdmF0ZV9rZXkiOiAiLS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tXG5Id2V0aGd5MTIzaHVnZ2hoaGJkY3U2MzU2ZGd5amhzdmd2R0ZESFlnY2RqYnZjZGhic3g2M2Ncbjc2dGd5Y2ZlaHVoVkdURllmdzZ0N3lkZ3lWZ3lkaGV5aHVnZ3ljdWhland5NnQzNWZ0aHl1aGVndmNldGZcblRGVUhHVHlnZ2h1Ymh4ZTY1eWd0NnRneWVkZ3kzMjZodWN5dnN1aGJoY3Zjc2poY3NqaGNzdmdkdEhGQ0dpXG5IY3llNnR5eWczZ2Z5dWhjaGNzYmh5Z2NpamRiaHl5VEY2NnR1aGNldnVoZGNiaHVoaHZmdGN1aGJoM3VoN3Q2eVxuZ2d2ZnRVSGJoNnQ1cmZ0aGh1R1ZSdGZqaGJmY3JkNXI2N3l1aHV2Z0ZUWWpndnRmeWdoYmZjZHJoeWpoYmZjdGZkZnlodmZnXG50Z3ZnZ3RmeWdodmZ0NnR1Z3ZURjVyNjZ0dWpoZ3ZmcnR5aGhnZmN0Nnk3eXRmcjVjdHZnaGJoaHZ0Z2hoanZjdHRmeWNmXG5mZnhmZ2hqYnZnY2d5dDY3dWpiZ3ZjdGZ5aFZDN3VodmdjeWp2aGhqdnl1amNcbmNnZ2hndmdjZmhnZzc2NTQ1NHRjZnRoaGdmdHloaHZ2eXZ2ZmZnZnJ5eXU3N3JlcmVkc3dmdGhoZ2ZjZnR5Y2ZkcnR0ZmhmL1xuLS0tLS1FTkQgUFJJVkFURSBLRVktLS0tLVxuIiwKICAgICJjbGllbnRfZW1haWwiOiAidGVzdEBwb2RpbmZvLmlhbS5nc2VydmljZWFjY291bnQuY29tIiwKICAgICJjbGllbnRfaWQiOiAiMzI2NTc2MzQ2Nzg3NjI1MzY3NDYiLAogICAgImF1dGhfdXJpIjogImh0dHBzOi8vYWNjb3VudHMuZ29vZ2xlLmNvbS9vL29hdXRoMi9hdXRoIiwKICAgICJ0b2tlbl91cmkiOiAiaHR0cHM6Ly9vYXV0aDIuZ29vZ2xlYXBpcy5jb20vdG9rZW4iLAogICAgImF1dGhfcHJvdmlkZXJfeDUwOV9jZXJ0X3VybCI6ICJodHRwczovL3d3dy5nb29nbGVhcGlzLmNvbS9vYXV0aDIvdjEvY2VydHMiLAogICAgImNsaWVudF94NTA5X2NlcnRfdXJsIjogImh0dHBzOi8vd3d3Lmdvb2dsZWFwaXMuY29tL3JvYm90L3YxL21ldGFkYXRhL3g1MDkvdGVzdCU0MHBvZGluZm8uaWFtLmdzZXJ2aWNlYWNjb3VudC5jb20iCn0="), + }, + Type: "Opaque", + } + badSecret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "gcp-secret", + Namespace: "default", + }, + Data: map[string][]byte{ + "username": []byte("test-user"), + }, + Type: "Opaque", + } + bucket = sourcev1.Bucket{ + ObjectMeta: v1.ObjectMeta{ + Name: "gcp-test-bucket", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Endpoint: "storage.googleapis.com", + Region: region, + Provider: "gcp", + Insecure: true, + SecretRef: &meta.LocalObjectReference{ + Name: secret.Name, + }, + }, + } + bucketNoSecretRef = sourcev1.Bucket{ + ObjectMeta: v1.ObjectMeta{ + Name: "gcp-test-bucket", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Endpoint: "storage.googleapis.com", + Region: region, + Provider: "gcp", + Insecure: true, + }, + } ) func TestMain(m *testing.M) { @@ -109,10 +163,11 @@ func TestMain(m *testing.M) { os.Exit(run) } -func TestNewClient(t *testing.T) { - gcpClient, err := gcp.NewClient(context.Background(), option.WithHTTPClient(hc)) - assert.NilError(t, err) - assert.Assert(t, gcpClient != nil) +func TestNewClientWithSecretErr(t *testing.T) { + gcpClient, err := gcp.NewClient(context.Background(), secret, bucket) + t.Log(err) + assert.Error(t, err, "dialing: invalid character 'e' looking for beginning of value") + assert.Assert(t, gcpClient == nil) } func TestBucketExists(t *testing.T) { @@ -156,19 +211,29 @@ func TestObjectNotExists(t *testing.T) { assert.Assert(t, !exists) } -func TestListObjects(t *testing.T) { +func TestVisitObjects(t *testing.T) { gcpClient := &gcp.GCPClient{ Client: client, } - objectIterator := gcpClient.ListObjects(context.Background(), bucketName, nil) - for { - _, err := objectIterator.Next() - if err == gcp.IteratorDone { - break - } - assert.NilError(t, err) + + objs := []string{} + err := gcpClient.VisitObjects(context.Background(), bucketName, func(path string) error { + objs = append(objs, path) + return nil + }) + assert.NilError(t, err) + assert.DeepEqual(t, objs, []string{}) +} + +func TestVisitObjectsErr(t *testing.T) { + gcpClient := &gcp.GCPClient{ + Client: client, } - assert.Assert(t, objectIterator != nil) + badBucketName := "bad-bucket" + err := gcpClient.VisitObjects(context.Background(), badBucketName, func(path string) error { + return nil + }) + assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: storage: bucket doesn't exist", badBucketName)) } func TestFGetObject(t *testing.T) { @@ -202,8 +267,8 @@ func TestFGetObjectNotExists(t *testing.T) { func TestFGetObjectDirectoryIsFileName(t *testing.T) { tempDir, err := os.MkdirTemp("", bucketName) - defer os.RemoveAll(tempDir) assert.NilError(t, err) + defer os.RemoveAll(tempDir) gcpClient := &gcp.GCPClient{ Client: client, } diff --git a/pkg/minio/minio.go b/pkg/minio/minio.go new file mode 100644 index 000000000..df4ab64ed --- /dev/null +++ b/pkg/minio/minio.go @@ -0,0 +1,113 @@ +/* +Copyright 2021 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package minio + +import ( + "context" + "fmt" + + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/minio/minio-go/v7/pkg/s3utils" + corev1 "k8s.io/api/core/v1" +) + +type MinioClient struct { + // client for interacting with S3 compatible + // Storage APIs. + *minio.Client +} + +// NewClient creates a new Minio storage client. +func NewClient(ctx context.Context, secret corev1.Secret, bucket sourcev1.Bucket) (*MinioClient, error) { + opt := minio.Options{ + Region: bucket.Spec.Region, + Secure: !bucket.Spec.Insecure, + } + + if bucket.Spec.SecretRef != nil { + accesskey := "" + secretkey := "" + if k, ok := secret.Data["accesskey"]; ok { + accesskey = string(k) + } + if k, ok := secret.Data["secretkey"]; ok { + secretkey = string(k) + } + if accesskey == "" || secretkey == "" { + return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name) + } + opt.Creds = credentials.NewStaticV4(accesskey, secretkey, "") + } else if bucket.Spec.Provider == sourcev1.AmazonBucketProvider { + opt.Creds = credentials.NewIAM("") + } + + if opt.Creds == nil { + return nil, fmt.Errorf("no bucket credentials found") + } + + client, err := minio.New(bucket.Spec.Endpoint, &opt) + if err != nil { + return nil, err + } + + return &MinioClient{Client: client}, nil +} + +// ObjectExists checks if the object with the provided name exists. +func (c *MinioClient) ObjectExists(ctx context.Context, bucketName, objectName string) (bool, error) { + _, err := c.Client.StatObject(ctx, bucketName, objectName, minio.StatObjectOptions{}) + if err != nil { + return false, err + } + return true, nil +} + +// FGetObject gets the object from the bucket and downloads the object locally. +func (c *MinioClient) FGetObject(ctx context.Context, bucketName, objectName, localPath string) error { + return c.Client.FGetObject(ctx, bucketName, objectName, localPath, minio.GetObjectOptions{}) +} + +// ListObjects lists all the objects in a bucket and downloads the objects. +func (c *MinioClient) VisitObjects(ctx context.Context, bucketName string, visit func(string) error) error { + for object := range c.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{ + Recursive: true, + UseV1: s3utils.IsGoogleEndpoint(*c.Client.EndpointURL()), + }) { + if object.Err != nil { + err := fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, object.Err) + return err + } + + if err := visit(object.Key); err != nil { + return err + } + } + return nil +} + +// Close closes the Minio Client and logs any useful errors +func (c *MinioClient) Close(ctx context.Context) { + //minio client does not provide a close method +} + +// ObjectIsNotFound checks if the error provided is NoSuchKey(object does not exist) +func (c *MinioClient) ObjectIsNotFound(err error) bool { + resp, ok := err.(minio.ErrorResponse) + return ok && resp.Code != "NoSuchKey" +} diff --git a/pkg/minio/minio_test.go b/pkg/minio/minio_test.go new file mode 100644 index 000000000..aac36a019 --- /dev/null +++ b/pkg/minio/minio_test.go @@ -0,0 +1,262 @@ +/* +Copyright 2021 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package minio_test + +import ( + "context" + "fmt" + "log" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/fluxcd/pkg/apis/meta" + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" + "github.com/fluxcd/source-controller/pkg/minio" + "github.com/fluxcd/source-controller/pkg/sourceignore" + + "github.com/google/uuid" + miniov7 "github.com/minio/minio-go/v7" + "gotest.tools/assert" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + objectName string = "test.yaml" + region string = "us-east-1" +) + +var ( + minioclient *minio.MinioClient + bucketName = "test-bucket-minio" + uuid.New().String() + secret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-secret", + Namespace: "default", + }, + Data: map[string][]byte{ + "accesskey": []byte("Q3AM3UQ867SPQQA43P2F"), + "secretkey": []byte("zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"), + }, + Type: "Opaque", + } + bucket = sourcev1.Bucket{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-test-bucket", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Endpoint: "play.min.io", + Region: region, + Provider: "generic", + Insecure: true, + SecretRef: &meta.LocalObjectReference{ + Name: secret.Name, + }, + }, + } + emptySecret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-secret", + Namespace: "default", + }, + Data: map[string][]byte{}, + Type: "Opaque", + } + bucketNoSecretRef = sourcev1.Bucket{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-test-bucket", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Endpoint: "play.min.io", + Region: region, + Provider: "generic", + Insecure: true, + }, + } + bucketAwsProvider = sourcev1.Bucket{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-test-bucket", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Endpoint: "play.min.io", + Region: region, + Provider: "aws", + Insecure: true, + }, + } +) + +func TestMain(m *testing.M) { + var err error + ctx := context.Background() + minioclient, err = minio.NewClient(ctx, secret, bucket) + if err != nil { + log.Fatal(err) + } + createBucket(ctx) + addObjectToBucket(ctx) + run := m.Run() + removeObjectFromBucket(ctx) + deleteBucket(ctx) + os.Exit(run) +} + +func TestNewClient(t *testing.T) { + ctx := context.Background() + minioClient, err := minio.NewClient(ctx, secret, bucket) + assert.NilError(t, err) + assert.Assert(t, minioClient != nil) +} + +func TestNewClientEmptySecret(t *testing.T) { + ctx := context.Background() + minioClient, err := minio.NewClient(ctx, emptySecret, bucket) + assert.Error(t, err, fmt.Sprintf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", emptySecret.Name)) + assert.Assert(t, minioClient == nil) +} + +func TestNewClientNoSecretRef(t *testing.T) { + ctx := context.Background() + minioClient, err := minio.NewClient(ctx, corev1.Secret{}, bucketNoSecretRef) + assert.Error(t, err, "no bucket credentials found") + assert.Assert(t, minioClient == nil) +} + +func TestNewClientAwsProvider(t *testing.T) { + ctx := context.Background() + minioClient, err := minio.NewClient(ctx, corev1.Secret{}, bucketAwsProvider) + assert.NilError(t, err) + assert.Assert(t, minioClient != nil) +} + +func TestBucketExists(t *testing.T) { + ctx := context.Background() + exists, err := minioclient.BucketExists(ctx, bucketName) + assert.NilError(t, err) + assert.Assert(t, exists) +} + +func TestBucketNotExists(t *testing.T) { + ctx := context.Background() + exists, err := minioclient.BucketExists(ctx, "notexistsbucket") + assert.NilError(t, err) + assert.Assert(t, !exists) +} + +func TestObjectExists(t *testing.T) { + ctx := context.Background() + exists, err := minioclient.ObjectExists(ctx, bucketName, objectName) + assert.NilError(t, err) + assert.Assert(t, exists) +} + +func TestObjectNotExists(t *testing.T) { + ctx := context.Background() + exists, err := minioclient.ObjectExists(ctx, bucketName, "notexists.yaml") + assert.Error(t, err, "The specified key does not exist.") + assert.Assert(t, !exists) +} + +func TestFGetObject(t *testing.T) { + ctx := context.Background() + tempDir, err := os.MkdirTemp("", bucketName) + assert.NilError(t, err) + defer os.RemoveAll(tempDir) + path := filepath.Join(tempDir, sourceignore.IgnoreFile) + err = minioclient.FGetObject(ctx, bucketName, objectName, path) + assert.NilError(t, err) +} + +func TestVisitObjects(t *testing.T) { + ctx := context.Background() + objs := []string{} + err := minioclient.VisitObjects(ctx, bucketName, func(path string) error { + objs = append(objs, path) + return nil + }) + assert.NilError(t, err) + assert.DeepEqual(t, objs, []string{objectName}) +} + +func TestVisitObjectsErr(t *testing.T) { + ctx := context.Background() + badBucketName := "bad-bucket" + err := minioclient.VisitObjects(ctx, badBucketName, func(string) error { + return nil + }) + assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: The specified bucket does not exist", badBucketName)) +} + +func createBucket(ctx context.Context) { + if err := minioclient.Client.MakeBucket(ctx, bucketName, miniov7.MakeBucketOptions{Region: region}); err != nil { + exists, errBucketExists := minioclient.BucketExists(ctx, bucketName) + if errBucketExists == nil && exists { + deleteBucket(ctx) + } else { + log.Fatalln(err) + } + } +} + +func deleteBucket(ctx context.Context) { + if err := minioclient.Client.RemoveBucket(ctx, bucketName); err != nil { + log.Println(err) + } +} + +func addObjectToBucket(ctx context.Context) { + fileReader := strings.NewReader(getObjectFile()) + fileSize := fileReader.Size() + _, err := minioclient.Client.PutObject(ctx, bucketName, objectName, fileReader, fileSize, miniov7.PutObjectOptions{ + ContentType: "text/x-yaml", + }) + if err != nil { + log.Println(err) + } +} + +func removeObjectFromBucket(ctx context.Context) { + if err := minioclient.Client.RemoveObject(ctx, bucketName, objectName, miniov7.RemoveObjectOptions{ + GovernanceBypass: true, + }); err != nil { + log.Println(err) + } +} + +func getObjectFile() string { + return ` + apiVersion: source.toolkit.fluxcd.io/v1beta1 + kind: Bucket + metadata: + name: podinfo + namespace: default + spec: + interval: 5m + provider: aws + bucketName: podinfo + endpoint: s3.amazonaws.com + region: us-east-1 + timeout: 30s + ` +}