From 84bf842db7e88db99251cccb387e2537186f83b1 Mon Sep 17 00:00:00 2001 From: Leonardo Cesar Borges Date: Wed, 27 Nov 2024 09:59:49 -0300 Subject: [PATCH 1/2] Adding Google Storage Requester pays feature to Golang SDK. Setting UserProject on Google Storage Bucket operations to enable requester pays feature. Requester pays project ID will come from environment variable named `BILLING_PROJECT_ID` More information about Google storage requester pays feature here https://cloud.google.com/storage/docs/requester-pays --- sdks/go/pkg/beam/io/filesystem/gcs/gcs.go | 32 ++++++++++++------- .../go/pkg/beam/io/filesystem/gcs/gcs_test.go | 9 ++++++ 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go b/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go index 55509d9ff2f4..0aa540e7a3df 100644 --- a/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go +++ b/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go @@ -21,6 +21,7 @@ import ( "context" "fmt" "io" + "os" "path/filepath" "time" @@ -38,12 +39,14 @@ func init() { } type fs struct { - client *storage.Client + client *storage.Client + billingProjectID string } // New creates a new Google Cloud Storage filesystem using application // default credentials. If it fails, it falls back to unauthenticated // access. +// It will use the environment variable named `BILLING_PROJECT_ID` as requester payer bucket attribute. func New(ctx context.Context) filesystem.Interface { client, err := gcsx.NewClient(ctx, storage.ScopeReadWrite) if err != nil { @@ -54,7 +57,12 @@ func New(ctx context.Context) filesystem.Interface { panic(errors.Wrapf(err, "failed to create GCS client")) } } - return &fs{client: client} + billingProjectIDEnvVarName := "BILLING_PROJECT_ID" + billingProjectID := os.Getenv(billingProjectIDEnvVarName) + return &fs{ + client: client, + billingProjectID: billingProjectID, + } } func (f *fs) Close() error { @@ -73,7 +81,7 @@ func (f *fs) List(ctx context.Context, glob string) ([]string, error) { // For now, we assume * is the first matching character to make a // prefix listing and not list the entire bucket. prefix := fsx.GetPrefix(object) - it := f.client.Bucket(bucket).Objects(ctx, &storage.Query{ + it := f.client.Bucket(bucket).UserProject(f.billingProjectID).Objects(ctx, &storage.Query{ Prefix: prefix, }) for { @@ -107,7 +115,7 @@ func (f *fs) OpenRead(ctx context.Context, filename string) (io.ReadCloser, erro return nil, err } - return f.client.Bucket(bucket).Object(object).NewReader(ctx) + return f.client.Bucket(bucket).UserProject(f.billingProjectID).Object(object).NewReader(ctx) } // TODO(herohde) 7/12/2017: should we create the bucket in OpenWrite? For now, "no". @@ -118,7 +126,7 @@ func (f *fs) OpenWrite(ctx context.Context, filename string) (io.WriteCloser, er return nil, err } - return f.client.Bucket(bucket).Object(object).NewWriter(ctx), nil + return f.client.Bucket(bucket).UserProject(f.billingProjectID).Object(object).NewWriter(ctx), nil } func (f *fs) Size(ctx context.Context, filename string) (int64, error) { @@ -127,7 +135,7 @@ func (f *fs) Size(ctx context.Context, filename string) (int64, error) { return -1, err } - obj := f.client.Bucket(bucket).Object(object) + obj := f.client.Bucket(bucket).UserProject(f.billingProjectID).Object(object) attrs, err := obj.Attrs(ctx) if err != nil { return -1, err @@ -143,7 +151,7 @@ func (f *fs) LastModified(ctx context.Context, filename string) (time.Time, erro return time.Time{}, err } - obj := f.client.Bucket(bucket).Object(object) + obj := f.client.Bucket(bucket).UserProject(f.billingProjectID).Object(object) attrs, err := obj.Attrs(ctx) if err != nil { return time.Time{}, err @@ -159,7 +167,7 @@ func (f *fs) Remove(ctx context.Context, filename string) error { return err } - obj := f.client.Bucket(bucket).Object(object) + obj := f.client.Bucket(bucket).UserProject(f.billingProjectID).Object(object) return obj.Delete(ctx) } @@ -169,13 +177,13 @@ func (f *fs) Copy(ctx context.Context, srcpath, dstpath string) error { if err != nil { return err } - srcobj := f.client.Bucket(bucket).Object(src) + srcobj := f.client.Bucket(bucket).UserProject(f.billingProjectID).Object(src) bucket, dst, err := gcsx.ParseObject(dstpath) if err != nil { return err } - dstobj := f.client.Bucket(bucket).Object(dst) + dstobj := f.client.Bucket(bucket).UserProject(f.billingProjectID).Object(dst) cp := dstobj.CopierFrom(srcobj) _, err = cp.Run(ctx) @@ -188,13 +196,13 @@ func (f *fs) Rename(ctx context.Context, srcpath, dstpath string) error { if err != nil { return err } - srcobj := f.client.Bucket(bucket).Object(src) + srcobj := f.client.Bucket(bucket).UserProject(f.billingProjectID).Object(src) bucket, dst, err := gcsx.ParseObject(dstpath) if err != nil { return err } - dstobj := f.client.Bucket(bucket).Object(dst) + dstobj := f.client.Bucket(bucket).UserProject(f.billingProjectID).Object(dst) cp := dstobj.CopierFrom(srcobj) _, err = cp.Run(ctx) diff --git a/sdks/go/pkg/beam/io/filesystem/gcs/gcs_test.go b/sdks/go/pkg/beam/io/filesystem/gcs/gcs_test.go index 09b5da0db127..4befc73d55e6 100644 --- a/sdks/go/pkg/beam/io/filesystem/gcs/gcs_test.go +++ b/sdks/go/pkg/beam/io/filesystem/gcs/gcs_test.go @@ -43,6 +43,15 @@ func TestGCS_FilesystemNew(t *testing.T) { } func TestGCS_direct(t *testing.T) { + testGCS_direct(t) +} + +func TestGCS_directSettingBillingProjectID(t *testing.T) { + t.Setenv("BILLING_PROJECT_ID", "projectfake") + testGCS_direct(t) +} + +func testGCS_direct(t *testing.T) { ctx := context.Background() dirPath := "gs://beamgogcsfilesystemtest" filePath := dirPath + "/file.txt" From bfc907787f44c0769ae4bf067c66b673006e1637 Mon Sep 17 00:00:00 2001 From: Leonardo Cesar Borges Date: Wed, 27 Nov 2024 10:09:37 -0300 Subject: [PATCH 2/2] Adding new entry to CHANGES.md --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 654512c3a4e2..976a5ab321ac 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -65,7 +65,7 @@ * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## New Features / Improvements - +* Adding Google Storage Requests Pays feature (Golang)([#30747](https://github.com/apache/beam/issues/30747)). * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Breaking Changes