diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index 73667fb8ee6e..25dbe365dccc 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -268,6 +268,9 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions if *stagingLocation == "" { return nil, errors.New("no GCS staging location specified. Use --staging_location=gs:///") } + + checkSoftDeletePolicyEnabled(ctx, *stagingLocation, "staging_location") + var jobLabels map[string]string if *labels != "" { if err := json.Unmarshal([]byte(*labels), &jobLabels); err != nil { @@ -412,6 +415,8 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions opts.TempLocation = gcsx.Join(*stagingLocation, "tmp") } + checkSoftDeletePolicyEnabled(ctx, opts.TempLocation, "temp_location") + return opts, nil } @@ -456,3 +461,28 @@ func getContainerImage(ctx context.Context) string { } panic(fmt.Sprintf("Unsupported environment %v", urn)) } + +func checkSoftDeletePolicyEnabled(ctx context.Context, bucketName string, locationName string) { + bucket, _, err := gcsx.ParseObject(bucketName) + if err != nil { + log.Warnf(ctx, "Error parsing bucket name: %v", err) + return + } + client, err := storage.NewClient(ctx) + if err != nil { + log.Warnf(ctx, "Error creating GCS client: %v", err) + return + } + defer client.Close() + + if enabled, err_msg := gcsx.SoftDeletePolicyEnabled(ctx, client, bucket); err_msg != nil { + log.Warnf(ctx, "Error checking SoftDeletePolicy: %v", err_msg) + } else if enabled { + log.Warnf(ctx, "Bucket %s specified in %s has soft-delete policy enabled. "+ + "Dataflow jobs use Cloud Storage to store temporary files during pipeline execution. "+ + "To avoid being billed for unnecessary storage costs, turn off the soft delete feature "+ + "on buckets that your Dataflow jobs use for temporary storage. "+ + "For more information, see https://cloud.google.com/storage/docs/use-soft-delete#remove-soft-delete-policy.", + bucketName, locationName) + } +} diff --git a/sdks/go/pkg/beam/util/gcsx/gcs.go b/sdks/go/pkg/beam/util/gcsx/gcs.go index 1dd85924447f..0986851c16d0 100644 --- a/sdks/go/pkg/beam/util/gcsx/gcs.go +++ b/sdks/go/pkg/beam/util/gcsx/gcs.go @@ -63,6 +63,19 @@ func Upload(ctx context.Context, client *storage.Client, project, bucket, object } +var getBucketAttrs = func(ctx context.Context, client *storage.Client, bucketName string) (*storage.BucketAttrs, error) { + return client.Bucket(bucketName).Attrs(ctx) +} + +// SoftDeletePolicyEnabled returns true if SoftDeletePolicy is enabled on bucket +func SoftDeletePolicyEnabled(ctx context.Context, client *storage.Client, bucketName string) (bool, error) { + attrs, err := getBucketAttrs(ctx, client, bucketName) + if err != nil { + return false, err + } + return attrs.SoftDeletePolicy != nil && attrs.SoftDeletePolicy.RetentionDuration > 0, nil +} + // Get BucketAttrs with RetentionDuration of SoftDeletePolicy set to zero for disabling SoftDeletePolicy. func getDisableSoftDeletePolicyBucketAttrs() *storage.BucketAttrs { attrs := &storage.BucketAttrs{ diff --git a/sdks/go/pkg/beam/util/gcsx/gcs_test.go b/sdks/go/pkg/beam/util/gcsx/gcs_test.go index 463ba3ea1833..cad3ad3754f0 100644 --- a/sdks/go/pkg/beam/util/gcsx/gcs_test.go +++ b/sdks/go/pkg/beam/util/gcsx/gcs_test.go @@ -16,9 +16,11 @@ package gcsx import ( + "context" "strings" "testing" + "cloud.google.com/go/storage" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" ) @@ -106,3 +108,48 @@ func TestGetDisableSoftDeletePolicyBucketAttrs(t *testing.T) { t.Errorf("attrs has RetentionDuration %v which is not correct", attrs.SoftDeletePolicy.RetentionDuration) } } + +func TestSoftDeletePolicyWhenEnabled(t *testing.T) { + // Save original and defer restore + original := getBucketAttrs + defer func() { getBucketAttrs = original }() + + // Inject mock behavior + getBucketAttrs = func(ctx context.Context, client *storage.Client, bucketName string) (*storage.BucketAttrs, error) { + return &storage.BucketAttrs{ + SoftDeletePolicy: &storage.SoftDeletePolicy{ + RetentionDuration: 1029, + }, + }, nil + } + + // You can pass nil for client because the mock ignores it + enabled, err := SoftDeletePolicyEnabled(context.Background(), nil, "mock-bucket") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !enabled { + t.Errorf("Expected soft delete to be enabled, got false") + } +} + +func TestSoftDeletePolicyWhenDisabled(t *testing.T) { + original := getBucketAttrs + defer func() { getBucketAttrs = original }() + + getBucketAttrs = func(ctx context.Context, client *storage.Client, bucketName string) (*storage.BucketAttrs, error) { + return &storage.BucketAttrs{ + SoftDeletePolicy: &storage.SoftDeletePolicy{ + RetentionDuration: 0, + }, + }, nil + } + + enabled, err := SoftDeletePolicyEnabled(context.Background(), nil, "mock-bucket") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if enabled { + t.Errorf("Expected soft delete to be disabled, got true") + } +}