From 9bc43f38535920c5fd552de2b8f056f08abf5024 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 7 Jun 2024 20:31:25 -0400 Subject: [PATCH 1/8] Add warning if soft delete is enabled in temp or staging buckets. --- .../apache_beam/options/pipeline_options.py | 33 +++++++++++++++++++ .../options/pipeline_options_test.py | 19 +++++++++++ 2 files changed, 52 insertions(+) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 0af32837a3fb..d45de9a08e5d 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -913,6 +913,35 @@ def _create_default_gcs_bucket(self): else: return None + # Log warning if soft delete policy is enabled in a gcs bucket + # that is specified in an argument. + # The function returns true only if the policy is enabled. + # If the policy is disabled or there is any exception + def _warn_if_soft_delete_policy_enabled(self, arg_name): + gcs_path = getattr(self, arg_name, None) + try: + from google.api_core.exceptions import GoogleAPICallError + from apache_beam.io.gcp import gcsio + try: + bucket_name, _ = gcsio.parse_gcs_path(gcs_path) + bucket = gcsio.GcsIO().get_bucket(bucket_name) + if (bucket.soft_delete_policy is not None and + bucket.soft_delete_policy.retention_duration_seconds > 0): + _LOGGER.warning( + "Bucket %s specified in %s has soft-delete policy enabled." + " To avoid being billed for unnecessary storage costs, turn" + " off the soft delete feature on buckets that your Dataflow" + " jobs use for temporary and staging storage. For more" + " information, see" + " https://cloud.google.com/storage/docs/use-soft-delete#remove-soft-delete-policy." + % (bucket_name, arg_name)) + return True + except GoogleAPICallError: + _LOGGER.warning('Unable to check soft delete policy in the bucket of %s.' % gcs_path) + except ImportError: + _LOGGER.warning('Missing dependencies to check soft delete policy.') + return False + # If either temp or staging location has an issue, we use the valid one for # both locations. If both are bad we return an error. def _handle_temp_and_staging_locations(self, validator): @@ -920,9 +949,11 @@ def _handle_temp_and_staging_locations(self, validator): staging_errors = validator.validate_gcs_path(self, 'staging_location') if temp_errors and not staging_errors: setattr(self, 'temp_location', getattr(self, 'staging_location')) + self._warn_if_soft_delete_policy_enabled('staging_location') return [] elif staging_errors and not temp_errors: setattr(self, 'staging_location', getattr(self, 'temp_location')) + self._warn_if_soft_delete_policy_enabled('temp_location') return [] elif not staging_errors and not temp_errors: return [] @@ -935,6 +966,8 @@ def _handle_temp_and_staging_locations(self, validator): else: setattr(self, 'temp_location', default_bucket) setattr(self, 'staging_location', default_bucket) + self._warn_if_soft_delete_policy_enabled('temp_location') + self._warn_if_soft_delete_policy_enabled('staging_location') return [] def validate(self, validator): diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py index 61b227d9a246..017d498b0520 100644 --- a/sdks/python/apache_beam/options/pipeline_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_test.py @@ -790,6 +790,25 @@ def test_validation_bad_stg_bad_temp_no_default(self): errors, errors) + def test_validation_temp_with_soft_delete(self): + from google.cloud.storage import Bucket + bucket = Bucket(None) + bucket.soft_delete_policy.retention_duration_seconds = 0 + with mock.patch("apache_beam.io.gcp.gcsio.GcsIO.get_bucket") as mock_get_bucket: + options = MockGoogleCloudOptionsWithBucket([ + '--project=myproject', + '--temp_location=gs://beam/tmp' + ]) + mock_get_bucket.return_value = bucket + + # soft delete policy enabled + bucket.soft_delete_policy.retention_duration_seconds = 1024 + self.assertTrue(options._warn_if_soft_delete_policy_enabled("temp_location")) + + # soft delete policy disabled + bucket.soft_delete_policy.retention_duration_seconds = 0 + self.assertFalse(options._warn_if_soft_delete_policy_enabled("temp_location")) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From 9788a487df45abdb2caa9d3a7af5bbcc365a4195 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 7 Jun 2024 20:37:17 -0400 Subject: [PATCH 2/8] Apply formatter. --- .../apache_beam/options/pipeline_options.py | 46 ++++++++++--------- .../options/pipeline_options_test.py | 16 +++---- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index d45de9a08e5d..91ecc81a56ab 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -918,29 +918,31 @@ def _create_default_gcs_bucket(self): # The function returns true only if the policy is enabled. # If the policy is disabled or there is any exception def _warn_if_soft_delete_policy_enabled(self, arg_name): - gcs_path = getattr(self, arg_name, None) + gcs_path = getattr(self, arg_name, None) + try: + from google.api_core.exceptions import GoogleAPICallError + from apache_beam.io.gcp import gcsio try: - from google.api_core.exceptions import GoogleAPICallError - from apache_beam.io.gcp import gcsio - try: - bucket_name, _ = gcsio.parse_gcs_path(gcs_path) - bucket = gcsio.GcsIO().get_bucket(bucket_name) - if (bucket.soft_delete_policy is not None and - bucket.soft_delete_policy.retention_duration_seconds > 0): - _LOGGER.warning( - "Bucket %s specified in %s has soft-delete policy enabled." - " To avoid being billed for unnecessary storage costs, turn" - " off the soft delete feature on buckets that your Dataflow" - " jobs use for temporary and staging storage. For more" - " information, see" - " https://cloud.google.com/storage/docs/use-soft-delete#remove-soft-delete-policy." - % (bucket_name, arg_name)) - return True - except GoogleAPICallError: - _LOGGER.warning('Unable to check soft delete policy in the bucket of %s.' % gcs_path) - except ImportError: - _LOGGER.warning('Missing dependencies to check soft delete policy.') - return False + bucket_name, _ = gcsio.parse_gcs_path(gcs_path) + bucket = gcsio.GcsIO().get_bucket(bucket_name) + if (bucket.soft_delete_policy is not None and + bucket.soft_delete_policy.retention_duration_seconds > 0): + _LOGGER.warning( + "Bucket %s specified in %s has soft-delete policy enabled." + " To avoid being billed for unnecessary storage costs, turn" + " off the soft delete feature on buckets that your Dataflow" + " jobs use for temporary and staging storage. For more" + " information, see" + " https://cloud.google.com/storage/docs/use-soft-delete#remove-soft-delete-policy." + % (bucket_name, arg_name)) + return True + except GoogleAPICallError: + _LOGGER.warning( + 'Unable to check soft delete policy in the bucket of %s.' % + gcs_path) + except ImportError: + _LOGGER.warning('Missing dependencies to check soft delete policy.') + return False # If either temp or staging location has an issue, we use the valid one for # both locations. If both are bad we return an error. diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py index 017d498b0520..c292297374e5 100644 --- a/sdks/python/apache_beam/options/pipeline_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_test.py @@ -793,21 +793,21 @@ def test_validation_bad_stg_bad_temp_no_default(self): def test_validation_temp_with_soft_delete(self): from google.cloud.storage import Bucket bucket = Bucket(None) - bucket.soft_delete_policy.retention_duration_seconds = 0 - with mock.patch("apache_beam.io.gcp.gcsio.GcsIO.get_bucket") as mock_get_bucket: - options = MockGoogleCloudOptionsWithBucket([ - '--project=myproject', - '--temp_location=gs://beam/tmp' - ]) + with mock.patch( + "apache_beam.io.gcp.gcsio.GcsIO.get_bucket") as mock_get_bucket: + options = MockGoogleCloudOptionsWithBucket( + ['--project=myproject', '--temp_location=gs://beam/tmp']) mock_get_bucket.return_value = bucket # soft delete policy enabled bucket.soft_delete_policy.retention_duration_seconds = 1024 - self.assertTrue(options._warn_if_soft_delete_policy_enabled("temp_location")) + self.assertTrue( + options._warn_if_soft_delete_policy_enabled("temp_location")) # soft delete policy disabled bucket.soft_delete_policy.retention_duration_seconds = 0 - self.assertFalse(options._warn_if_soft_delete_policy_enabled("temp_location")) + self.assertFalse( + options._warn_if_soft_delete_policy_enabled("temp_location")) if __name__ == '__main__': From 366ee11a5cec02d3c04672732dc553d9f9ce9a05 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 7 Jun 2024 20:45:42 -0400 Subject: [PATCH 3/8] Change ImportError to Exception to capture all kinds of exceptions. --- sdks/python/apache_beam/options/pipeline_options.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 91ecc81a56ab..48c59bd7a3ee 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -940,8 +940,9 @@ def _warn_if_soft_delete_policy_enabled(self, arg_name): _LOGGER.warning( 'Unable to check soft delete policy in the bucket of %s.' % gcs_path) - except ImportError: - _LOGGER.warning('Missing dependencies to check soft delete policy.') + except Exception: + _LOGGER.warning( + 'Unexpected error occurred when checking soft delete policy.') return False # If either temp or staging location has an issue, we use the valid one for From b658f7559a98642108eef42557c2835722b5d0dc Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 7 Jun 2024 20:54:51 -0400 Subject: [PATCH 4/8] Minor change on test name --- sdks/python/apache_beam/options/pipeline_options_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py index c292297374e5..5570df62c843 100644 --- a/sdks/python/apache_beam/options/pipeline_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_test.py @@ -790,7 +790,7 @@ def test_validation_bad_stg_bad_temp_no_default(self): errors, errors) - def test_validation_temp_with_soft_delete(self): + def test_soft_delete_on_temp_location(self): from google.cloud.storage import Bucket bucket = Bucket(None) with mock.patch( From c68709ca08fa71409b559001ded6ff5b645c341b Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 7 Jun 2024 21:16:25 -0400 Subject: [PATCH 5/8] Fix lint --- sdks/python/apache_beam/options/pipeline_options.py | 4 ++-- sdks/python/apache_beam/options/pipeline_options_test.py | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 48c59bd7a3ee..f8f674e8b9cd 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -933,8 +933,8 @@ def _warn_if_soft_delete_policy_enabled(self, arg_name): " off the soft delete feature on buckets that your Dataflow" " jobs use for temporary and staging storage. For more" " information, see" - " https://cloud.google.com/storage/docs/use-soft-delete#remove-soft-delete-policy." - % (bucket_name, arg_name)) + " https://cloud.google.com/storage/docs/use-soft-delete" + "#remove-soft-delete-policy." % (bucket_name, arg_name)) return True except GoogleAPICallError: _LOGGER.warning( diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py index 5570df62c843..6d08143835e0 100644 --- a/sdks/python/apache_beam/options/pipeline_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_test.py @@ -791,12 +791,11 @@ def test_validation_bad_stg_bad_temp_no_default(self): errors) def test_soft_delete_on_temp_location(self): - from google.cloud.storage import Bucket - bucket = Bucket(None) with mock.patch( "apache_beam.io.gcp.gcsio.GcsIO.get_bucket") as mock_get_bucket: options = MockGoogleCloudOptionsWithBucket( ['--project=myproject', '--temp_location=gs://beam/tmp']) + bucket = mock.MagicMock() mock_get_bucket.return_value = bucket # soft delete policy enabled From 6260d57ed0100166b135482e4fac325f74286da3 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 10 Jun 2024 11:26:36 -0400 Subject: [PATCH 6/8] Move the soft delete code and test to gcsio. --- sdks/python/apache_beam/io/gcp/gcsio.py | 13 ++++++++++++ sdks/python/apache_beam/io/gcp/gcsio_test.py | 15 +++++++++++++ .../apache_beam/options/pipeline_options.py | 21 +++++-------------- .../options/pipeline_options_test.py | 18 ---------------- 4 files changed, 33 insertions(+), 34 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 1c05996020a2..83b3829cf5ef 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -35,6 +35,7 @@ from typing import Optional from typing import Union +from google.api_core.exceptions import GoogleAPICallError from google.cloud import storage from google.cloud.exceptions import NotFound from google.cloud.storage.fileio import BlobReader @@ -529,6 +530,18 @@ def _updated_to_seconds(updated): time.mktime(updated.timetuple()) - time.timezone + updated.microsecond / 1000000.0) + def is_soft_delete_enabled(self, gcs_path): + try: + bucket_name, _ = parse_gcs_path(gcs_path) + bucket = self.get_bucket(bucket_name) + if (bucket.soft_delete_policy is not None and + bucket.soft_delete_policy.retention_duration_seconds > 0): + return True + except Exception: + _LOGGER.warning( + "Unexpected error occurred when checking soft delete policy for %s" + % gcs_path) + return False class BeamBlobReader(BlobReader): def __init__(self, blob, chunk_size=DEFAULT_READ_BUFFER_SIZE): diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index b17e0638d6b5..39d74a08e600 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -600,6 +600,21 @@ def test_create_default_bucket( self.assertEqual( request_data_json['softDeletePolicy']['retentionDurationSeconds'], 0) + @mock.patch("apache_beam.io.gcp.gcsio.GcsIO.get_bucket") + def test_is_soft_delete_enabled(self, mock_get_bucket): + bucket = mock.MagicMock() + mock_get_bucket.return_value = bucket + + # soft delete policy enabled + bucket.soft_delete_policy.retention_duration_seconds = 1024 + self.assertTrue(self.gcs.is_soft_delete_enabled( + "gs://beam_with_soft_delete/tmp")) + + # soft delete policy disabled + bucket.soft_delete_policy.retention_duration_seconds = 0 + self.assertFalse(self.gcs.is_soft_delete_enabled( + "gs://beam_without_soft_delete/tmp")) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index f8f674e8b9cd..e10f7b4f06a7 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -920,30 +920,19 @@ def _create_default_gcs_bucket(self): def _warn_if_soft_delete_policy_enabled(self, arg_name): gcs_path = getattr(self, arg_name, None) try: - from google.api_core.exceptions import GoogleAPICallError from apache_beam.io.gcp import gcsio - try: - bucket_name, _ = gcsio.parse_gcs_path(gcs_path) - bucket = gcsio.GcsIO().get_bucket(bucket_name) - if (bucket.soft_delete_policy is not None and - bucket.soft_delete_policy.retention_duration_seconds > 0): + if gcsio.GcsIO.is_soft_delete_enabled(gcs_path): _LOGGER.warning( - "Bucket %s specified in %s has soft-delete policy enabled." + "Bucket specified in %s has soft-delete policy enabled." " To avoid being billed for unnecessary storage costs, turn" " off the soft delete feature on buckets that your Dataflow" " jobs use for temporary and staging storage. For more" " information, see" " https://cloud.google.com/storage/docs/use-soft-delete" - "#remove-soft-delete-policy." % (bucket_name, arg_name)) - return True - except GoogleAPICallError: - _LOGGER.warning( - 'Unable to check soft delete policy in the bucket of %s.' % - gcs_path) - except Exception: + "#remove-soft-delete-policy." % arg_name) + except ImportError: _LOGGER.warning( - 'Unexpected error occurred when checking soft delete policy.') - return False + 'Unable to check soft delete policy due to import error.') # If either temp or staging location has an issue, we use the valid one for # both locations. If both are bad we return an error. diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py index 6d08143835e0..61b227d9a246 100644 --- a/sdks/python/apache_beam/options/pipeline_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_test.py @@ -790,24 +790,6 @@ def test_validation_bad_stg_bad_temp_no_default(self): errors, errors) - def test_soft_delete_on_temp_location(self): - with mock.patch( - "apache_beam.io.gcp.gcsio.GcsIO.get_bucket") as mock_get_bucket: - options = MockGoogleCloudOptionsWithBucket( - ['--project=myproject', '--temp_location=gs://beam/tmp']) - bucket = mock.MagicMock() - mock_get_bucket.return_value = bucket - - # soft delete policy enabled - bucket.soft_delete_policy.retention_duration_seconds = 1024 - self.assertTrue( - options._warn_if_soft_delete_policy_enabled("temp_location")) - - # soft delete policy disabled - bucket.soft_delete_policy.retention_duration_seconds = 0 - self.assertFalse( - options._warn_if_soft_delete_policy_enabled("temp_location")) - if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From 82dcb1312c1b6d3cb694008814d853169648eb79 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 10 Jun 2024 11:30:58 -0400 Subject: [PATCH 7/8] Apply formatter. --- sdks/python/apache_beam/io/gcp/gcsio.py | 7 ++++--- sdks/python/apache_beam/io/gcp/gcsio_test.py | 8 ++++---- .../apache_beam/options/pipeline_options.py | 19 +++++++++---------- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 83b3829cf5ef..08a80a42ba0c 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -535,14 +535,15 @@ def is_soft_delete_enabled(self, gcs_path): bucket_name, _ = parse_gcs_path(gcs_path) bucket = self.get_bucket(bucket_name) if (bucket.soft_delete_policy is not None and - bucket.soft_delete_policy.retention_duration_seconds > 0): + bucket.soft_delete_policy.retention_duration_seconds > 0): return True except Exception: _LOGGER.warning( - "Unexpected error occurred when checking soft delete policy for %s" - % gcs_path) + "Unexpected error occurred when checking soft delete policy for %s" % + gcs_path) return False + class BeamBlobReader(BlobReader): def __init__(self, blob, chunk_size=DEFAULT_READ_BUFFER_SIZE): super().__init__(blob, chunk_size=chunk_size) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index 39d74a08e600..c1356b53095a 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -607,13 +607,13 @@ def test_is_soft_delete_enabled(self, mock_get_bucket): # soft delete policy enabled bucket.soft_delete_policy.retention_duration_seconds = 1024 - self.assertTrue(self.gcs.is_soft_delete_enabled( - "gs://beam_with_soft_delete/tmp")) + self.assertTrue( + self.gcs.is_soft_delete_enabled("gs://beam_with_soft_delete/tmp")) # soft delete policy disabled bucket.soft_delete_policy.retention_duration_seconds = 0 - self.assertFalse(self.gcs.is_soft_delete_enabled( - "gs://beam_without_soft_delete/tmp")) + self.assertFalse( + self.gcs.is_soft_delete_enabled("gs://beam_without_soft_delete/tmp")) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index e10f7b4f06a7..335d441daeda 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -922,17 +922,16 @@ def _warn_if_soft_delete_policy_enabled(self, arg_name): try: from apache_beam.io.gcp import gcsio if gcsio.GcsIO.is_soft_delete_enabled(gcs_path): - _LOGGER.warning( - "Bucket specified in %s has soft-delete policy enabled." - " To avoid being billed for unnecessary storage costs, turn" - " off the soft delete feature on buckets that your Dataflow" - " jobs use for temporary and staging storage. For more" - " information, see" - " https://cloud.google.com/storage/docs/use-soft-delete" - "#remove-soft-delete-policy." % arg_name) + _LOGGER.warning( + "Bucket specified in %s has soft-delete policy enabled." + " To avoid being billed for unnecessary storage costs, turn" + " off the soft delete feature on buckets that your Dataflow" + " jobs use for temporary and staging storage. For more" + " information, see" + " https://cloud.google.com/storage/docs/use-soft-delete" + "#remove-soft-delete-policy." % arg_name) except ImportError: - _LOGGER.warning( - 'Unable to check soft delete policy due to import error.') + _LOGGER.warning('Unable to check soft delete policy due to import error.') # If either temp or staging location has an issue, we use the valid one for # both locations. If both are bad we return an error. From 3e2623f0b583dfdd714c9424f4ac821504536851 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 10 Jun 2024 11:45:04 -0400 Subject: [PATCH 8/8] Minor fix. --- sdks/python/apache_beam/io/gcp/gcsio.py | 1 - sdks/python/apache_beam/options/pipeline_options.py | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 08a80a42ba0c..b2f8bd4a2da7 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -35,7 +35,6 @@ from typing import Optional from typing import Union -from google.api_core.exceptions import GoogleAPICallError from google.cloud import storage from google.cloud.exceptions import NotFound from google.cloud.storage.fileio import BlobReader diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 335d441daeda..42aee47a957e 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -915,13 +915,11 @@ def _create_default_gcs_bucket(self): # Log warning if soft delete policy is enabled in a gcs bucket # that is specified in an argument. - # The function returns true only if the policy is enabled. - # If the policy is disabled or there is any exception def _warn_if_soft_delete_policy_enabled(self, arg_name): gcs_path = getattr(self, arg_name, None) try: from apache_beam.io.gcp import gcsio - if gcsio.GcsIO.is_soft_delete_enabled(gcs_path): + if gcsio.GcsIO().is_soft_delete_enabled(gcs_path): _LOGGER.warning( "Bucket specified in %s has soft-delete policy enabled." " To avoid being billed for unnecessary storage costs, turn"