171 changes: 165 additions & 6 deletions gcs/client/client.go
@@ -18,15 +18,18 @@ package client

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"os"
"strings"
"sync"
"time"

"golang.org/x/oauth2/google"
"google.golang.org/api/iterator"

"cloud.google.com/go/storage"

@@ -37,6 +40,15 @@ import (
// client disallow an attempted write operation.
var ErrInvalidROWriteOperation = errors.New("the client operates in read only mode. Change 'credentials_source' parameter value ")

// maxConcurrency limits the number of goroutines that DeleteRecursive runs concurrently.
const maxConcurrency = 10

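// BlobProperties holds the object metadata reported by Properties, serialized as JSON.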
type BlobProperties struct {
ETag string `json:"etag,omitempty"`
LastModified time.Time `json:"last_modified,omitempty"`
ContentLength int64 `json:"content_length,omitempty"`
}

// GCSBlobstore encapsulates interaction with the GCS blobstore
type GCSBlobstore struct {
authenticatedGCS *storage.Client
@@ -68,6 +80,11 @@ func (client *GCSBlobstore) getObjectHandle(gcs *storage.Client, src string) *st
return handle
}

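// getBucketHandle returns a handle to the configured bucket using the given storage client.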
func (client *GCSBlobstore) getBucketHandle(gcs *storage.Client) *storage.BucketHandle {
handle := gcs.Bucket(client.config.BucketName)
return handle
}

// New returns a GCSBlobstore configured to operate using the given config
//
// non-nil error is returned on invalid Client or config. If the configuration
Expand Down Expand Up @@ -120,7 +137,6 @@ func (client *GCSBlobstore) getReader(gcs *storage.Client, src string) (*storage
const retryAttempts = 3

func (client *GCSBlobstore) Put(sourceFilePath string, dest string) error {

src, err := os.Open(sourceFilePath)
if err != nil {
return err
@@ -246,21 +262,164 @@ func (client *GCSBlobstore) Sign(id string, action string, expiry time.Duration)
}

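// List returns the names of all objects in the configured bucket whose names start with
// prefix; an empty prefix lists every object.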
func (client *GCSBlobstore) List(prefix string) ([]string, error) {
return nil, errors.New("not implemented")
if prefix != "" {
log.Printf("Listing objects in bucket %s with prefix '%s'\n", client.config.BucketName, prefix)
} else {
log.Printf("Listing objects in bucket %s\n", client.config.BucketName)
}
if client.readOnly() {
return nil, ErrInvalidROWriteOperation
}

bh := client.getBucketHandle(client.authenticatedGCS)

it := bh.Objects(context.Background(), &storage.Query{Prefix: prefix})

var names []string
for {
attr, err := it.Next()
if err == iterator.Done {
break
}

if err != nil {
return nil, err
}

names = append(names, attr.Name)
}

return names, nil
}

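// Copy performs a server-side copy of srcBlob to dstBlob within the configured bucket.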
func (client *GCSBlobstore) Copy(srcBlob string, dstBlob string) error {
return errors.New("not implemented")
log.Printf("copying an object from %s to %s\n", srcBlob, dstBlob)
if client.readOnly() {
return ErrInvalidROWriteOperation
}

srcHandle := client.getObjectHandle(client.authenticatedGCS, srcBlob)
dstHandle := client.getObjectHandle(client.authenticatedGCS, dstBlob)

_, err := dstHandle.CopierFrom(srcHandle).Run(context.Background())
if err != nil {
return fmt.Errorf("copying object: %w", err)
}
return nil
}

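// Properties prints the ETag, last-modified timestamp, and size of dest as JSON.
// If the object does not exist, it prints an empty JSON object.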
func (client *GCSBlobstore) Properties(dest string) error {
return errors.New("not implemented")
log.Printf("Getting properties for object %s/%s\n", client.config.BucketName, dest)
if client.readOnly() {
return ErrInvalidROWriteOperation
}
oh := client.getObjectHandle(client.authenticatedGCS, dest)
attr, err := oh.Attrs(context.Background())

if err != nil {
if errors.Is(err, storage.ErrObjectNotExist) {
fmt.Println(`{}`)
return nil
}
return fmt.Errorf("getting attributes: %w", err)
}

props := BlobProperties{
ETag: strings.Trim(attr.Etag, `"`),
LastModified: attr.Updated,
ContentLength: attr.Size,
}

output, err := json.MarshalIndent(props, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal blob properties: %w", err)
}

fmt.Println(string(output))
return nil
}

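// EnsureStorageExists verifies that the configured bucket exists and creates it if it does
// not, applying the configured storage class and the project ID derived from the credentials.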
func (client *GCSBlobstore) EnsureStorageExists() error {
return errors.New("not implemented")
log.Printf("Ensuring bucket '%s' exists\n", client.config.BucketName)
if client.readOnly() {
return ErrInvalidROWriteOperation
}
ctx := context.Background()
bh := client.getBucketHandle(client.authenticatedGCS)

_, err := bh.Attrs(ctx)
if errors.Is(err, storage.ErrBucketNotExist) {
battr := &storage.BucketAttrs{Name: client.config.BucketName}
if client.config.StorageClass != "" {
battr.StorageClass = client.config.StorageClass
}

projectID, err := extractProjectID(ctx, client.config)
if err != nil {
return fmt.Errorf("extracting project ID: %w", err)
}

err = bh.Create(ctx, projectID, battr)
if err != nil {
return fmt.Errorf("creating bucket: %w", err)
}
return nil
}
if err != nil {
return fmt.Errorf("checking bucket: %w", err)
}

return nil
}

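// DeleteRecursive deletes every object whose name starts with prefix, running at most
// maxConcurrency deletions concurrently. Objects that no longer exist are ignored.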
func (client *GCSBlobstore) DeleteRecursive(prefix string) error {
return errors.New("not implemented")
if prefix != "" {
log.Printf("Deleting all objects in bucket %s with prefix '%s'\n",
client.config.BucketName, prefix)
} else {
log.Printf("Deleting all objects in bucket %s\n", client.config.BucketName)
}

if client.readOnly() {
return ErrInvalidROWriteOperation
}

names, err := client.List(prefix)
if err != nil {
return fmt.Errorf("listing objects: %w", err)
}

errChan := make(chan error, len(names))
semaphore := make(chan struct{}, maxConcurrency)
wg := &sync.WaitGroup{}
for _, n := range names {
name := n
wg.Add(1)
go func() {
defer wg.Done()

semaphore <- struct{}{}
defer func() { <-semaphore }()

err := client.getObjectHandle(client.authenticatedGCS, name).Delete(context.Background())
if err != nil && !errors.Is(err, storage.ErrObjectNotExist) {
errChan <- fmt.Errorf("deleting object %s: %w", name, err)
}
}()
}

wg.Wait()
close(errChan)

var errs []error
for err := range errChan {
errs = append(errs, err)
}

if len(errs) > 0 {
return errors.Join(errs...)
}

return nil
}
36 changes: 36 additions & 0 deletions gcs/client/sdk.go
@@ -18,7 +18,9 @@ package client

import (
"context"
"encoding/json"
"errors"
"fmt"

"golang.org/x/oauth2/google"

@@ -52,3 +54,37 @@ func newStorageClients(ctx context.Context, cfg *config.GCSCli) (*storage.Client
}
return authenticatedClient, publicClient, err
}

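// extractProjectID resolves the GCP project ID from the configured credentials source
// (service account JSON or application default credentials); it fails for read-only
// or unknown credential sources.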
func extractProjectID(ctx context.Context, cfg *config.GCSCli) (string, error) {
switch cfg.CredentialsSource {
case config.ServiceAccountFileCredentialsSource:
// Parse service account JSON to extract project_id
var serviceAccount struct {
ProjectID string `json:"project_id"`
}
if err := json.Unmarshal([]byte(cfg.ServiceAccountFile), &serviceAccount); err != nil {
return "", fmt.Errorf("parsing service account JSON: %w", err)
}
if serviceAccount.ProjectID == "" {
return "", errors.New("project_id not found in service account JSON")
}
return serviceAccount.ProjectID, nil

case config.DefaultCredentialsSource:
// Try to get project ID from default credentials
creds, err := google.FindDefaultCredentials(ctx, storage.ScopeFullControl)
if err != nil {
return "", fmt.Errorf("finding default credentials: %w", err)
}
if creds.ProjectID == "" {
return "", errors.New("project_id not found in default credentials")
}
return creds.ProjectID, nil

case config.NoneCredentialsSource:
return "", errors.New("cannot create bucket with read-only credentials")

default:
return "", errors.New("unknown credentials_source")
}
}