From 708172d9bbdceb5147255884e92f7c4033959f80 Mon Sep 17 00:00:00 2001 From: Tanu Sharma Date: Mon, 19 May 2025 08:50:37 +0000 Subject: [PATCH 1/3] Add warning if temp location bucket has soft delete enabled for Go SDK (resolves #31606) --- sdks/go/pkg/beam/runners/dataflow/dataflow.go | 23 ++++++++++ sdks/go/pkg/beam/util/gcsx/gcs.go | 18 ++++++++ sdks/go/pkg/beam/util/gcsx/gcs_test.go | 46 +++++++++++++++++++ 3 files changed, 87 insertions(+) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index 73667fb8ee6e..60bb2bc6bd42 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -268,6 +268,9 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions if *stagingLocation == "" { return nil, errors.New("no GCS staging location specified. Use --staging_location=gs:///") } + + checkSoftDeletePolicyEnabled(ctx, *stagingLocation, "staging_location") + var jobLabels map[string]string if *labels != "" { if err := json.Unmarshal([]byte(*labels), &jobLabels); err != nil { @@ -411,6 +414,8 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions if opts.TempLocation == "" { opts.TempLocation = gcsx.Join(*stagingLocation, "tmp") } + + checkSoftDeletePolicyEnabled(ctx, opts.TempLocation, "temp_location") return opts, nil } @@ -456,3 +461,21 @@ func getContainerImage(ctx context.Context) string { } panic(fmt.Sprintf("Unsupported environment %v", urn)) } + +func checkSoftDeletePolicyEnabled(ctx context.Context, bucketName string, locationName string) { + bucket, _, err := gcsx.ParseObject(bucketName) + if err != nil { + log.Warnf(ctx, "Error parsing bucket name: %v", err) + return + } + if enabled, err_msg := gcsx.SoftDeletePolicyEnabled(ctx, bucket); err_msg != nil { + log.Warnf(ctx,"Error checking SoftDeletePolicy: %v", err_msg) + }else if enabled { + log.Warnf(ctx,"Bucket %s specified in %s has soft-delete policy enabled. "+ + "Dataflow jobs use Cloud Storage to store temporary files during pipeline execution. "+ + "To avoid being billed for unnecessary storage costs, turn off the soft delete feature "+ + "on buckets that your Dataflow jobs use for temporary storage. "+ + "For more information, see https://cloud.google.com/storage/docs/use-soft-delete#remove-soft-delete-policy.", + bucketName,locationName) + } +} diff --git a/sdks/go/pkg/beam/util/gcsx/gcs.go b/sdks/go/pkg/beam/util/gcsx/gcs.go index 1dd85924447f..0d2a6ccee8d7 100644 --- a/sdks/go/pkg/beam/util/gcsx/gcs.go +++ b/sdks/go/pkg/beam/util/gcsx/gcs.go @@ -63,6 +63,24 @@ func Upload(ctx context.Context, client *storage.Client, project, bucket, object } +var getBucketAttrs = func(ctx context.Context, bucketName string) (*storage.BucketAttrs, error) { + client, err := storage.NewClient(ctx) + if err != nil { + return nil, err + } + defer client.Close() + return client.Bucket(bucketName).Attrs(ctx) +} + +// SoftDeletePolicyEnabled returns true if SoftDeletePolicy is enabled on bucket +func SoftDeletePolicyEnabled(ctx context.Context, bucketName string) (bool, error) { + attrs, err := getBucketAttrs(ctx, bucketName) + if err != nil { + return false, err + } + return attrs.SoftDeletePolicy != nil && attrs.SoftDeletePolicy.RetentionDuration > 0, nil +} + // Get BucketAttrs with RetentionDuration of SoftDeletePolicy set to zero for disabling SoftDeletePolicy. func getDisableSoftDeletePolicyBucketAttrs() *storage.BucketAttrs { attrs := &storage.BucketAttrs{ diff --git a/sdks/go/pkg/beam/util/gcsx/gcs_test.go b/sdks/go/pkg/beam/util/gcsx/gcs_test.go index 463ba3ea1833..9438fb54e885 100644 --- a/sdks/go/pkg/beam/util/gcsx/gcs_test.go +++ b/sdks/go/pkg/beam/util/gcsx/gcs_test.go @@ -18,7 +18,9 @@ package gcsx import ( "strings" "testing" + "context" + "cloud.google.com/go/storage" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" ) @@ -106,3 +108,47 @@ func TestGetDisableSoftDeletePolicyBucketAttrs(t *testing.T) { t.Errorf("attrs has RetentionDuration %v which is not correct", attrs.SoftDeletePolicy.RetentionDuration) } } + +func TestSoftDeletePolicyWhenEnabled(t *testing.T) { + // Save original and defer restore + original := getBucketAttrs + defer func() { getBucketAttrs = original }() + + // Inject mock behavior + getBucketAttrs = func(ctx context.Context, bucketName string) (*storage.BucketAttrs, error) { + return &storage.BucketAttrs{ + SoftDeletePolicy: &storage.SoftDeletePolicy{ + RetentionDuration: 1029, + }, + }, nil + } + + enabled, err := SoftDeletePolicyEnabled(context.Background(), "mock-bucket") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !enabled { + t.Errorf("Expected soft delete to be enabled, got false") + } +} + +func TestSoftDeletePolicyWhenDisabled(t *testing.T) { + original := getBucketAttrs + defer func() { getBucketAttrs = original }() + + getBucketAttrs = func(ctx context.Context, bucketName string) (*storage.BucketAttrs, error) { + return &storage.BucketAttrs{ + SoftDeletePolicy: &storage.SoftDeletePolicy{ + RetentionDuration: 0, + }, + }, nil + } + + enabled, err := SoftDeletePolicyEnabled(context.Background(), "mock-bucket") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if enabled { + t.Errorf("Expected soft delete to be disabled, got true") + } +} From e112886321fb4d379759eec5eb5339d1a6c9dd50 Mon Sep 17 00:00:00 2001 From: Tanu Sharma Date: Mon, 19 May 2025 14:52:34 +0000 Subject: [PATCH 2/3] Corrected Formatting --- sdks/go/pkg/beam/runners/dataflow/dataflow.go | 22 +++++++++---------- sdks/go/pkg/beam/util/gcsx/gcs_test.go | 4 ++-- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index 60bb2bc6bd42..fa0af5fd5423 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -269,7 +269,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions return nil, errors.New("no GCS staging location specified. Use --staging_location=gs:///") } - checkSoftDeletePolicyEnabled(ctx, *stagingLocation, "staging_location") + checkSoftDeletePolicyEnabled(ctx, *stagingLocation, "staging_location") var jobLabels map[string]string if *labels != "" { @@ -414,7 +414,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions if opts.TempLocation == "" { opts.TempLocation = gcsx.Join(*stagingLocation, "tmp") } - + checkSoftDeletePolicyEnabled(ctx, opts.TempLocation, "temp_location") return opts, nil @@ -463,19 +463,19 @@ func getContainerImage(ctx context.Context) string { } func checkSoftDeletePolicyEnabled(ctx context.Context, bucketName string, locationName string) { - bucket, _, err := gcsx.ParseObject(bucketName) + bucket, _, err := gcsx.ParseObject(bucketName) if err != nil { log.Warnf(ctx, "Error parsing bucket name: %v", err) return } if enabled, err_msg := gcsx.SoftDeletePolicyEnabled(ctx, bucket); err_msg != nil { - log.Warnf(ctx,"Error checking SoftDeletePolicy: %v", err_msg) - }else if enabled { - log.Warnf(ctx,"Bucket %s specified in %s has soft-delete policy enabled. "+ - "Dataflow jobs use Cloud Storage to store temporary files during pipeline execution. "+ - "To avoid being billed for unnecessary storage costs, turn off the soft delete feature "+ - "on buckets that your Dataflow jobs use for temporary storage. "+ - "For more information, see https://cloud.google.com/storage/docs/use-soft-delete#remove-soft-delete-policy.", - bucketName,locationName) + log.Warnf(ctx, "Error checking SoftDeletePolicy: %v", err_msg) + } else if enabled { + log.Warnf(ctx, "Bucket %s specified in %s has soft-delete policy enabled. "+ + "Dataflow jobs use Cloud Storage to store temporary files during pipeline execution. "+ + "To avoid being billed for unnecessary storage costs, turn off the soft delete feature "+ + "on buckets that your Dataflow jobs use for temporary storage. "+ + "For more information, see https://cloud.google.com/storage/docs/use-soft-delete#remove-soft-delete-policy.", + bucketName, locationName) } } diff --git a/sdks/go/pkg/beam/util/gcsx/gcs_test.go b/sdks/go/pkg/beam/util/gcsx/gcs_test.go index 9438fb54e885..a1268cbb8cd8 100644 --- a/sdks/go/pkg/beam/util/gcsx/gcs_test.go +++ b/sdks/go/pkg/beam/util/gcsx/gcs_test.go @@ -16,11 +16,11 @@ package gcsx import ( + "context" "strings" "testing" - "context" - "cloud.google.com/go/storage" + "cloud.google.com/go/storage" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" ) From d0634b710a9a9ea20df3f58bf3759c98724782d5 Mon Sep 17 00:00:00 2001 From: Tanu Sharma Date: Thu, 5 Jun 2025 07:25:14 +0000 Subject: [PATCH 3/3] Applied suggested changes --- sdks/go/pkg/beam/runners/dataflow/dataflow.go | 9 ++++++++- sdks/go/pkg/beam/util/gcsx/gcs.go | 11 +++-------- sdks/go/pkg/beam/util/gcsx/gcs_test.go | 9 +++++---- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index fa0af5fd5423..25dbe365dccc 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -468,7 +468,14 @@ func checkSoftDeletePolicyEnabled(ctx context.Context, bucketName string, locati log.Warnf(ctx, "Error parsing bucket name: %v", err) return } - if enabled, err_msg := gcsx.SoftDeletePolicyEnabled(ctx, bucket); err_msg != nil { + client, err := storage.NewClient(ctx) + if err != nil { + log.Warnf(ctx, "Error creating GCS client: %v", err) + return + } + defer client.Close() + + if enabled, err_msg := gcsx.SoftDeletePolicyEnabled(ctx, client, bucket); err_msg != nil { log.Warnf(ctx, "Error checking SoftDeletePolicy: %v", err_msg) } else if enabled { log.Warnf(ctx, "Bucket %s specified in %s has soft-delete policy enabled. "+ diff --git a/sdks/go/pkg/beam/util/gcsx/gcs.go b/sdks/go/pkg/beam/util/gcsx/gcs.go index 0d2a6ccee8d7..0986851c16d0 100644 --- a/sdks/go/pkg/beam/util/gcsx/gcs.go +++ b/sdks/go/pkg/beam/util/gcsx/gcs.go @@ -63,18 +63,13 @@ func Upload(ctx context.Context, client *storage.Client, project, bucket, object } -var getBucketAttrs = func(ctx context.Context, bucketName string) (*storage.BucketAttrs, error) { - client, err := storage.NewClient(ctx) - if err != nil { - return nil, err - } - defer client.Close() +var getBucketAttrs = func(ctx context.Context, client *storage.Client, bucketName string) (*storage.BucketAttrs, error) { return client.Bucket(bucketName).Attrs(ctx) } // SoftDeletePolicyEnabled returns true if SoftDeletePolicy is enabled on bucket -func SoftDeletePolicyEnabled(ctx context.Context, bucketName string) (bool, error) { - attrs, err := getBucketAttrs(ctx, bucketName) +func SoftDeletePolicyEnabled(ctx context.Context, client *storage.Client, bucketName string) (bool, error) { + attrs, err := getBucketAttrs(ctx, client, bucketName) if err != nil { return false, err } diff --git a/sdks/go/pkg/beam/util/gcsx/gcs_test.go b/sdks/go/pkg/beam/util/gcsx/gcs_test.go index a1268cbb8cd8..cad3ad3754f0 100644 --- a/sdks/go/pkg/beam/util/gcsx/gcs_test.go +++ b/sdks/go/pkg/beam/util/gcsx/gcs_test.go @@ -115,7 +115,7 @@ func TestSoftDeletePolicyWhenEnabled(t *testing.T) { defer func() { getBucketAttrs = original }() // Inject mock behavior - getBucketAttrs = func(ctx context.Context, bucketName string) (*storage.BucketAttrs, error) { + getBucketAttrs = func(ctx context.Context, client *storage.Client, bucketName string) (*storage.BucketAttrs, error) { return &storage.BucketAttrs{ SoftDeletePolicy: &storage.SoftDeletePolicy{ RetentionDuration: 1029, @@ -123,7 +123,8 @@ func TestSoftDeletePolicyWhenEnabled(t *testing.T) { }, nil } - enabled, err := SoftDeletePolicyEnabled(context.Background(), "mock-bucket") + // You can pass nil for client because the mock ignores it + enabled, err := SoftDeletePolicyEnabled(context.Background(), nil, "mock-bucket") if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -136,7 +137,7 @@ func TestSoftDeletePolicyWhenDisabled(t *testing.T) { original := getBucketAttrs defer func() { getBucketAttrs = original }() - getBucketAttrs = func(ctx context.Context, bucketName string) (*storage.BucketAttrs, error) { + getBucketAttrs = func(ctx context.Context, client *storage.Client, bucketName string) (*storage.BucketAttrs, error) { return &storage.BucketAttrs{ SoftDeletePolicy: &storage.SoftDeletePolicy{ RetentionDuration: 0, @@ -144,7 +145,7 @@ func TestSoftDeletePolicyWhenDisabled(t *testing.T) { }, nil } - enabled, err := SoftDeletePolicyEnabled(context.Background(), "mock-bucket") + enabled, err := SoftDeletePolicyEnabled(context.Background(), nil, "mock-bucket") if err != nil { t.Fatalf("Unexpected error: %v", err) }