diff --git a/CHANGES.md b/CHANGES.md index 6c9d9e7c6b72..4976f95018bf 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -74,7 +74,7 @@ * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## New Features / Improvements - +* Google Cloud Storage Requester Pays support added (Go) ([#30747](https://github.com/apache/beam/issues/30747)). * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * [Python] Prism runner now auto-enabled for some Python pipelines using the direct runner ([#34921](https://github.com/apache/beam/pull/34921)). * [YAML] WriteToTFRecord and ReadFromTFRecord Beam YAML support diff --git a/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go b/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go index 55509d9ff2f4..73e686381053 100644 --- a/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go +++ b/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go @@ -25,6 +25,7 @@ import ( "time" "cloud.google.com/go/storage" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" @@ -33,8 +34,30 @@ import ( "google.golang.org/api/iterator" ) +const ( + projectBillingHook = "beam:go:hook:filesystem:billingproject" +) + +var billingProject string = "" + func init() { filesystem.Register("gs", New) + hf := func(opts []string) hooks.Hook { + return hooks.Hook{ + Init: func(ctx context.Context) (context.Context, error) { + if len(opts) == 0 { + return ctx, nil + } + if len(opts) > 1 { + return ctx, fmt.Errorf("expected 1 option, got %v: %v", len(opts), opts) + } + + billingProject = opts[0] + return ctx, nil + }, + } + } + hooks.RegisterHook(projectBillingHook, hf) } type fs struct { @@ -44,6 +67,7 @@ type fs struct { // New creates a new Google Cloud Storage filesystem using application // default credentials. If it fails, it falls back to unauthenticated // access. 
+// Bucket operations in this package pass the package-level billingProject to UserProject; see RequesterBillingProject for how it is configured. func New(ctx context.Context) filesystem.Interface { client, err := gcsx.NewClient(ctx, storage.ScopeReadWrite) if err != nil { @@ -54,7 +78,23 @@ func New(ctx context.Context) filesystem.Interface { panic(errors.Wrapf(err, "failed to create GCS client")) } } - return &fs{client: client} + return &fs{ + client: client, + } +} + +func SetRequesterBillingProject(project string) { + billingProject = project +} + +// RequesterBillingProject configures the project to be used in Google Cloud Storage operations +// with requester pays activated. It returns an error if project is empty. More information about requester pays: https://cloud.google.com/storage/docs/requester-pays +func RequesterBillingProject(project string) error { + if project == "" { + return fmt.Errorf("project cannot be empty, got %v", project) + } + // The hook itself is registered in this package's init function. + return hooks.EnableHook(projectBillingHook, project) } func (f *fs) Close() error { @@ -73,7 +113,7 @@ func (f *fs) List(ctx context.Context, glob string) ([]string, error) { // For now, we assume * is the first matching character to make a // prefix listing and not list the entire bucket. prefix := fsx.GetPrefix(object) - it := f.client.Bucket(bucket).Objects(ctx, &storage.Query{ + it := f.client.Bucket(bucket).UserProject(billingProject).Objects(ctx, &storage.Query{ Prefix: prefix, }) for { @@ -107,7 +147,7 @@ func (f *fs) OpenRead(ctx context.Context, filename string) (io.ReadCloser, erro return nil, err } - return f.client.Bucket(bucket).Object(object).NewReader(ctx) + return f.client.Bucket(bucket).UserProject(billingProject).Object(object).NewReader(ctx) } // TODO(herohde) 7/12/2017: should we create the bucket in OpenWrite? For now, "no". 
@@ -118,7 +158,7 @@ func (f *fs) OpenWrite(ctx context.Context, filename string) (io.WriteCloser, er return nil, err } - return f.client.Bucket(bucket).Object(object).NewWriter(ctx), nil + return f.client.Bucket(bucket).UserProject(billingProject).Object(object).NewWriter(ctx), nil } func (f *fs) Size(ctx context.Context, filename string) (int64, error) { @@ -127,7 +167,7 @@ func (f *fs) Size(ctx context.Context, filename string) (int64, error) { return -1, err } - obj := f.client.Bucket(bucket).Object(object) + obj := f.client.Bucket(bucket).UserProject(billingProject).Object(object) attrs, err := obj.Attrs(ctx) if err != nil { return -1, err @@ -143,7 +183,7 @@ func (f *fs) LastModified(ctx context.Context, filename string) (time.Time, erro return time.Time{}, err } - obj := f.client.Bucket(bucket).Object(object) + obj := f.client.Bucket(bucket).UserProject(billingProject).Object(object) attrs, err := obj.Attrs(ctx) if err != nil { return time.Time{}, err @@ -159,7 +199,7 @@ func (f *fs) Remove(ctx context.Context, filename string) error { return err } - obj := f.client.Bucket(bucket).Object(object) + obj := f.client.Bucket(bucket).UserProject(billingProject).Object(object) return obj.Delete(ctx) } @@ -169,13 +209,13 @@ func (f *fs) Copy(ctx context.Context, srcpath, dstpath string) error { if err != nil { return err } - srcobj := f.client.Bucket(bucket).Object(src) + srcobj := f.client.Bucket(bucket).UserProject(billingProject).Object(src) bucket, dst, err := gcsx.ParseObject(dstpath) if err != nil { return err } - dstobj := f.client.Bucket(bucket).Object(dst) + dstobj := f.client.Bucket(bucket).UserProject(billingProject).Object(dst) cp := dstobj.CopierFrom(srcobj) _, err = cp.Run(ctx) @@ -188,13 +228,13 @@ func (f *fs) Rename(ctx context.Context, srcpath, dstpath string) error { if err != nil { return err } - srcobj := f.client.Bucket(bucket).Object(src) + srcobj := f.client.Bucket(bucket).UserProject(billingProject).Object(src) bucket, dst, err := 
gcsx.ParseObject(dstpath) if err != nil { return err } - dstobj := f.client.Bucket(bucket).Object(dst) + dstobj := f.client.Bucket(bucket).UserProject(billingProject).Object(dst) cp := dstobj.CopierFrom(srcobj) _, err = cp.Run(ctx) diff --git a/sdks/go/pkg/beam/io/filesystem/gcs/gcs_test.go b/sdks/go/pkg/beam/io/filesystem/gcs/gcs_test.go index 09b5da0db127..66dee6bb23f6 100644 --- a/sdks/go/pkg/beam/io/filesystem/gcs/gcs_test.go +++ b/sdks/go/pkg/beam/io/filesystem/gcs/gcs_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" "github.com/fsouza/fake-gcs-server/fakestorage" "github.com/google/go-cmp/cmp" @@ -43,6 +44,30 @@ func TestGCS_FilesystemNew(t *testing.T) { } func TestGCS_direct(t *testing.T) { + testGCS_direct(t) +} + +// TestGCS_BillingProjectHookEnable verifies that RequesterBillingProject enables +// the billing-project hook with the given project as its only option. +func TestGCS_BillingProjectHookEnable(t *testing.T) { + billingProject := "whatever" + if err := RequesterBillingProject(billingProject); err != nil { + t.Fatalf("RequesterBillingProject(%q) = %v", billingProject, err) + } + if _, err := hooks.RunInitHooks(context.Background()); err != nil { + t.Fatalf("error to init hooks = %v", err) + } + projectBillingHookIsEnable, hookValue := hooks.IsEnabled(projectBillingHook) + // Fatal, not Error: hookValue is indexed below and may be empty when the hook is not enabled. + if !projectBillingHookIsEnable { + t.Fatal("project billing hook isn't enable") + } + if len(hookValue) != 1 || hookValue[0] != billingProject { + t.Errorf("projectBillingHook value wrong / want {%s} got {%v}", billingProject, hookValue) + } +} + +func testGCS_direct(t *testing.T) { ctx := context.Background() dirPath := "gs://beamgogcsfilesystemtest" filePath := dirPath + "/file.txt"