From c28530bc428e5b9cecf05c1807a28dbe4e6cb0c6 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 30 Mar 2022 10:43:05 -0400 Subject: [PATCH 01/46] first commit --- sdks/python/apache_beam/internal/gcp/auth.py | 52 +++++++++++++------ .../apache_beam/options/pipeline_options.py | 7 +++ 2 files changed, 44 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 439264a9794b..907a0b515428 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -40,6 +40,16 @@ _LOGGER = logging.getLogger(__name__) +CLIENT_SCOPES = [ + 'https://www.googleapis.com/auth/bigquery', + 'https://www.googleapis.com/auth/cloud-platform', + 'https://www.googleapis.com/auth/devstorage.full_control', + 'https://www.googleapis.com/auth/userinfo.email', + 'https://www.googleapis.com/auth/datastore', + 'https://www.googleapis.com/auth/spanner.admin', + 'https://www.googleapis.com/auth/spanner.data' +] + def set_running_in_gce(worker_executing_project): """For internal use only; no backwards-compatibility guarantees. @@ -58,8 +68,11 @@ def set_running_in_gce(worker_executing_project): is_running_in_gce = True executing_project = worker_executing_project +def set_impersonation_accounts(target_principal = None, delegate_to = None): + _Credentials.set_impersonation_accounts(target_principal, delegate_to) + -def get_service_credentials(): +def get_service_credentials(target_principal = None, delegate_to = None): """For internal use only; no backwards-compatibility guarantees. Get credentials to access Google services. @@ -107,11 +120,11 @@ class _Credentials(object): _credentials_init = False _credentials = None + _delegate_to = None + _target_principal = None + @classmethod def get_service_credentials(cls): - if cls._credentials_init: - return cls._credentials - with cls._credentials_lock: if cls._credentials_init: return cls._credentials @@ -125,12 +138,13 @@ def get_service_credentials(cls): "socket default timeout is %s seconds.", socket.getdefaulttimeout()) cls._credentials = cls._get_service_credentials() + cls._add_impersonation_credentials() cls._credentials_init = True return cls._credentials @staticmethod - def _get_service_credentials(): + def _get_service_credentials(cls): if not _GOOGLE_AUTH_AVAILABLE: _LOGGER.warning( 'Unable to find default credentials because the google-auth library ' @@ -138,17 +152,8 @@ def _get_service_credentials(): 'Google default credentials. 
Connecting anonymously.') return None - client_scopes = [ - 'https://www.googleapis.com/auth/bigquery', - 'https://www.googleapis.com/auth/cloud-platform', - 'https://www.googleapis.com/auth/devstorage.full_control', - 'https://www.googleapis.com/auth/userinfo.email', - 'https://www.googleapis.com/auth/datastore', - 'https://www.googleapis.com/auth/spanner.admin', - 'https://www.googleapis.com/auth/spanner.data' - ] try: - credentials, _ = google.auth.default(scopes=client_scopes) # pylint: disable=c-extension-no-member + credentials, _ = google.auth.default(scopes=CLIENT_SCOPES) # pylint: disable=c-extension-no-member credentials = _ApitoolsCredentialsAdapter(credentials) logging.debug( 'Connecting using Google Application Default ' @@ -160,3 +165,20 @@ def _get_service_credentials(): 'Connecting anonymously.', e) return None + + @classmethod + def set_impersonation_accounts(cls, target_principal, delegate_to): + cls._target_principal = target_principal + cls._delegate_to = delegate_to + + @classmethod + def _add_impersonation_credentials(cls): + """Adds impersonation credentials if the client species them.""" + if cls.target_principal: + credentials = google.auth.impersonated_credentials.Credentials( + source_credentials=cls._credentials, + target_principal=cls.target_principal, + delegates=cls._delegate_to, + target_scopes=CLIENT_SCOPES, + ) + return credentials \ No newline at end of file diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index d4ee6ad7f084..cc20828a69ab 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -751,6 +751,13 @@ def _add_argparse_args(cls, parser): help= 'When true, artifacts will be cached across job submissions in the GCS ' 'staging bucket') + parser.add_argument( + '--enable_artifact_caching', + default=None, + help= + 'When true, artifacts will be cached across job submissions in the GCS ' + 'staging bucket' + ) def _create_default_gcs_bucket(self): try: From bd76cf8522614e6ded137bae7735a6b795887bf5 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Fri, 1 Apr 2022 09:23:51 -0400 Subject: [PATCH 02/46] added parameters --- sdks/python/apache_beam/internal/gcp/auth.py | 10 +++++++--- .../apache_beam/options/pipeline_options.py | 18 +++++++++++++++--- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 907a0b515428..e5584b9f6447 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -120,8 +120,9 @@ class _Credentials(object): _credentials_init = False _credentials = None - _delegate_to = None + _delegate_accounts = None _target_principal = None + _impersonation_parameters_set = False @classmethod def get_service_credentials(cls): @@ -169,16 +170,19 @@ def _get_service_credentials(cls): @classmethod def set_impersonation_accounts(cls, target_principal, delegate_to): cls._target_principal = target_principal - cls._delegate_to = delegate_to + cls._delegate_accounts = delegate_to + cls._impersonation_parameters_set = True @classmethod def _add_impersonation_credentials(cls): + if not cls._impersonation_parameters_set: + raise Excpetion('Impersonation credentials set to late in workflow.') """Adds impersonation credentials if the client species them.""" if cls.target_principal: credentials = google.auth.impersonated_credentials.Credentials( 
source_credentials=cls._credentials, target_principal=cls.target_principal, - delegates=cls._delegate_to, + delegates=cls._delegate_accounts, target_scopes=CLIENT_SCOPES, ) return credentials \ No newline at end of file diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 91f206df0385..41fc557ba1bb 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -752,11 +752,23 @@ def _add_argparse_args(cls, parser): 'When true, artifacts will be cached across job submissions in the GCS ' 'staging bucket') parser.add_argument( - '--enable_artifact_caching', + '--target_principal', default=None, help= - 'When true, artifacts will be cached across job submissions in the GCS ' - 'staging bucket' + 'The service account to impersonate.' + ) + parser.add_argument( + '--delegate_accounts', + default=None, + help= + 'A comma separated list of delegates required to grant the final access_token. ' + 'If set, the sequence of identities must have "Service Account Token ' + 'Creator" capability granted to the preceding identity. For example, ' + 'if set to "serviceAccountB,serviceAccountC", the source_credential must ' + 'have the Token Creator role on serviceAccountB. serviceAccountB must have ' + 'the Token Creator on serviceAccountC. Finally, C must have Token Creator on ' + 'target_principal. If left unset, source_credential must have that role ' + 'on target_principal..' ) def _create_default_gcs_bucket(self): From 07a12a94848493ba2d39ba513accf16347907879 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Fri, 1 Apr 2022 11:59:34 -0400 Subject: [PATCH 03/46] add impersonation credentials to the runner --- sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index edb319dbfb0a..fc88b70e5239 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -534,9 +534,12 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): debug_options.add_experiment( 'min_cpu_platform=' + worker_options.min_cpu_platform) + google_cloud_options = options.view_as(GoogleCloudOptions) + beam.internal.gcp.auth.set_impersonation_accounts( + google_cloud_options.target_principal, + google_cloud_options.delegate_accounts) # Elevate "enable_streaming_engine" to pipeline option, but using the # existing experiment. 
- google_cloud_options = options.view_as(GoogleCloudOptions) if google_cloud_options.enable_streaming_engine: debug_options.add_experiment("enable_windmill_service") debug_options.add_experiment("enable_streaming_engine") From 9fc90c2e836419d4bcdf2b2f98afb9fd49115efe Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Fri, 1 Apr 2022 12:18:59 -0400 Subject: [PATCH 04/46] some small updates --- sdks/python/apache_beam/internal/gcp/auth.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index e5584b9f6447..58dbe8f2e12c 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -176,7 +176,7 @@ def set_impersonation_accounts(cls, target_principal, delegate_to): @classmethod def _add_impersonation_credentials(cls): if not cls._impersonation_parameters_set: - raise Excpetion('Impersonation credentials set to late in workflow.') + raise Exception('Impersonation credentials not yet set.') """Adds impersonation credentials if the client species them.""" if cls.target_principal: credentials = google.auth.impersonated_credentials.Credentials( @@ -185,4 +185,4 @@ def _add_impersonation_credentials(cls): delegates=cls._delegate_accounts, target_scopes=CLIENT_SCOPES, ) - return credentials \ No newline at end of file + return credentials From 08d798744c35e254fd5256124a0a827e0112dfef Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Fri, 1 Apr 2022 15:23:06 -0400 Subject: [PATCH 05/46] fixed options to set in file systems --- sdks/python/apache_beam/internal/gcp/auth.py | 18 +++++++++++------- .../python/apache_beam/io/gcp/gcsfilesystem.py | 15 ++++++++++++++- .../runners/worker/sdk_worker_main.py | 4 ++++ 3 files changed, 29 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 58dbe8f2e12c..46c5527f4323 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -23,6 +23,8 @@ import socket import threading +import traceback # DO_NOT_SUBMIT + # google.auth is only available when Beam is installed with the gcp extra. try: import google.auth @@ -72,7 +74,7 @@ def set_impersonation_accounts(target_principal = None, delegate_to = None): _Credentials.set_impersonation_accounts(target_principal, delegate_to) -def get_service_credentials(target_principal = None, delegate_to = None): +def get_service_credentials(): """For internal use only; no backwards-compatibility guarantees. Get credentials to access Google services. @@ -139,13 +141,13 @@ def get_service_credentials(cls): "socket default timeout is %s seconds.", socket.getdefaulttimeout()) cls._credentials = cls._get_service_credentials() - cls._add_impersonation_credentials() + cls._credentials = cls._add_impersonation_credentials() cls._credentials_init = True return cls._credentials @staticmethod - def _get_service_credentials(cls): + def _get_service_credentials(): if not _GOOGLE_AUTH_AVAILABLE: _LOGGER.warning( 'Unable to find default credentials because the google-auth library ' @@ -176,12 +178,14 @@ def set_impersonation_accounts(cls, target_principal, delegate_to): @classmethod def _add_impersonation_credentials(cls): if not cls._impersonation_parameters_set: - raise Exception('Impersonation credentials not yet set.') + traceback_str = traceback.format_stack() + raise Exception('Impersonation credentials not yet set. 
\n' + str(traceback_str)) """Adds impersonation credentials if the client species them.""" - if cls.target_principal: + credentials = cls._credentials + if cls._target_principal: credentials = google.auth.impersonated_credentials.Credentials( - source_credentials=cls._credentials, - target_principal=cls.target_principal, + source_credentials=credentials, + target_principal=cls._target_principal, delegates=cls._delegate_accounts, target_scopes=CLIENT_SCOPES, ) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index e53ceef70c9f..21b189d3e88d 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -27,7 +27,9 @@ from apache_beam.io.filesystem import FileMetadata from apache_beam.io.filesystem import FileSystem from apache_beam.io.gcp import gcsio - +from apache_beam.options.pipeline_options import GoogleCloudOptions +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.internal.gcp import auth __all__ = ['GCSFileSystem'] @@ -38,6 +40,17 @@ class GCSFileSystem(FileSystem): CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chuck size in batch operations GCS_PREFIX = 'gs://' + def __init__(self, pipeline_options): + super().__init__(pipeline_options) + if isinstance(pipeline_options, PipelineOptions): + gcs_options = pipeline_options.view_as(GoogleCloudOptions) + target_principal = gcs_options.target_principal + delegate_accounts = gcs_options.delegate_accounts + else: + target_principal = pipeline_options.get('target_principal') + delegate_accounts = pipeline_options.get('delegate_accounts') + auth.set_impersonation_accounts(target_principal, delegate_accounts) + @classmethod def scheme(cls): """URI scheme for the FileSystem diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 140087803817..94bf47ef8e38 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -29,6 +29,7 @@ from google.protobuf import text_format # type: ignore # not in typeshed +import apache_beam.internal.gcp.auth from apache_beam.internal import pickler from apache_beam.io import filesystems from apache_beam.options.pipeline_options import DebugOptions @@ -98,6 +99,9 @@ def create_harness(environment, dry_run=False): filesystems.FileSystems.set_options(sdk_pipeline_options) pickle_library = sdk_pipeline_options.view_as(SetupOptions).pickle_library pickler.set_library(pickle_library) + gcp_options = sdk_pipeline_options.view_as(GoogleCloudOptions) + apache_beam.internal.gcp.auth.set_impersonation_accounts(gcp_options.target_principal, + gcp_options.delegate_accounts) if 'SEMI_PERSISTENT_DIRECTORY' in environment: semi_persistent_directory = environment['SEMI_PERSISTENT_DIRECTORY'] From b8f5ab0e3afebedc11bd29bf0d9dcff7e4cd5bbf Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Mon, 4 Apr 2022 15:24:27 -0400 Subject: [PATCH 06/46] added impersonation to credentials --- .../apache_beam/runners/dataflow/internal/apiclient.py | 3 +++ sdks/python/apache_beam/runners/interactive/utils.py | 9 ++++++++- .../runners/portability/sdk_container_builder.py | 4 ++++ 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 771d98d7763d..a138de78d343 100644 --- 
a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -50,6 +50,7 @@ from apache_beam import version as beam_version from apache_beam.internal.gcp.auth import get_service_credentials +from apache_beam.internal.gcp.auth import set_impersonation_accounts from apache_beam.internal.gcp.json_value import to_json_value from apache_beam.internal.http_client import get_new_http from apache_beam.io.filesystems import FileSystems @@ -553,6 +554,8 @@ def __init__(self, options, root_staging_location=None): if self.google_cloud_options.no_auth: credentials = None else: + set_impersonation_accounts(self._google_cloud_options.target_principal, + self._google_cloud_options.delegate_accounts) credentials = get_service_credentials() http_client = get_new_http() diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py index 48e51578a7a4..2ae7abc5ec07 100644 --- a/sdks/python/apache_beam/runners/interactive/utils.py +++ b/sdks/python/apache_beam/runners/interactive/utils.py @@ -474,7 +474,7 @@ def create_var_in_main(name: str, return name, value -def assert_bucket_exists(bucket_name): +def assert_bucket_exists(bucket_name, pipeline_options = None): # type: (str) -> None """Asserts whether the specified GCS bucket with the name @@ -486,6 +486,13 @@ def assert_bucket_exists(bucket_name): """ try: from apitools.base.py.exceptions import HttpError + if pipeline_options: + gcs_options = pipeline_options.view_as(GoogleCloudOptions) + target_principal = gcs_options.target_principal + delegate_accounts = gcs_options.delegate_accounts + auth.set_impersonation_accounts(target_principal, delegate_accounts) + else: + auth.set_impersonation_accounts(None, None) storage_client = storage.StorageV1( credentials=auth.get_service_credentials(), get_credentials=False, diff --git a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py index d06b005a4f0d..4552c9474e83 100644 --- a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py +++ b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py @@ -40,6 +40,7 @@ from apache_beam import version as beam_version from apache_beam.internal.gcp.auth import get_service_credentials +from apache_beam.internal.gcp.auth import set_impersonation_accounts from apache_beam.internal.http_client import get_new_http from apache_beam.io.gcp.internal.clients import storage from apache_beam.options.pipeline_options import GoogleCloudOptions @@ -206,9 +207,12 @@ def __init__(self, options): self._google_cloud_options = options.view_as(GoogleCloudOptions) self._cloud_build_machine_type = self._get_cloud_build_machine_type_enum( options.view_as(SetupOptions).cloud_build_machine_type) + if self._google_cloud_options.no_auth: credentials = None else: + set_impersonation_accounts(self._google_cloud_options.target_principal, + self._google_cloud_options.delegate_accounts) credentials = get_service_credentials() self._storage_client = storage.StorageV1( url='https://www.googleapis.com/storage/v1', From 99d6257d94d7e82676133171ce54a4bae455274a Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Mon, 4 Apr 2022 15:30:25 -0400 Subject: [PATCH 07/46] added impersonation accounts to bigquery --- sdks/python/apache_beam/io/gcp/bigquery_tools.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py 
b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 92defe139ab9..b562275c511f 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -1365,6 +1365,13 @@ def __init__( source.pipeline_options.view_as(GoogleCloudOptions).project) else: self.executing_project = None + if hasattr(source, 'pipeline_options'): + gcs_options = source.pipeline_options.view_as(GoogleCloudOptions) + target_principal = gcs_options.target_principal + delegate_accounts = gcs_options.delegate_accounts + auth.set_impersonation_accounts(target_principal, delegate_accounts) + else: + auth.set_impersonation_accounts(None, None) # TODO(silviuc): Try to automatically get it from gcloud config info. if not self.executing_project and test_bigquery_client is None: @@ -1482,6 +1489,13 @@ def __init__(self, sink, test_bigquery_client=None, buffer_size=None): if self.project_id is None and hasattr(sink, 'pipeline_options'): self.project_id = ( sink.pipeline_options.view_as(GoogleCloudOptions).project) + if hasattr(sink, 'pipeline_options'): + gcs_options = sink.pipeline_options.view_as(GoogleCloudOptions) + target_principal = gcs_options.target_principal + delegate_accounts = gcs_options.delegate_accounts + auth.set_impersonation_accounts(target_principal, delegate_accounts) + else: + auth.set_impersonation_accounts(None, None) assert self.project_id is not None From ca5cad779c408bc4050db7350594b2b69ac4fafd Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 6 Apr 2022 10:09:35 -0400 Subject: [PATCH 08/46] dont use dataflow_runner is setting credntials --- sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index fc88b70e5239..edb319dbfb0a 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -534,12 +534,9 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): debug_options.add_experiment( 'min_cpu_platform=' + worker_options.min_cpu_platform) - google_cloud_options = options.view_as(GoogleCloudOptions) - beam.internal.gcp.auth.set_impersonation_accounts( - google_cloud_options.target_principal, - google_cloud_options.delegate_accounts) # Elevate "enable_streaming_engine" to pipeline option, but using the # existing experiment. 
+ google_cloud_options = options.view_as(GoogleCloudOptions) if google_cloud_options.enable_streaming_engine: debug_options.add_experiment("enable_windmill_service") debug_options.add_experiment("enable_streaming_engine") From a3a138c44e46dd84bde9de8cf950a76d200bbd41 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 6 Apr 2022 10:52:19 -0400 Subject: [PATCH 09/46] lint --- sdks/python/apache_beam/internal/gcp/auth.py | 31 ++++++++++--------- .../apache_beam/options/pipeline_options.py | 31 +++++++++---------- .../runners/dataflow/internal/apiclient.py | 5 +-- .../apache_beam/runners/interactive/utils.py | 2 +- .../portability/sdk_container_builder.py | 5 +-- 5 files changed, 37 insertions(+), 37 deletions(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 46c5527f4323..a114a482de2a 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -22,8 +22,7 @@ import logging import socket import threading - -import traceback # DO_NOT_SUBMIT +import traceback # google.auth is only available when Beam is installed with the gcp extra. try: @@ -43,13 +42,13 @@ _LOGGER = logging.getLogger(__name__) CLIENT_SCOPES = [ - 'https://www.googleapis.com/auth/bigquery', - 'https://www.googleapis.com/auth/cloud-platform', - 'https://www.googleapis.com/auth/devstorage.full_control', - 'https://www.googleapis.com/auth/userinfo.email', - 'https://www.googleapis.com/auth/datastore', - 'https://www.googleapis.com/auth/spanner.admin', - 'https://www.googleapis.com/auth/spanner.data' + 'https://www.googleapis.com/auth/bigquery', + 'https://www.googleapis.com/auth/cloud-platform', + 'https://www.googleapis.com/auth/devstorage.full_control', + 'https://www.googleapis.com/auth/userinfo.email', + 'https://www.googleapis.com/auth/datastore', + 'https://www.googleapis.com/auth/spanner.admin', + 'https://www.googleapis.com/auth/spanner.data' ] @@ -70,7 +69,8 @@ def set_running_in_gce(worker_executing_project): is_running_in_gce = True executing_project = worker_executing_project -def set_impersonation_accounts(target_principal = None, delegate_to = None): + +def set_impersonation_accounts(target_principal=None, delegate_to=None): _Credentials.set_impersonation_accounts(target_principal, delegate_to) @@ -179,14 +179,15 @@ def set_impersonation_accounts(cls, target_principal, delegate_to): def _add_impersonation_credentials(cls): if not cls._impersonation_parameters_set: traceback_str = traceback.format_stack() - raise Exception('Impersonation credentials not yet set. \n' + str(traceback_str)) + raise Exception( + 'Impersonation credentials not yet set. 
\n' + str(traceback_str)) """Adds impersonation credentials if the client species them.""" credentials = cls._credentials if cls._target_principal: credentials = google.auth.impersonated_credentials.Credentials( - source_credentials=credentials, - target_principal=cls._target_principal, - delegates=cls._delegate_accounts, - target_scopes=CLIENT_SCOPES, + source_credentials=credentials, + target_principal=cls._target_principal, + delegates=cls._delegate_accounts, + target_scopes=CLIENT_SCOPES, ) return credentials diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index feb59f7ce95e..b38972fedd30 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -752,24 +752,21 @@ def _add_argparse_args(cls, parser): 'When true, artifacts will be cached across job submissions in the GCS ' 'staging bucket') parser.add_argument( - '--target_principal', - default=None, - help= - 'The service account to impersonate.' - ) + '--target_principal', + default=None, + help='The service account to impersonate.') parser.add_argument( - '--delegate_accounts', - default=None, - help= - 'A comma separated list of delegates required to grant the final access_token. ' - 'If set, the sequence of identities must have "Service Account Token ' - 'Creator" capability granted to the preceding identity. For example, ' - 'if set to "serviceAccountB,serviceAccountC", the source_credential must ' - 'have the Token Creator role on serviceAccountB. serviceAccountB must have ' - 'the Token Creator on serviceAccountC. Finally, C must have Token Creator on ' - 'target_principal. If left unset, source_credential must have that role ' - 'on target_principal..' - ) + '--delegate_accounts', + default=None, + help= + 'A comma separated list of delegates required to grant the final access_token. ' + 'If set, the sequence of identities must have "Service Account Token ' + 'Creator" capability granted to the preceding identity. For example, ' + 'if set to "serviceAccountB,serviceAccountC", the source_credential must ' + 'have the Token Creator role on serviceAccountB. serviceAccountB must have ' + 'the Token Creator on serviceAccountC. Finally, C must have Token Creator on ' + 'target_principal. 
If left unset, source_credential must have that role ' + 'on target_principal..') def _create_default_gcs_bucket(self): try: diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index a138de78d343..49adc36601a2 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -554,8 +554,9 @@ def __init__(self, options, root_staging_location=None): if self.google_cloud_options.no_auth: credentials = None else: - set_impersonation_accounts(self._google_cloud_options.target_principal, - self._google_cloud_options.delegate_accounts) + set_impersonation_accounts( + self._google_cloud_options.target_principal, + self._google_cloud_options.delegate_accounts) credentials = get_service_credentials() http_client = get_new_http() diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py index 2ae7abc5ec07..713bdbb2da16 100644 --- a/sdks/python/apache_beam/runners/interactive/utils.py +++ b/sdks/python/apache_beam/runners/interactive/utils.py @@ -474,7 +474,7 @@ def create_var_in_main(name: str, return name, value -def assert_bucket_exists(bucket_name, pipeline_options = None): +def assert_bucket_exists(bucket_name, pipeline_options=None): # type: (str) -> None """Asserts whether the specified GCS bucket with the name diff --git a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py index 4552c9474e83..983fad4c7033 100644 --- a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py +++ b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py @@ -211,8 +211,9 @@ def __init__(self, options): if self._google_cloud_options.no_auth: credentials = None else: - set_impersonation_accounts(self._google_cloud_options.target_principal, - self._google_cloud_options.delegate_accounts) + set_impersonation_accounts( + self._google_cloud_options.target_principal, + self._google_cloud_options.delegate_accounts) credentials = get_service_credentials() self._storage_client = storage.StorageV1( url='https://www.googleapis.com/storage/v1', From 5b26b779a04aebbfd6c1351fe6198fc2c6bffa0c Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 6 Apr 2022 11:12:38 -0400 Subject: [PATCH 10/46] change line lenght of pipeline options --- .../apache_beam/options/pipeline_options.py | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index b38972fedd30..a3be06e38420 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -749,8 +749,8 @@ def _add_argparse_args(cls, parser): default=False, action='store_true', help= - 'When true, artifacts will be cached across job submissions in the GCS ' - 'staging bucket') + 'When true, artifacts will be cached across job submissions in the ' + 'GCS staging bucket') parser.add_argument( '--target_principal', default=None, @@ -759,14 +759,15 @@ def _add_argparse_args(cls, parser): '--delegate_accounts', default=None, help= - 'A comma separated list of delegates required to grant the final access_token. ' - 'If set, the sequence of identities must have "Service Account Token ' - 'Creator" capability granted to the preceding identity. 
For example, ' - 'if set to "serviceAccountB,serviceAccountC", the source_credential must ' - 'have the Token Creator role on serviceAccountB. serviceAccountB must have ' - 'the Token Creator on serviceAccountC. Finally, C must have Token Creator on ' - 'target_principal. If left unset, source_credential must have that role ' - 'on target_principal..') + 'A comma separated list of delegates required to grant the final ' + 'access_token. If set, the sequence of identities must have "Service ' + 'Account Token Creator" capability granted to the preceding identity. ' + 'For example, if set to "serviceAccountB,serviceAccountC", the ' + 'source_credential must have the Token Creator role on ' + 'serviceAccountB. serviceAccountB must have the Token Creator on ' + 'serviceAccountC. Finally, C must have Token Creator on ' + 'target_principal. If left unset, source_credential must have that ' + 'role on target_principal..') def _create_default_gcs_bucket(self): try: From 92ba9b660447b53caf21c8c685e7b42bf7f60f08 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 6 Apr 2022 15:38:10 -0400 Subject: [PATCH 11/46] use similar delegate_to name throughout --- sdks/python/apache_beam/internal/gcp/auth.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index a114a482de2a..5f3da301f8a7 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -172,7 +172,7 @@ def _get_service_credentials(): @classmethod def set_impersonation_accounts(cls, target_principal, delegate_to): cls._target_principal = target_principal - cls._delegate_accounts = delegate_to + cls._delegate_to = delegate_to cls._impersonation_parameters_set = True @classmethod @@ -187,7 +187,7 @@ def _add_impersonation_credentials(cls): credentials = google.auth.impersonated_credentials.Credentials( source_credentials=credentials, target_principal=cls._target_principal, - delegates=cls._delegate_accounts, + delegates=cls._delegate_to, target_scopes=CLIENT_SCOPES, ) return credentials From 4b3d39b58ae9724b5da24acda1b27f1be8b39f25 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 12 Apr 2022 10:20:21 -0400 Subject: [PATCH 12/46] fixed name errors --- sdks/python/apache_beam/internal/gcp/auth.py | 5 +++-- .../apache_beam/runners/dataflow/internal/apiclient.py | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 5f3da301f8a7..8d205753f25a 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -26,6 +26,7 @@ # google.auth is only available when Beam is installed with the gcp extra. 
try: + from google.auth import impersonated_credentials import google.auth import google_auth_httplib2 _GOOGLE_AUTH_AVAILABLE = True @@ -141,7 +142,6 @@ def get_service_credentials(cls): "socket default timeout is %s seconds.", socket.getdefaulttimeout()) cls._credentials = cls._get_service_credentials() - cls._credentials = cls._add_impersonation_credentials() cls._credentials_init = True return cls._credentials @@ -157,6 +157,7 @@ def _get_service_credentials(): try: credentials, _ = google.auth.default(scopes=CLIENT_SCOPES) # pylint: disable=c-extension-no-member + credentials = _Credentials._add_impersonation_credentials() credentials = _ApitoolsCredentialsAdapter(credentials) logging.debug( 'Connecting using Google Application Default ' @@ -184,7 +185,7 @@ def _add_impersonation_credentials(cls): """Adds impersonation credentials if the client species them.""" credentials = cls._credentials if cls._target_principal: - credentials = google.auth.impersonated_credentials.Credentials( + credentials = impersonated_credentials.Credentials( source_credentials=credentials, target_principal=cls._target_principal, delegates=cls._delegate_to, diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index e9da865f1941..469ce85cad6c 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -553,8 +553,8 @@ def __init__(self, options, root_staging_location=None): credentials = None else: set_impersonation_accounts( - self._google_cloud_options.target_principal, - self._google_cloud_options.delegate_accounts) + self.google_cloud_options.target_principal, + self.google_cloud_options.delegate_accounts) credentials = get_service_credentials() http_client = get_new_http() From d5e283bf60acf066d741c2cde425ba105242c70d Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 12 Apr 2022 11:45:19 -0400 Subject: [PATCH 13/46] added a credentials fix --- sdks/python/apache_beam/internal/gcp/auth.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 8d205753f25a..01b057f01dde 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -157,7 +157,7 @@ def _get_service_credentials(): try: credentials, _ = google.auth.default(scopes=CLIENT_SCOPES) # pylint: disable=c-extension-no-member - credentials = _Credentials._add_impersonation_credentials() + credentials = _Credentials._add_impersonation_credentials(credentials) credentials = _ApitoolsCredentialsAdapter(credentials) logging.debug( 'Connecting using Google Application Default ' @@ -177,13 +177,12 @@ def set_impersonation_accounts(cls, target_principal, delegate_to): cls._impersonation_parameters_set = True @classmethod - def _add_impersonation_credentials(cls): + def _add_impersonation_credentials(cls, credentials): if not cls._impersonation_parameters_set: traceback_str = traceback.format_stack() raise Exception( 'Impersonation credentials not yet set. 
\n' + str(traceback_str)) """Adds impersonation credentials if the client species them.""" - credentials = cls._credentials if cls._target_principal: credentials = impersonated_credentials.Credentials( source_credentials=credentials, From e03adc314229cb44b7267c929988368ed43a57e6 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 12 Apr 2022 13:57:22 -0400 Subject: [PATCH 14/46] moved impersonation into a single parameter --- sdks/python/apache_beam/internal/gcp/auth.py | 24 ++++++++++--------- .../apache_beam/io/gcp/bigquery_tools.py | 7 +++--- .../apache_beam/io/gcp/gcsfilesystem.py | 8 +++---- .../apache_beam/options/pipeline_options.py | 22 ++++++----------- .../runners/dataflow/internal/apiclient.py | 3 +-- .../apache_beam/runners/interactive/utils.py | 7 +++--- .../portability/sdk_container_builder.py | 3 +-- 7 files changed, 31 insertions(+), 43 deletions(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 01b057f01dde..f39689ed9602 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -71,8 +71,8 @@ def set_running_in_gce(worker_executing_project): executing_project = worker_executing_project -def set_impersonation_accounts(target_principal=None, delegate_to=None): - _Credentials.set_impersonation_accounts(target_principal, delegate_to) +def set_impersonation_accounts(impersonate_service_account=None): + _Credentials.set_impersonation_accounts(impersonate_service_account) def get_service_credentials(): @@ -125,7 +125,7 @@ class _Credentials(object): _delegate_accounts = None _target_principal = None - _impersonation_parameters_set = False + _impersonation_parameter_set = False @classmethod def get_service_credentials(cls): @@ -171,23 +171,25 @@ def _get_service_credentials(): return None @classmethod - def set_impersonation_accounts(cls, target_principal, delegate_to): - cls._target_principal = target_principal - cls._delegate_to = delegate_to - cls._impersonation_parameters_set = True + def set_impersonation_accounts(cls, impersonate_service_account): + cls._impersonate_service_account = impersonate_service_account + cls._impersonation_parameter_set = True @classmethod def _add_impersonation_credentials(cls, credentials): - if not cls._impersonation_parameters_set: + if not cls._impersonation_parameter_set: traceback_str = traceback.format_stack() raise Exception( 'Impersonation credentials not yet set. 
\n' + str(traceback_str)) """Adds impersonation credentials if the client species them.""" - if cls._target_principal: + if cls._impersonate_service_account: + impersonate_accounts = cls._impersonate_service_account.split(',') + target_principal = impersonate_accounts[-1] + delegate_to = impersonate_accounts[0,:] credentials = impersonated_credentials.Credentials( source_credentials=credentials, - target_principal=cls._target_principal, - delegates=cls._delegate_to, + target_principal=target_principal, + delegates=delegate_to, target_scopes=CLIENT_SCOPES, ) return credentials diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index b562275c511f..2a12f90b009e 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -1491,11 +1491,10 @@ def __init__(self, sink, test_bigquery_client=None, buffer_size=None): sink.pipeline_options.view_as(GoogleCloudOptions).project) if hasattr(sink, 'pipeline_options'): gcs_options = sink.pipeline_options.view_as(GoogleCloudOptions) - target_principal = gcs_options.target_principal - delegate_accounts = gcs_options.delegate_accounts - auth.set_impersonation_accounts(target_principal, delegate_accounts) + impersonate_service_account = gcs_options.impersonate_service_account + auth.set_impersonation_accounts(impersonate_service_account) else: - auth.set_impersonation_accounts(None, None) + auth.set_impersonation_accounts(None) assert self.project_id is not None diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 21b189d3e88d..46445749a49f 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -44,12 +44,10 @@ def __init__(self, pipeline_options): super().__init__(pipeline_options) if isinstance(pipeline_options, PipelineOptions): gcs_options = pipeline_options.view_as(GoogleCloudOptions) - target_principal = gcs_options.target_principal - delegate_accounts = gcs_options.delegate_accounts + impersonate_service_account = gcs_options.impersonate_service_account else: - target_principal = pipeline_options.get('target_principal') - delegate_accounts = pipeline_options.get('delegate_accounts') - auth.set_impersonation_accounts(target_principal, delegate_accounts) + impersonate_service_account = pipeline_options.get('impersonate_service_account') + auth.set_impersonation_accounts(impersonate_service_account) @classmethod def scheme(cls): diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index a3be06e38420..a200843b200b 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -752,22 +752,14 @@ def _add_argparse_args(cls, parser): 'When true, artifacts will be cached across job submissions in the ' 'GCS staging bucket') parser.add_argument( - '--target_principal', + '--impersonate_service_account', default=None, - help='The service account to impersonate.') - parser.add_argument( - '--delegate_accounts', - default=None, - help= - 'A comma separated list of delegates required to grant the final ' - 'access_token. If set, the sequence of identities must have "Service ' - 'Account Token Creator" capability granted to the preceding identity. ' - 'For example, if set to "serviceAccountB,serviceAccountC", the ' - 'source_credential must have the Token Creator role on ' - 'serviceAccountB. 
serviceAccountB must have the Token Creator on ' - 'serviceAccountC. Finally, C must have Token Creator on ' - 'target_principal. If left unset, source_credential must have that ' - 'role on target_principal..') + help='All API requests will be made as the given service account or ' + 'target service account in an impersonation delegation chain ' + 'instead of the currently selected account. You can specify ' + 'either a single service account as the impersonator, or a ' + 'comma-separated list of service accounts to create an ' + 'impersonation delegation chain.') def _create_default_gcs_bucket(self): try: diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 469ce85cad6c..6c0a068dcded 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -553,8 +553,7 @@ def __init__(self, options, root_staging_location=None): credentials = None else: set_impersonation_accounts( - self.google_cloud_options.target_principal, - self.google_cloud_options.delegate_accounts) + self.google_cloud_options.impersonate_service_account) credentials = get_service_credentials() http_client = get_new_http() diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py index 8920a65f7120..592be2014829 100644 --- a/sdks/python/apache_beam/runners/interactive/utils.py +++ b/sdks/python/apache_beam/runners/interactive/utils.py @@ -488,11 +488,10 @@ def assert_bucket_exists(bucket_name, pipeline_options=None): from apitools.base.py.exceptions import HttpError if pipeline_options: gcs_options = pipeline_options.view_as(GoogleCloudOptions) - target_principal = gcs_options.target_principal - delegate_accounts = gcs_options.delegate_accounts - auth.set_impersonation_accounts(target_principal, delegate_accounts) + impersonate_service_account = gcs_options.impersonate_service_account + auth.set_impersonation_accounts(impersonate_service_account) else: - auth.set_impersonation_accounts(None, None) + auth.set_impersonation_accounts(None) storage_client = storage.StorageV1( credentials=auth.get_service_credentials(), get_credentials=False, diff --git a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py index 983fad4c7033..a298ed0a5e75 100644 --- a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py +++ b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py @@ -212,8 +212,7 @@ def __init__(self, options): credentials = None else: set_impersonation_accounts( - self._google_cloud_options.target_principal, - self._google_cloud_options.delegate_accounts) + self._google_cloud_options.impersonate_service_account) credentials = get_service_credentials() self._storage_client = storage.StorageV1( url='https://www.googleapis.com/storage/v1', From 4a1d6b0db77a05c33ca854729842212dfb0d3a78 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Thu, 21 Apr 2022 18:02:50 -0400 Subject: [PATCH 15/46] fixed bug in delegate_to, added logging --- sdks/python/apache_beam/internal/gcp/auth.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index f39689ed9602..f7f896c73e43 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ 
-183,9 +183,13 @@ def _add_impersonation_credentials(cls, credentials): 'Impersonation credentials not yet set. \n' + str(traceback_str)) """Adds impersonation credentials if the client species them.""" if cls._impersonate_service_account: + # TODO (remove log) + _LOGGER.warning('cls._impersonate_service_account:' + str(cls._impersonate_service_account)) impersonate_accounts = cls._impersonate_service_account.split(',') target_principal = impersonate_accounts[-1] - delegate_to = impersonate_accounts[0,:] + delegate_to = impersonate_accounts[0:-1] + # TODO (remove log) + _LOGGER.warning('Setting Impersonation Credentials Target PRINCIPAL:' + str(target_principal) + ' -- Delegate To:' + str(delegate_to)) credentials = impersonated_credentials.Credentials( source_credentials=credentials, target_principal=target_principal, From a18b3d30b96cdca46cf29ac0a809c13d3d2b4654 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Fri, 6 May 2022 11:48:50 -0400 Subject: [PATCH 16/46] added auth --- sdks/python/apache_beam/internal/gcp/auth.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index f7f896c73e43..79587a553aa9 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -183,13 +183,10 @@ def _add_impersonation_credentials(cls, credentials): 'Impersonation credentials not yet set. \n' + str(traceback_str)) """Adds impersonation credentials if the client species them.""" if cls._impersonate_service_account: - # TODO (remove log) - _LOGGER.warning('cls._impersonate_service_account:' + str(cls._impersonate_service_account)) + _LOGGER.info('Impersonating ' + str(cls._impersonate_service_account)) impersonate_accounts = cls._impersonate_service_account.split(',') target_principal = impersonate_accounts[-1] delegate_to = impersonate_accounts[0:-1] - # TODO (remove log) - _LOGGER.warning('Setting Impersonation Credentials Target PRINCIPAL:' + str(target_principal) + ' -- Delegate To:' + str(delegate_to)) credentials = impersonated_credentials.Credentials( source_credentials=credentials, target_principal=target_principal, From f92857ac6c7a50d1f7308dd87f438cdbecc68764 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Mon, 9 May 2022 08:14:30 -0400 Subject: [PATCH 17/46] added test --- sdks/python/scripts/run_integration_test.sh | 11 +++++++++ sdks/python/test-suites/dataflow/build.gradle | 6 +++++ .../python/test-suites/dataflow/common.gradle | 23 +++++++++++++++++++ 3 files changed, 40 insertions(+) diff --git a/sdks/python/scripts/run_integration_test.sh b/sdks/python/scripts/run_integration_test.sh index b477e918d96c..ea8518b24818 100755 --- a/sdks/python/scripts/run_integration_test.sh +++ b/sdks/python/scripts/run_integration_test.sh @@ -79,6 +79,7 @@ WORKER_JAR="" KMS_KEY_NAME="projects/apache-beam-testing/locations/global/keyRings/beam-it/cryptoKeys/test" SUITE="" COLLECT_MARKERS= +IMPERSONATE_ACCOUNT=allows-impersonation@apache-beam-testing.iam.gserviceaccount.com # Default test (pytest) options. 
# Run WordCountIT.test_wordcount_it by default if no test options are @@ -154,6 +155,11 @@ case $key in shift # past argument shift # past value ;; + --impersonate) + IMPERSONATE="$2" + shift # past argument + shift # past value + ;; --test_opts) TEST_OPTS="$2" shift # past argument @@ -256,6 +262,11 @@ if [[ -z $PIPELINE_OPTS ]]; then fi + # add impersonate_service_account if provided + if [[ "$IMPERSONATE" = true ]]; then + opts+=("--impersonate_service_account=\"$IMPERSONATE_ACCOUNT\"") + fi + if [[ ! -z "$KMS_KEY_NAME" ]]; then opts+=( "--kms_key_name=$KMS_KEY_NAME" diff --git a/sdks/python/test-suites/dataflow/build.gradle b/sdks/python/test-suites/dataflow/build.gradle index 036159eede3f..c411fc9720e3 100644 --- a/sdks/python/test-suites/dataflow/build.gradle +++ b/sdks/python/test-suites/dataflow/build.gradle @@ -49,6 +49,12 @@ task chicagoTaxiExample { } } +task impersonation { + getVersionsAsList('dataflow_impersonation_versions').each { + dependsOn.add("sdks:python:test-suites:dataflow:py${getVersionSuffix(it)}:impersonation") + } +} + task validatesRunnerBatchTests { getVersionsAsList('dataflow_validates_runner_batch_tests').each { dependsOn.add(":sdks:python:test-suites:dataflow:py${getVersionSuffix(it)}:validatesRunnerBatchTests") diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle index 4a66bef46989..68e381d0df88 100644 --- a/sdks/python/test-suites/dataflow/common.gradle +++ b/sdks/python/test-suites/dataflow/common.gradle @@ -344,3 +344,26 @@ task validatesContainer() { } } } + +task runImpersonationTest() { + dependsOn 'installGcpTest' + dependsOn ':sdks:python:sdist' + def testOpts = basicPytestOpts + + doFirst { + def argMap = [ + "test_opts": testOpts + ["--numprocesses=8", "--dist=loadfile"], + "sdk_location": files(configurations.distTarBall.files).singleFile, + "runner_v2": "true", + "suite": "postCommitIT-df${pythonVersionSuffix}-xdist", + "collect": "examples_postcommit and not no_xdist and not sickbay_dataflow" + "impersonate": "true" + "gcs_location": "gs://impersonation-test" + ] + def cmdArgs = mapToArgString(argMap) + exec { + executable 'sh' + args '-c', ". 
${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs" + } + } +} From e474ff8c51e852ba33e8f0358bb29703ea5adff5 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Mon, 9 May 2022 08:17:23 -0400 Subject: [PATCH 18/46] changed impersonation location --- sdks/python/test-suites/dataflow/common.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle index 68e381d0df88..cec8556dd729 100644 --- a/sdks/python/test-suites/dataflow/common.gradle +++ b/sdks/python/test-suites/dataflow/common.gradle @@ -358,7 +358,7 @@ task runImpersonationTest() { "suite": "postCommitIT-df${pythonVersionSuffix}-xdist", "collect": "examples_postcommit and not no_xdist and not sickbay_dataflow" "impersonate": "true" - "gcs_location": "gs://impersonation-test" + "gcs_location": "gs://tmp-impersonate-test" ] def cmdArgs = mapToArgString(argMap) exec { From 1f4c3e65ed5fc4c8052f90566211a17591d478c8 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Mon, 9 May 2022 21:45:47 -0400 Subject: [PATCH 19/46] updated tests --- sdks/python/scripts/run_integration_test.sh | 11 +++++++++-- sdks/python/test-suites/dataflow/common.gradle | 12 ++++++++---- sdks/python/test-suites/gradle.properties | 1 + 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/sdks/python/scripts/run_integration_test.sh b/sdks/python/scripts/run_integration_test.sh index ea8518b24818..c61691c09dac 100755 --- a/sdks/python/scripts/run_integration_test.sh +++ b/sdks/python/scripts/run_integration_test.sh @@ -43,6 +43,7 @@ # flag is specified, all above flag will be ignored. # Please include all required pipeline options when # using this flag. +# impersonate -> True if the test uses impersonation. # # Test related flags: # test_opts -> List of space separated options to configure Pytest test @@ -80,6 +81,8 @@ KMS_KEY_NAME="projects/apache-beam-testing/locations/global/keyRings/beam-it/cry SUITE="" COLLECT_MARKERS= IMPERSONATE_ACCOUNT=allows-impersonation@apache-beam-testing.iam.gserviceaccount.com +IMPERSONATE_RUNNER_ACCOUNT=impersonation-dataflow-worker@apache-beam-testing.iam.gserviceaccount.com +IMPERSONATE_LOCATION=gs://impersonation-test-bucket # Default test (pytest) options. # Run WordCountIT.test_wordcount_it by default if no test options are @@ -230,8 +233,6 @@ if [[ -z $PIPELINE_OPTS ]]; then "--project=$PROJECT" "--region=$REGION" "--staging_location=$GCS_LOCATION/staging-it" - "--temp_location=$GCS_LOCATION/temp-it" - "--output=$GCS_LOCATION/py-it-cloud/output" "--sdk_location=$SDK_LOCATION" "--requirements_file=postcommit_requirements.txt" "--num_workers=$NUM_WORKERS" @@ -265,6 +266,12 @@ if [[ -z $PIPELINE_OPTS ]]; then # add impersonate_service_account if provided if [[ "$IMPERSONATE" = true ]]; then opts+=("--impersonate_service_account=\"$IMPERSONATE_ACCOUNT\"") + opts+=("--service_account_email=\"$IMPERSONATE_RUNNER_ACCOUNT\"") + opts+=("--temp_location=$IMPERSONATE_LOCATION/temp-it") + opts+=("--output=$IMPERSONATE_LOCATION/py-it-cloud/output") + else + opts+=("--temp_location=$GCS_LOCATION/temp-it") + opts+=("--output=$GCS_LOCATION/py-it-cloud/output") fi if [[ ! 
-z "$KMS_KEY_NAME" ]]; then diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle index b3acf5e4c8ed..3f7d23a47f07 100644 --- a/sdks/python/test-suites/dataflow/common.gradle +++ b/sdks/python/test-suites/dataflow/common.gradle @@ -368,17 +368,21 @@ task validatesContainer() { task runImpersonationTest() { dependsOn 'installGcpTest' dependsOn ':sdks:python:sdist' - def testOpts = basicPytestOpts + def testOpts = [ + "apache_beam/examples/wordcount_it_test.py::WordCountIT::test_wordcount_it", + "--capture=no", // Print stdout instantly + "--numprocesses=2", // Number of tests running in parallel + "--timeout=1800", // Timeout of whole command execution + ] doFirst { def argMap = [ - "test_opts": testOpts + ["--numprocesses=8", "--dist=loadfile"], + "test_opts": testOpts, "sdk_location": files(configurations.distTarBall.files).singleFile, "runner_v2": "true", "suite": "postCommitIT-df${pythonVersionSuffix}-xdist", - "collect": "examples_postcommit and not no_xdist and not sickbay_dataflow" + "collect": "impersonation_wordcount_it_test", "impersonate": "true" - "gcs_location": "gs://tmp-impersonate-test" ] def cmdArgs = mapToArgString(argMap) exec { diff --git a/sdks/python/test-suites/gradle.properties b/sdks/python/test-suites/gradle.properties index cf167b22abed..4cf9cc481600 100644 --- a/sdks/python/test-suites/gradle.properties +++ b/sdks/python/test-suites/gradle.properties @@ -31,6 +31,7 @@ dataflow_validates_container_tests=3.7,3.8,3.9 dataflow_validates_runner_batch_tests_V2=3.9 dataflow_validates_runner_streaming_tests_V2=3.9 dataflow_examples_postcommit_py_versions=3.9 +dataflow_impersonation_versions=3.8 # direct runner test-suites direct_mongodbio_it_task_py_versions=3.9 From 6c869b080bfd3a84362f6e1a51fc5a76b2749e96 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 10 May 2022 06:26:59 -0400 Subject: [PATCH 20/46] Don't pass impersonate_service_account to workers --- sdks/python/apache_beam/internal/gcp/auth.py | 1 + sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 6 +++++- .../apache_beam/runners/dataflow/internal/apiclient.py | 4 ++++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 79587a553aa9..141bdeff65ba 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -126,6 +126,7 @@ class _Credentials(object): _delegate_accounts = None _target_principal = None _impersonation_parameter_set = False + _impersonate_service_account = None @classmethod def get_service_credentials(cls): diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 13cbec6dc022..c45361324eba 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -41,6 +41,7 @@ from apache_beam import error from apache_beam.internal import pickler from apache_beam.internal.gcp import json_value +from apache_beam.internal.gcp.auth import set_impersonation_accounts from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import SetupOptions @@ -395,6 +396,10 @@ def _check_for_unsupported_features_on_non_portable_worker(self, pipeline): def run_pipeline(self, pipeline, options, pipeline_proto=None): """Remotely 
executes entire pipeline or parts reachable from node.""" + google_cloud_options = options.view_as(GoogleCloudOptions) + set_impersonation_accounts( + google_cloud_options.impersonate_service_account) + # Label goog-dataflow-notebook if job is started from notebook. if is_in_notebook(): notebook_version = ( @@ -546,7 +551,6 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): # Elevate "enable_streaming_engine" to pipeline option, but using the # existing experiment. - google_cloud_options = options.view_as(GoogleCloudOptions) if google_cloud_options.enable_streaming_engine: debug_options.add_experiment("enable_windmill_service") debug_options.add_experiment("enable_streaming_engine") diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index e2687aedad9f..f5685d577993 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -346,6 +346,10 @@ def __init__( for k, v in sdk_pipeline_options.items() if v is not None } options_dict["pipelineUrl"] = proto_pipeline_staged_url + # Don't pass impersonate_service_account through to the harness. + # Though impersonation should start a job, the workers should + # not try to modify their credentials. + options_dict['impersonate_service_account'] = None self.proto.sdkPipelineOptions.additionalProperties.append( dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty( key='options', value=to_json_value(options_dict))) From 15bfda9bc9177f3bfc5e8818d581c6aa1cdeef74 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 10 May 2022 06:28:04 -0400 Subject: [PATCH 21/46] yapf --- sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 3 ++- sdks/python/apache_beam/options/pipeline_options.py | 13 ++++++------- .../apache_beam/runners/dataflow/dataflow_runner.py | 3 +-- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index e7b3d9d5c6a0..1590dd34bb71 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -53,7 +53,8 @@ def __init__(self, pipeline_options): gcs_options = pipeline_options.view_as(GoogleCloudOptions) impersonate_service_account = gcs_options.impersonate_service_account else: - impersonate_service_account = pipeline_options.get('impersonate_service_account') + impersonate_service_account = pipeline_options.get( + 'impersonate_service_account') auth.set_impersonation_accounts(impersonate_service_account) @classmethod diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index a200843b200b..f89d11619103 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -748,18 +748,17 @@ def _add_argparse_args(cls, parser): '--enable_artifact_caching', default=False, action='store_true', - help= - 'When true, artifacts will be cached across job submissions in the ' + help='When true, artifacts will be cached across job submissions in the ' 'GCS staging bucket') parser.add_argument( '--impersonate_service_account', default=None, help='All API requests will be made as the given service account or ' - 'target service account in an impersonation delegation chain ' - 'instead of the currently selected account. 
You can specify ' - 'either a single service account as the impersonator, or a ' - 'comma-separated list of service accounts to create an ' - 'impersonation delegation chain.') + 'target service account in an impersonation delegation chain ' + 'instead of the currently selected account. You can specify ' + 'either a single service account as the impersonator, or a ' + 'comma-separated list of service accounts to create an ' + 'impersonation delegation chain.') def _create_default_gcs_bucket(self): try: diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index c45361324eba..651718037643 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -397,8 +397,7 @@ def _check_for_unsupported_features_on_non_portable_worker(self, pipeline): def run_pipeline(self, pipeline, options, pipeline_proto=None): """Remotely executes entire pipeline or parts reachable from node.""" google_cloud_options = options.view_as(GoogleCloudOptions) - set_impersonation_accounts( - google_cloud_options.impersonate_service_account) + set_impersonation_accounts(google_cloud_options.impersonate_service_account) # Label goog-dataflow-notebook if job is started from notebook. if is_in_notebook(): From 433585a843de5b401909c1517fe9ad0c3d19cd20 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 10 May 2022 10:07:07 -0400 Subject: [PATCH 22/46] simplified code in readthrough --- sdks/python/apache_beam/internal/gcp/auth.py | 7 ++----- sdks/python/apache_beam/io/gcp/bigquery_tools.py | 11 ++++------- sdks/python/test-suites/gradle.properties | 2 +- 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 141bdeff65ba..ba8b80540697 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -22,7 +22,6 @@ import logging import socket import threading -import traceback # google.auth is only available when Beam is installed with the gcp extra. try: @@ -179,12 +178,10 @@ def set_impersonation_accounts(cls, impersonate_service_account): @classmethod def _add_impersonation_credentials(cls, credentials): if not cls._impersonation_parameter_set: - traceback_str = traceback.format_stack() - raise Exception( - 'Impersonation credentials not yet set. 
\n' + str(traceback_str)) + raise AssertionError('Impersonation credentials not yet set.') """Adds impersonation credentials if the client species them.""" if cls._impersonate_service_account: - _LOGGER.info('Impersonating ' + str(cls._impersonate_service_account)) + _LOGGER.info('Impersonating: ' + str(cls._impersonate_service_account)) impersonate_accounts = cls._impersonate_service_account.split(',') target_principal = impersonate_accounts[-1] delegate_to = impersonate_accounts[0:-1] diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 3f356c1f40ba..096b93c44c71 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -1362,11 +1362,9 @@ def __init__( self.executing_project = None if hasattr(source, 'pipeline_options'): gcs_options = source.pipeline_options.view_as(GoogleCloudOptions) - target_principal = gcs_options.target_principal - delegate_accounts = gcs_options.delegate_accounts - auth.set_impersonation_accounts(target_principal, delegate_accounts) + auth.set_impersonation_accounts(gcs_options.impersonate_service_account) else: - auth.set_impersonation_accounts(None, None) + auth.set_impersonation_accounts(None) # TODO(silviuc): Try to automatically get it from gcloud config info. if not self.executing_project and test_bigquery_client is None: @@ -1485,9 +1483,8 @@ def __init__(self, sink, test_bigquery_client=None, buffer_size=None): self.project_id = ( sink.pipeline_options.view_as(GoogleCloudOptions).project) if hasattr(sink, 'pipeline_options'): - gcs_options = sink.pipeline_options.view_as(GoogleCloudOptions) - impersonate_service_account = gcs_options.impersonate_service_account - auth.set_impersonation_accounts(impersonate_service_account) + gcs_options = source.pipeline_options.view_as(GoogleCloudOptions) + auth.set_impersonation_accounts(gcs_options.impersonate_service_account) else: auth.set_impersonation_accounts(None) diff --git a/sdks/python/test-suites/gradle.properties b/sdks/python/test-suites/gradle.properties index 4cf9cc481600..72ee913f5d5c 100644 --- a/sdks/python/test-suites/gradle.properties +++ b/sdks/python/test-suites/gradle.properties @@ -31,7 +31,7 @@ dataflow_validates_container_tests=3.7,3.8,3.9 dataflow_validates_runner_batch_tests_V2=3.9 dataflow_validates_runner_streaming_tests_V2=3.9 dataflow_examples_postcommit_py_versions=3.9 -dataflow_impersonation_versions=3.8 +dataflow_impersonation_versions=3.9 # direct runner test-suites direct_mongodbio_it_task_py_versions=3.9 From 37c25a1599b7077addcd25b159fb698083d58eee Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 10 May 2022 11:19:11 -0400 Subject: [PATCH 23/46] added wordcount integration test for impersonation --- .../apache_beam/examples/wordcount_it_test.py | 20 ++++++++++++++ sdks/python/scripts/run_integration_test.sh | 22 ++------------- .../python/test-suites/dataflow/common.gradle | 27 ------------------- sdks/python/test-suites/gradle.properties | 1 - 4 files changed, 22 insertions(+), 48 deletions(-) diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py index be8bbbfed8a6..99eace0d9736 100644 --- a/sdks/python/apache_beam/examples/wordcount_it_test.py +++ b/sdks/python/apache_beam/examples/wordcount_it_test.py @@ -47,6 +47,26 @@ class WordCountIT(unittest.TestCase): def test_wordcount_it(self): self._run_wordcount_it(wordcount.run) + @pytest.mark.it_postcommit + 
@pytest.mark.sickbay_direct + @pytest.mark.sickbay_spark + @pytest.mark.sickbay_flink + def test_wordcount_impersonation_it(self): + """Tests impersonation on dataflow.""" + ACOUNT_TO_IMPERSONATE = 'allows-impersonation@apache-beam-testing.iam.gserviceaccount.com' + RUNNER_ACCOUNT = 'impersonation-dataflow-worker@apache-beam-testing.iam.gserviceaccount.com' + OUTPUT_DIR = 'gs://impersonation-test-bucket/py-it-cloud/output' + TEMP_DIR = 'gs://impersonation-test-bucket/temp-it' + STAGING_LOCATION = 'gs://impersonation-test-bucket/staging-it' + extra_options = { + 'impersonate_service_account': ACOUNT_TO_IMPERSONATE, + 'service_account_email': RUNNER_ACCOUNT, + 'output': OUTPUT_DIR, + 'temp_location': TEMP_DIR, + 'staging_location': STAGING_LOCATION + } + self._run_wordcount_it(wordcount.run, **extra_options) + @pytest.mark.it_postcommit @pytest.mark.it_validatescontainer def test_wordcount_fnapi_it(self): diff --git a/sdks/python/scripts/run_integration_test.sh b/sdks/python/scripts/run_integration_test.sh index c61691c09dac..b477e918d96c 100755 --- a/sdks/python/scripts/run_integration_test.sh +++ b/sdks/python/scripts/run_integration_test.sh @@ -43,7 +43,6 @@ # flag is specified, all above flag will be ignored. # Please include all required pipeline options when # using this flag. -# impersonate -> True if the test uses impersonation. # # Test related flags: # test_opts -> List of space separated options to configure Pytest test @@ -80,9 +79,6 @@ WORKER_JAR="" KMS_KEY_NAME="projects/apache-beam-testing/locations/global/keyRings/beam-it/cryptoKeys/test" SUITE="" COLLECT_MARKERS= -IMPERSONATE_ACCOUNT=allows-impersonation@apache-beam-testing.iam.gserviceaccount.com -IMPERSONATE_RUNNER_ACCOUNT=impersonation-dataflow-worker@apache-beam-testing.iam.gserviceaccount.com -IMPERSONATE_LOCATION=gs://impersonation-test-bucket # Default test (pytest) options. # Run WordCountIT.test_wordcount_it by default if no test options are @@ -158,11 +154,6 @@ case $key in shift # past argument shift # past value ;; - --impersonate) - IMPERSONATE="$2" - shift # past argument - shift # past value - ;; --test_opts) TEST_OPTS="$2" shift # past argument @@ -233,6 +224,8 @@ if [[ -z $PIPELINE_OPTS ]]; then "--project=$PROJECT" "--region=$REGION" "--staging_location=$GCS_LOCATION/staging-it" + "--temp_location=$GCS_LOCATION/temp-it" + "--output=$GCS_LOCATION/py-it-cloud/output" "--sdk_location=$SDK_LOCATION" "--requirements_file=postcommit_requirements.txt" "--num_workers=$NUM_WORKERS" @@ -263,17 +256,6 @@ if [[ -z $PIPELINE_OPTS ]]; then fi - # add impersonate_service_account if provided - if [[ "$IMPERSONATE" = true ]]; then - opts+=("--impersonate_service_account=\"$IMPERSONATE_ACCOUNT\"") - opts+=("--service_account_email=\"$IMPERSONATE_RUNNER_ACCOUNT\"") - opts+=("--temp_location=$IMPERSONATE_LOCATION/temp-it") - opts+=("--output=$IMPERSONATE_LOCATION/py-it-cloud/output") - else - opts+=("--temp_location=$GCS_LOCATION/temp-it") - opts+=("--output=$GCS_LOCATION/py-it-cloud/output") - fi - if [[ ! 
-z "$KMS_KEY_NAME" ]]; then opts+=( "--kms_key_name=$KMS_KEY_NAME" diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle index 3f7d23a47f07..fdc428001c3b 100644 --- a/sdks/python/test-suites/dataflow/common.gradle +++ b/sdks/python/test-suites/dataflow/common.gradle @@ -364,30 +364,3 @@ task validatesContainer() { } } } - -task runImpersonationTest() { - dependsOn 'installGcpTest' - dependsOn ':sdks:python:sdist' - def testOpts = [ - "apache_beam/examples/wordcount_it_test.py::WordCountIT::test_wordcount_it", - "--capture=no", // Print stdout instantly - "--numprocesses=2", // Number of tests running in parallel - "--timeout=1800", // Timeout of whole command execution - ] - - doFirst { - def argMap = [ - "test_opts": testOpts, - "sdk_location": files(configurations.distTarBall.files).singleFile, - "runner_v2": "true", - "suite": "postCommitIT-df${pythonVersionSuffix}-xdist", - "collect": "impersonation_wordcount_it_test", - "impersonate": "true" - ] - def cmdArgs = mapToArgString(argMap) - exec { - executable 'sh' - args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs" - } - } -} diff --git a/sdks/python/test-suites/gradle.properties b/sdks/python/test-suites/gradle.properties index 72ee913f5d5c..cf167b22abed 100644 --- a/sdks/python/test-suites/gradle.properties +++ b/sdks/python/test-suites/gradle.properties @@ -31,7 +31,6 @@ dataflow_validates_container_tests=3.7,3.8,3.9 dataflow_validates_runner_batch_tests_V2=3.9 dataflow_validates_runner_streaming_tests_V2=3.9 dataflow_examples_postcommit_py_versions=3.9 -dataflow_impersonation_versions=3.9 # direct runner test-suites direct_mongodbio_it_task_py_versions=3.9 From 4ba5f9bd5b6244857e3cdbdbfea4d5b5f7d31995 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 10 May 2022 20:25:21 -0400 Subject: [PATCH 24/46] changed output dir to use test dir --- sdks/python/apache_beam/examples/wordcount_it_test.py | 2 -- sdks/python/test-suites/dataflow/build.gradle | 6 ------ 2 files changed, 8 deletions(-) diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py index 99eace0d9736..948aee1e4b7c 100644 --- a/sdks/python/apache_beam/examples/wordcount_it_test.py +++ b/sdks/python/apache_beam/examples/wordcount_it_test.py @@ -55,13 +55,11 @@ def test_wordcount_impersonation_it(self): """Tests impersonation on dataflow.""" ACOUNT_TO_IMPERSONATE = 'allows-impersonation@apache-beam-testing.iam.gserviceaccount.com' RUNNER_ACCOUNT = 'impersonation-dataflow-worker@apache-beam-testing.iam.gserviceaccount.com' - OUTPUT_DIR = 'gs://impersonation-test-bucket/py-it-cloud/output' TEMP_DIR = 'gs://impersonation-test-bucket/temp-it' STAGING_LOCATION = 'gs://impersonation-test-bucket/staging-it' extra_options = { 'impersonate_service_account': ACOUNT_TO_IMPERSONATE, 'service_account_email': RUNNER_ACCOUNT, - 'output': OUTPUT_DIR, 'temp_location': TEMP_DIR, 'staging_location': STAGING_LOCATION } diff --git a/sdks/python/test-suites/dataflow/build.gradle b/sdks/python/test-suites/dataflow/build.gradle index c411fc9720e3..036159eede3f 100644 --- a/sdks/python/test-suites/dataflow/build.gradle +++ b/sdks/python/test-suites/dataflow/build.gradle @@ -49,12 +49,6 @@ task chicagoTaxiExample { } } -task impersonation { - getVersionsAsList('dataflow_impersonation_versions').each { - dependsOn.add("sdks:python:test-suites:dataflow:py${getVersionSuffix(it)}:impersonation") - } -} - task validatesRunnerBatchTests { 
getVersionsAsList('dataflow_validates_runner_batch_tests').each { dependsOn.add(":sdks:python:test-suites:dataflow:py${getVersionSuffix(it)}:validatesRunnerBatchTests") From 8865c1bf525f0e8f9dde268a317799ded0713d8d Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 10 May 2022 20:34:45 -0400 Subject: [PATCH 25/46] removed unused assert_bucket_exists call --- sdks/python/apache_beam/runners/interactive/utils.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py index b22a25b19d5d..cfc2a1a8637d 100644 --- a/sdks/python/apache_beam/runners/interactive/utils.py +++ b/sdks/python/apache_beam/runners/interactive/utils.py @@ -439,7 +439,7 @@ def create_var_in_main(name: str, return name, value -def assert_bucket_exists(bucket_name, pipeline_options=None): +def assert_bucket_exists(bucket_name): # type: (str) -> None """Asserts whether the specified GCS bucket with the name @@ -451,12 +451,6 @@ def assert_bucket_exists(bucket_name, pipeline_options=None): """ try: from apitools.base.py.exceptions import HttpError - if pipeline_options: - gcs_options = pipeline_options.view_as(GoogleCloudOptions) - impersonate_service_account = gcs_options.impersonate_service_account - auth.set_impersonation_accounts(impersonate_service_account) - else: - auth.set_impersonation_accounts(None) storage_client = storage.StorageV1( credentials=auth.get_service_credentials(), get_credentials=False, From 17babbd850a1bf67e697299150ec655b0cb0cda2 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 10 May 2022 22:25:13 -0400 Subject: [PATCH 26/46] linted line length --- sdks/python/apache_beam/examples/wordcount_it_test.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py index 948aee1e4b7c..58236fc56e7a 100644 --- a/sdks/python/apache_beam/examples/wordcount_it_test.py +++ b/sdks/python/apache_beam/examples/wordcount_it_test.py @@ -53,8 +53,10 @@ def test_wordcount_it(self): @pytest.mark.sickbay_flink def test_wordcount_impersonation_it(self): """Tests impersonation on dataflow.""" - ACOUNT_TO_IMPERSONATE = 'allows-impersonation@apache-beam-testing.iam.gserviceaccount.com' - RUNNER_ACCOUNT = 'impersonation-dataflow-worker@apache-beam-testing.iam.gserviceaccount.com' + ACOUNT_TO_IMPERSONATE = \ + 'allows-impersonation@apache-beam-testing.iam.gserviceaccount.com' + RUNNER_ACCOUNT = 'impersonation-dataflow-worker@' \ + 'apache-beam-testing.iam.gserviceaccount.com' TEMP_DIR = 'gs://impersonation-test-bucket/temp-it' STAGING_LOCATION = 'gs://impersonation-test-bucket/staging-it' extra_options = { From caa950af9f5b55cba6abd22b1cf496f0c3c0dbeb Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 10 May 2022 22:59:42 -0400 Subject: [PATCH 27/46] lint line length --- sdks/python/apache_beam/internal/gcp/auth.py | 2 +- sdks/python/apache_beam/options/pipeline_options.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index ba8b80540697..388f60037aae 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -181,7 +181,7 @@ def _add_impersonation_credentials(cls, credentials): raise AssertionError('Impersonation credentials not yet set.') """Adds impersonation credentials if the client species 
them.""" if cls._impersonate_service_account: - _LOGGER.info('Impersonating: ' + str(cls._impersonate_service_account)) + _LOGGER.info('Impersonating: %s', cls._impersonate_service_account) impersonate_accounts = cls._impersonate_service_account.split(',') target_principal = impersonate_accounts[-1] delegate_to = impersonate_accounts[0:-1] diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index f89d11619103..5aa29c0fd96e 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -748,8 +748,8 @@ def _add_argparse_args(cls, parser): '--enable_artifact_caching', default=False, action='store_true', - help='When true, artifacts will be cached across job submissions in the ' - 'GCS staging bucket') + help='When true, artifacts will be cached across job submissions in ' + 'the GCS staging bucket') parser.add_argument( '--impersonate_service_account', default=None, From b5acf5b6c3c4c4c924347b1037fa05e528e66a61 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 11 May 2022 10:52:05 -0400 Subject: [PATCH 28/46] modified to be a parameter --- sdks/python/apache_beam/internal/gcp/auth.py | 37 ++++++++---------- .../apache_beam/io/gcp/bigquery_tools.py | 11 +++--- .../apache_beam/io/gcp/gcsfilesystem.py | 38 ++++++++----------- sdks/python/apache_beam/io/gcp/gcsio.py | 8 ++-- .../runners/dataflow/internal/apiclient.py | 5 +-- .../apache_beam/runners/interactive/utils.py | 2 +- .../portability/sdk_container_builder.py | 5 +-- 7 files changed, 43 insertions(+), 63 deletions(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 388f60037aae..e22ce380249a 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -70,11 +70,7 @@ def set_running_in_gce(worker_executing_project): executing_project = worker_executing_project -def set_impersonation_accounts(impersonate_service_account=None): - _Credentials.set_impersonation_accounts(impersonate_service_account) - - -def get_service_credentials(): +def get_service_credentials(pipeline_options): """For internal use only; no backwards-compatibility guarantees. Get credentials to access Google services. @@ -83,7 +79,7 @@ def get_service_credentials(): A ``google.auth.credentials.Credentials`` object or None if credentials not found. Returned object is thread-safe. 
""" - return _Credentials.get_service_credentials() + return _Credentials.get_service_credentials(pipeline_options) if _GOOGLE_AUTH_AVAILABLE: @@ -124,11 +120,9 @@ class _Credentials(object): _delegate_accounts = None _target_principal = None - _impersonation_parameter_set = False - _impersonate_service_account = None @classmethod - def get_service_credentials(cls): + def get_service_credentials(cls, pipeline_options): with cls._credentials_lock: if cls._credentials_init: return cls._credentials @@ -141,13 +135,13 @@ def get_service_credentials(cls): _LOGGER.info( "socket default timeout is %s seconds.", socket.getdefaulttimeout()) - cls._credentials = cls._get_service_credentials() + cls._credentials = cls._get_service_credentials(pipeline_options) cls._credentials_init = True return cls._credentials @staticmethod - def _get_service_credentials(): + def _get_service_credentials(pipeline_options): if not _GOOGLE_AUTH_AVAILABLE: _LOGGER.warning( 'Unable to find default credentials because the google-auth library ' @@ -157,7 +151,7 @@ def _get_service_credentials(): try: credentials, _ = google.auth.default(scopes=CLIENT_SCOPES) # pylint: disable=c-extension-no-member - credentials = _Credentials._add_impersonation_credentials(credentials) + credentials = _Credentials._add_impersonation_credentials(credentials, pipeline_options) credentials = _ApitoolsCredentialsAdapter(credentials) logging.debug( 'Connecting using Google Application Default ' @@ -171,16 +165,15 @@ def _get_service_credentials(): return None @classmethod - def set_impersonation_accounts(cls, impersonate_service_account): - cls._impersonate_service_account = impersonate_service_account - cls._impersonation_parameter_set = True - - @classmethod - def _add_impersonation_credentials(cls, credentials): - if not cls._impersonation_parameter_set: - raise AssertionError('Impersonation credentials not yet set.') - """Adds impersonation credentials if the client species them.""" - if cls._impersonate_service_account: + def _add_impersonation_credentials(cls, credentials, pipeline_options): + impersonate_service_account = None + if isinstance(pipeline_options, PipelineOptions): + gcs_options = pipeline_options.view_as(GoogleCloudOptions) + impersonate_service_account = gcs_options.impersonate_service_account + else: + impersonate_service_account = pipeline_options.get( + 'impersonate_service_account') + if impersonate_service_account: _LOGGER.info('Impersonating: %s', cls._impersonate_service_account) impersonate_accounts = cls._impersonate_service_account.split(',') target_principal = impersonate_accounts[-1] diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 096b93c44c71..b0fda7d0032f 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -314,7 +314,7 @@ class BigQueryWrapper(object): The wrapper is used to organize all the BigQuery integration points and offer a common place where retry logic for failures can be controlled. - In addition it offers various functions used both in sources and sinks + In addition, it offers various functions used both in sources and sinks (e.g., find and create tables, query a table, etc.). 
""" @@ -328,7 +328,7 @@ class BigQueryWrapper(object): def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None): self.client = client or bigquery.BigqueryV2( http=get_new_http(), - credentials=auth.get_service_credentials(), + credentials=auth.get_service_credentials({}), response_encoding='utf8', additional_http_headers={ "user-agent": "apache-beam-%s" % apache_beam.__version__ @@ -1480,13 +1480,12 @@ def __init__(self, sink, test_bigquery_client=None, buffer_size=None): # If table schema did not define a project we default to executing project. if self.project_id is None and hasattr(sink, 'pipeline_options'): + self._pipeline_options = sink.pipeline_options self.project_id = ( sink.pipeline_options.view_as(GoogleCloudOptions).project) - if hasattr(sink, 'pipeline_options'): - gcs_options = source.pipeline_options.view_as(GoogleCloudOptions) - auth.set_impersonation_accounts(gcs_options.impersonate_service_account) else: - auth.set_impersonation_accounts(None) + # Credentials rely on pipeline options to determine impersonation. + self._pipeline_options = {} assert self.project_id is not None diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 1590dd34bb71..f74a7e3d9004 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -34,9 +34,6 @@ from apache_beam.io.filesystem import FileMetadata from apache_beam.io.filesystem import FileSystem from apache_beam.io.gcp import gcsio -from apache_beam.options.pipeline_options import GoogleCloudOptions -from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.internal.gcp import auth __all__ = ['GCSFileSystem'] @@ -49,13 +46,7 @@ class GCSFileSystem(FileSystem): def __init__(self, pipeline_options): super().__init__(pipeline_options) - if isinstance(pipeline_options, PipelineOptions): - gcs_options = pipeline_options.view_as(GoogleCloudOptions) - impersonate_service_account = gcs_options.impersonate_service_account - else: - impersonate_service_account = pipeline_options.get( - 'impersonate_service_account') - auth.set_impersonation_accounts(impersonate_service_account) + self._pipeline_options = pipeline_options @classmethod def scheme(cls): @@ -139,12 +130,15 @@ def _list(self, dir_or_prefix): ``BeamIOError``: if listing fails, but not if no files were found. """ try: - for path, (size, updated) in gcsio.GcsIO().list_prefix( + for path, (size, updated) in self._gcsIO().list_prefix( dir_or_prefix, with_metadata=True).items(): yield FileMetadata(path, size, updated) except Exception as e: # pylint: disable=broad-except raise BeamIOError("List operation failed", {dir_or_prefix: e}) + def _gcsIO(self): + return gcsio.GcsIO(pipeline_options=self.pipeline_options) + def _path_open( self, path, @@ -155,7 +149,7 @@ def _path_open( """ compression_type = FileSystem._get_compression_type(path, compression_type) mime_type = CompressionTypes.mime_type(compression_type, mime_type) - raw_file = gcsio.GcsIO().open(path, mode, mime_type=mime_type) + raw_file = self._gcsIO().open(path, mode, mime_type=mime_type) if compression_type == CompressionTypes.UNCOMPRESSED: return raw_file return CompressedFile(raw_file, compression_type=compression_type) @@ -218,9 +212,9 @@ def _copy_path(source, destination): raise ValueError('Destination %r must be GCS path.' 
% destination) # Use copy_tree if the path ends with / as it is a directory if source.endswith('/'): - gcsio.GcsIO().copytree(source, destination) + self._gcsIO().copytree(source, destination) else: - gcsio.GcsIO().copy(source, destination) + self._gcsIO().copy(source, destination) exceptions = {} for source, destination in zip(source_file_names, destination_file_names): @@ -261,7 +255,7 @@ def rename(self, source_file_names, destination_file_names): # Execute GCS renames if any and return exceptions. exceptions = {} for batch in gcs_batches: - copy_statuses = gcsio.GcsIO().copy_batch(batch) + copy_statuses = self._gcsIO().copy_batch(batch) copy_succeeded = [] for src, dest, exception in copy_statuses: if exception: @@ -269,7 +263,7 @@ def rename(self, source_file_names, destination_file_names): else: copy_succeeded.append((src, dest)) delete_batch = [src for src, dest in copy_succeeded] - delete_statuses = gcsio.GcsIO().delete_batch(delete_batch) + delete_statuses = self._gcsIO().delete_batch(delete_batch) for i, (src, exception) in enumerate(delete_statuses): dest = copy_succeeded[i][1] if exception: @@ -286,7 +280,7 @@ def exists(self, path): Returns: boolean flag indicating if path exists """ - return gcsio.GcsIO().exists(path) + return self._gcsIO().exists(path) def size(self, path): """Get size of path on the FileSystem. @@ -299,7 +293,7 @@ def size(self, path): Raises: ``BeamIOError``: if path doesn't exist. """ - return gcsio.GcsIO().size(path) + return self._gcsIO().size(path) def last_updated(self, path): """Get UNIX Epoch time in seconds on the FileSystem. @@ -312,7 +306,7 @@ def last_updated(self, path): Raises: ``BeamIOError``: if path doesn't exist. """ - return gcsio.GcsIO().last_updated(path) + return self._gcsIO().last_updated(path) def checksum(self, path): """Fetch checksum metadata of a file on the @@ -327,7 +321,7 @@ def checksum(self, path): ``BeamIOError``: if path isn't a file or doesn't exist. """ try: - return gcsio.GcsIO().checksum(path) + return self._gcsIO().checksum(path) except Exception as e: # pylint: disable=broad-except raise BeamIOError("Checksum operation failed", {path: e}) @@ -344,7 +338,7 @@ def metadata(self, path): ``BeamIOError``: if path isn't a file or doesn't exist. 
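Since every filesystem operation above now goes through self._gcsIO(), the pipeline options captured in the GCSFileSystem constructor are what determine the credentials used for storage requests. A minimal usage sketch, assuming placeholder bucket and account names:

    from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--impersonate_service_account='
        'target@example-project.iam.gserviceaccount.com'
    ])
    fs = GCSFileSystem(pipeline_options=options)

    # _gcsIO() builds a GcsIO from the stored options, so this request is
    # authenticated with whatever credentials those options imply.
    print(fs.exists('gs://example-bucket/some/object'))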
""" try: - file_metadata = gcsio.GcsIO()._status(path) + file_metadata = self._gcsIO()._status(path) return FileMetadata( path, file_metadata['size'], file_metadata['last_updated']) except Exception as e: # pylint: disable=broad-except @@ -365,7 +359,7 @@ def _delete_path(path): else: path_to_use = path match_result = self.match([path_to_use])[0] - statuses = gcsio.GcsIO().delete_batch( + statuses = self._gcsIO().delete_batch( [m.path for m in match_result.metadata_list]) # pylint: disable=used-before-assignment failures = [e for (_, e) in statuses if e is not None] diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 599861b5f778..7bf0b05c947e 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -138,7 +138,7 @@ def get_or_create_default_gcs_bucket(options): return None bucket_name = default_gcs_bucket_name(project, region) - bucket = GcsIO().get_bucket(bucket_name) + bucket = GcsIO(pipeline_options=options).get_bucket(bucket_name) if bucket: return bucket else: @@ -146,7 +146,7 @@ def get_or_create_default_gcs_bucket(options): 'Creating default GCS bucket for project %s: gs://%s', project, bucket_name) - return GcsIO().create_bucket(bucket_name, project, location=region) + return GcsIO(pipeline_options=options).create_bucket(bucket_name, project, location=region) class GcsIOError(IOError, retry.PermanentException): @@ -156,10 +156,10 @@ class GcsIOError(IOError, retry.PermanentException): class GcsIO(object): """Google Cloud Storage I/O client.""" - def __init__(self, storage_client=None): + def __init__(self, storage_client=None, pipeline_options={}): if storage_client is None: storage_client = storage.StorageV1( - credentials=auth.get_service_credentials(), + credentials=auth.get_service_credentials(pipeline_options), get_credentials=False, http=get_new_http(), response_encoding='utf8', diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index f5685d577993..dc2b04c1e4a8 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -50,7 +50,6 @@ from apache_beam import version as beam_version from apache_beam.internal.gcp.auth import get_service_credentials -from apache_beam.internal.gcp.auth import set_impersonation_accounts from apache_beam.internal.gcp.json_value import to_json_value from apache_beam.internal.http_client import get_new_http from apache_beam.io.filesystems import FileSystems @@ -562,9 +561,7 @@ def __init__(self, options, root_staging_location=None): if self.google_cloud_options.no_auth: credentials = None else: - set_impersonation_accounts( - self.google_cloud_options.impersonate_service_account) - credentials = get_service_credentials() + credentials = get_service_credentials(options) http_client = get_new_http() self._client = dataflow.DataflowV1b3( diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py index cfc2a1a8637d..108a4e5e06cd 100644 --- a/sdks/python/apache_beam/runners/interactive/utils.py +++ b/sdks/python/apache_beam/runners/interactive/utils.py @@ -452,7 +452,7 @@ def assert_bucket_exists(bucket_name): try: from apitools.base.py.exceptions import HttpError storage_client = storage.StorageV1( - credentials=auth.get_service_credentials(), + credentials=auth.get_service_credentials({}), get_credentials=False, 
http=get_new_http(), response_encoding='utf8') diff --git a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py index a298ed0a5e75..d4b2777b7a4b 100644 --- a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py +++ b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py @@ -40,7 +40,6 @@ from apache_beam import version as beam_version from apache_beam.internal.gcp.auth import get_service_credentials -from apache_beam.internal.gcp.auth import set_impersonation_accounts from apache_beam.internal.http_client import get_new_http from apache_beam.io.gcp.internal.clients import storage from apache_beam.options.pipeline_options import GoogleCloudOptions @@ -211,9 +210,7 @@ def __init__(self, options): if self._google_cloud_options.no_auth: credentials = None else: - set_impersonation_accounts( - self._google_cloud_options.impersonate_service_account) - credentials = get_service_credentials() + credentials = get_service_credentials(options) self._storage_client = storage.StorageV1( url='https://www.googleapis.com/storage/v1', credentials=credentials, From f628a4e81ed957d1704716ab8bbefb8d1f8e92ce Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 11 May 2022 11:01:41 -0400 Subject: [PATCH 29/46] large refactor to make pipeline_options a required parameter get_service_credentials --- .../apache_beam/examples/wordcount_it_test.py | 24 ++++++++++++++----- sdks/python/apache_beam/internal/gcp/auth.py | 13 ++++++---- sdks/python/apache_beam/io/gcp/gcsio.py | 3 ++- 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py index 58236fc56e7a..0af89283ea92 100644 --- a/sdks/python/apache_beam/examples/wordcount_it_test.py +++ b/sdks/python/apache_beam/examples/wordcount_it_test.py @@ -52,15 +52,27 @@ def test_wordcount_it(self): @pytest.mark.sickbay_spark @pytest.mark.sickbay_flink def test_wordcount_impersonation_it(self): - """Tests impersonation on dataflow.""" - ACOUNT_TO_IMPERSONATE = \ - 'allows-impersonation@apache-beam-testing.iam.gserviceaccount.com' - RUNNER_ACCOUNT = 'impersonation-dataflow-worker@' \ - 'apache-beam-testing.iam.gserviceaccount.com' + """Tests impersonation on dataflow. + + For testing impersonation, we use three ingredients: + - a principal to impersonate + - a dataflow service account that only that principal is + allowed to launch jobs as + - a temp root that only the above two accounts have access to + + Jenkins and Dataflow workers both run as GCE default service account. + So we remove that account from all the above. 
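Outside the test harness, the same setup amounts to passing the impersonation flags straight to the example pipeline. A rough equivalent of what the test drives (runner, region and output values here are illustrative; the accounts and bucket are the ones named in the test):

    from apache_beam.examples import wordcount

    wordcount.run([
        '--runner=DataflowRunner',  # illustrative; the IT harness supplies its own runner flags
        '--project=apache-beam-testing',
        '--region=us-central1',  # illustrative
        '--impersonate_service_account='
        'allows-impersonation@apache-beam-testing.iam.gserviceaccount.com',
        '--service_account_email='
        'impersonation-dataflow-worker@apache-beam-testing.iam.gserviceaccount.com',
        '--temp_location=gs://impersonation-test-bucket/temp-it',
        '--staging_location=gs://impersonation-test-bucket/staging-it',
        '--output=gs://impersonation-test-bucket/output',  # illustrative
    ])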
+ """ + ACCOUNT_TO_IMPERSONATE = ( + 'allows-impersonation@apache-' + 'beam-testing.iam.gserviceaccount.com') + RUNNER_ACCOUNT = ( + 'impersonation-dataflow-worker@' + 'apache-beam-testing.iam.gserviceaccount.com') TEMP_DIR = 'gs://impersonation-test-bucket/temp-it' STAGING_LOCATION = 'gs://impersonation-test-bucket/staging-it' extra_options = { - 'impersonate_service_account': ACOUNT_TO_IMPERSONATE, + 'impersonate_service_account': ACCOUNT_TO_IMPERSONATE, 'service_account_email': RUNNER_ACCOUNT, 'temp_location': TEMP_DIR, 'staging_location': STAGING_LOCATION diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index e22ce380249a..573193044f76 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -22,6 +22,8 @@ import logging import socket import threading +from apache_beam.options.pipeline_options import GoogleCloudOptions +from apache_beam.options.pipeline_options import PipelineOptions # google.auth is only available when Beam is installed with the gcp extra. try: @@ -151,7 +153,8 @@ def _get_service_credentials(pipeline_options): try: credentials, _ = google.auth.default(scopes=CLIENT_SCOPES) # pylint: disable=c-extension-no-member - credentials = _Credentials._add_impersonation_credentials(credentials, pipeline_options) + credentials = _Credentials._add_impersonation_credentials( + credentials, pipeline_options) credentials = _ApitoolsCredentialsAdapter(credentials) logging.debug( 'Connecting using Google Application Default ' @@ -164,7 +167,7 @@ def _get_service_credentials(pipeline_options): e) return None - @classmethod + @staticmethod def _add_impersonation_credentials(cls, credentials, pipeline_options): impersonate_service_account = None if isinstance(pipeline_options, PipelineOptions): @@ -172,10 +175,10 @@ def _add_impersonation_credentials(cls, credentials, pipeline_options): impersonate_service_account = gcs_options.impersonate_service_account else: impersonate_service_account = pipeline_options.get( - 'impersonate_service_account') + 'impersonate_service_account') if impersonate_service_account: - _LOGGER.info('Impersonating: %s', cls._impersonate_service_account) - impersonate_accounts = cls._impersonate_service_account.split(',') + _LOGGER.info('Impersonating: %s', impersonate_service_account) + impersonate_accounts = impersonate_service_account.split(',') target_principal = impersonate_accounts[-1] delegate_to = impersonate_accounts[0:-1] credentials = impersonated_credentials.Credentials( diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 7bf0b05c947e..06462ec9bd13 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -146,7 +146,8 @@ def get_or_create_default_gcs_bucket(options): 'Creating default GCS bucket for project %s: gs://%s', project, bucket_name) - return GcsIO(pipeline_options=options).create_bucket(bucket_name, project, location=region) + return GcsIO(pipeline_options=options).create_bucket( + bucket_name, project, location=region) class GcsIOError(IOError, retry.PermanentException): From d986185f07ccb898feba3f0f37ce33093f30370f Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 11 May 2022 11:35:15 -0400 Subject: [PATCH 30/46] finished changes --- sdks/python/apache_beam/internal/gcp/auth.py | 3 +-- sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 2 +- sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 5 +---- 3 files changed, 3 
insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 573193044f76..5a3f82f1f1b9 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -168,8 +168,7 @@ def _get_service_credentials(pipeline_options): return None @staticmethod - def _add_impersonation_credentials(cls, credentials, pipeline_options): - impersonate_service_account = None + def _add_impersonation_credentials(credentials, pipeline_options): if isinstance(pipeline_options, PipelineOptions): gcs_options = pipeline_options.view_as(GoogleCloudOptions) impersonate_service_account = gcs_options.impersonate_service_account diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index f74a7e3d9004..42c062654d58 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -137,7 +137,7 @@ def _list(self, dir_or_prefix): raise BeamIOError("List operation failed", {dir_or_prefix: e}) def _gcsIO(self): - return gcsio.GcsIO(pipeline_options=self.pipeline_options) + return gcsio.GcsIO(pipeline_options=self._pipeline_options) def _path_open( self, diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 651718037643..13cbec6dc022 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -41,7 +41,6 @@ from apache_beam import error from apache_beam.internal import pickler from apache_beam.internal.gcp import json_value -from apache_beam.internal.gcp.auth import set_impersonation_accounts from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import SetupOptions @@ -396,9 +395,6 @@ def _check_for_unsupported_features_on_non_portable_worker(self, pipeline): def run_pipeline(self, pipeline, options, pipeline_proto=None): """Remotely executes entire pipeline or parts reachable from node.""" - google_cloud_options = options.view_as(GoogleCloudOptions) - set_impersonation_accounts(google_cloud_options.impersonate_service_account) - # Label goog-dataflow-notebook if job is started from notebook. if is_in_notebook(): notebook_version = ( @@ -550,6 +546,7 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): # Elevate "enable_streaming_engine" to pipeline option, but using the # existing experiment. 
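For reference, the impersonation handling that auth.py converges on here is the standard google-auth pattern: split the comma-separated option, use the last entry as the target principal and the rest as delegates. A self-contained sketch of the same idea (account names are placeholders, and it assumes application default credentials are available):

    import google.auth
    from google.auth import impersonated_credentials

    # '--impersonate_service_account=a,b,c' means: impersonate c,
    # delegating through a and then b.
    chain = ('delegate@example-project.iam.gserviceaccount.com,'
             'target@example-project.iam.gserviceaccount.com')
    accounts = chain.split(',')

    source_credentials, _ = google.auth.default()
    credentials = impersonated_credentials.Credentials(
        source_credentials=source_credentials,
        target_principal=accounts[-1],
        delegates=accounts[:-1],
        target_scopes=['https://www.googleapis.com/auth/cloud-platform'],
    )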
+ google_cloud_options = options.view_as(GoogleCloudOptions) if google_cloud_options.enable_streaming_engine: debug_options.add_experiment("enable_windmill_service") debug_options.add_experiment("enable_streaming_engine") From 5a6cf5ace0ae3628cbd4ba43689b68468928c26f Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 11 May 2022 12:38:02 -0400 Subject: [PATCH 31/46] updated pylint --- sdks/python/apache_beam/io/gcp/gcsio.py | 2 +- .../apache_beam/runners/portability/sdk_container_builder.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 06462ec9bd13..e04a066daa62 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -157,7 +157,7 @@ class GcsIOError(IOError, retry.PermanentException): class GcsIO(object): """Google Cloud Storage I/O client.""" - def __init__(self, storage_client=None, pipeline_options={}): + def __init__(self, storage_client=None, pipeline_options={}): # pylint: disable=dangerous-default-value if storage_client is None: storage_client = storage.StorageV1( credentials=auth.get_service_credentials(pipeline_options), diff --git a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py index d4b2777b7a4b..f81e015ea591 100644 --- a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py +++ b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py @@ -206,7 +206,6 @@ def __init__(self, options): self._google_cloud_options = options.view_as(GoogleCloudOptions) self._cloud_build_machine_type = self._get_cloud_build_machine_type_enum( options.view_as(SetupOptions).cloud_build_machine_type) - if self._google_cloud_options.no_auth: credentials = None else: From a7ad6e5de3b168115894f07f8f8c0007e82cbc46 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 11 May 2022 13:21:47 -0400 Subject: [PATCH 32/46] fixed import order --- sdks/python/apache_beam/internal/gcp/auth.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 5a3f82f1f1b9..82b18567e43f 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -22,6 +22,7 @@ import logging import socket import threading + from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions From 1c9eaf5171701acaa50f1375d27189a8c1255c76 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 11 May 2022 13:27:45 -0400 Subject: [PATCH 33/46] small lint change added line --- sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 42c062654d58..11184cd34fd3 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -34,6 +34,7 @@ from apache_beam.io.filesystem import FileMetadata from apache_beam.io.filesystem import FileSystem from apache_beam.io.gcp import gcsio + __all__ = ['GCSFileSystem'] From d9b51ae95717dd90192dfc7c390c7d46872d3202 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 11 May 2022 13:56:03 -0400 Subject: [PATCH 34/46] yapf --- sdks/python/apache_beam/io/gcp/gcsio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index e04a066daa62..30a14adc9eb7 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -157,7 +157,7 @@ class GcsIOError(IOError, retry.PermanentException): class GcsIO(object): """Google Cloud Storage I/O client.""" - def __init__(self, storage_client=None, pipeline_options={}): # pylint: disable=dangerous-default-value + def __init__(self, storage_client=None, pipeline_options={}): # pylint: disable=dangerous-default-value if storage_client is None: storage_client = storage.StorageV1( credentials=auth.get_service_credentials(pipeline_options), From 2253391903ab8cc67d76189f4e8ffa16107195e6 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 11 May 2022 15:12:59 -0400 Subject: [PATCH 35/46] remove set_impersonation_accounts --- sdks/python/apache_beam/io/gcp/bigquery_tools.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index b0fda7d0032f..adcd0cc2103e 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -1360,11 +1360,6 @@ def __init__( source.pipeline_options.view_as(GoogleCloudOptions).project) else: self.executing_project = None - if hasattr(source, 'pipeline_options'): - gcs_options = source.pipeline_options.view_as(GoogleCloudOptions) - auth.set_impersonation_accounts(gcs_options.impersonate_service_account) - else: - auth.set_impersonation_accounts(None) # TODO(silviuc): Try to automatically get it from gcloud config info. if not self.executing_project and test_bigquery_client is None: From a01635b7447890962228d6a448efba10aeead483 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 11 May 2022 15:20:51 -0400 Subject: [PATCH 36/46] changed {} to none to avoid dangerous default value --- sdks/python/apache_beam/internal/gcp/auth.py | 2 ++ sdks/python/apache_beam/io/gcp/gcsio.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 82b18567e43f..d71b4e6543f3 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -170,6 +170,8 @@ def _get_service_credentials(pipeline_options): @staticmethod def _add_impersonation_credentials(credentials, pipeline_options): + if not pipeline_options: + return credentials if isinstance(pipeline_options, PipelineOptions): gcs_options = pipeline_options.view_as(GoogleCloudOptions) impersonate_service_account = gcs_options.impersonate_service_account diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 30a14adc9eb7..bf41ae646107 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -157,7 +157,7 @@ class GcsIOError(IOError, retry.PermanentException): class GcsIO(object): """Google Cloud Storage I/O client.""" - def __init__(self, storage_client=None, pipeline_options={}): # pylint: disable=dangerous-default-value + def __init__(self, storage_client=None, pipeline_options=None): if storage_client is None: storage_client = storage.StorageV1( credentials=auth.get_service_credentials(pipeline_options), From bd2dd1145c3c6db5a2bfca2e81e6c7601d3de1ca Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 11 May 2022 18:44:11 -0400 Subject: [PATCH 37/46] fix mock to have parameter --- 
sdks/python/apache_beam/io/gcp/gcsio_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index a4aa2d4aa858..260090461c8c 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -461,7 +461,7 @@ def test_delete(self): @mock.patch( 'apache_beam.io.gcp.gcsio.auth.get_service_credentials', - wraps=lambda: None) + wraps=lambda pipeline_options: None) @mock.patch('apache_beam.io.gcp.gcsio.get_new_http') def test_user_agent_passed(self, get_new_http_mock, get_service_creds_mock): client = gcsio.GcsIO() From 677cdde19c4ef833e5e2bdb490dddf2f469979d4 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 11 May 2022 18:58:42 -0400 Subject: [PATCH 38/46] fixes filesystem test --- .../apache_beam/io/gcp/gcsfilesystem_test.py | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py index b4d921ada234..49b0bdc9f6cf 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py @@ -81,7 +81,7 @@ def test_split(self): def test_match_multiples(self, mock_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock + gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock gcsio_mock.list_prefix.return_value = { 'gs://bucket/file1': (1, 99999.0), 'gs://bucket/file2': (2, 88888.0) } @@ -99,7 +99,7 @@ def test_match_multiples_limit(self, mock_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() limit = 1 - gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock + gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock gcsio_mock.list_prefix.return_value = {'gs://bucket/file1': (1, 99999.0)} expected_results = set([FileMetadata('gs://bucket/file1', 1, 99999.0)]) match_result = self.fs.match(['gs://bucket/'], [limit])[0] @@ -112,7 +112,7 @@ def test_match_multiples_limit(self, mock_gcsio): def test_match_multiples_error(self, mock_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock + gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock exception = IOError('Failed') gcsio_mock.list_prefix.side_effect = exception @@ -128,7 +128,7 @@ def test_match_multiples_error(self, mock_gcsio): def test_match_multiple_patterns(self, mock_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock + gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock gcsio_mock.list_prefix.side_effect = [ { 'gs://bucket/file1': (1, 99999.0) @@ -146,7 +146,7 @@ def test_match_multiple_patterns(self, mock_gcsio): def test_create(self, mock_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock + gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock # Issue file copy _ = self.fs.create('gs://bucket/from1', 'application/octet-stream') @@ -157,7 +157,7 @@ def test_create(self, mock_gcsio): def test_open(self, mock_gcsio): # Prepare mocks. 
gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock + gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock # Issue file copy _ = self.fs.open('gs://bucket/from1', 'application/octet-stream') @@ -168,7 +168,7 @@ def test_open(self, mock_gcsio): def test_copy_file(self, mock_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock + gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock sources = ['gs://bucket/from1'] destinations = ['gs://bucket/to1'] @@ -182,7 +182,7 @@ def test_copy_file(self, mock_gcsio): def test_copy_file_error(self, mock_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock + gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock sources = ['gs://bucket/from1'] destinations = ['gs://bucket/to1'] @@ -208,7 +208,7 @@ def test_copy_file_error(self, mock_gcsio): def test_copy_tree(self, mock_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock + gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock sources = ['gs://bucket1/'] destinations = ['gs://bucket2/'] @@ -222,7 +222,7 @@ def test_copy_tree(self, mock_gcsio): def test_rename(self, mock_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock + gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock sources = [ 'gs://bucket/from1', 'gs://bucket/from2', @@ -262,7 +262,7 @@ def test_rename(self, mock_gcsio): def test_rename_error(self, mock_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock + gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock sources = [ 'gs://bucket/from1', 'gs://bucket/from2', @@ -308,7 +308,7 @@ def test_rename_error(self, mock_gcsio): def test_delete(self, mock_gcsio): # Prepare mocks. gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock + gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock gcsio_mock._status.return_value = {'size': 0, 'last_updated': 99999.0} files = [ 'gs://bucket/from1', @@ -324,7 +324,7 @@ def test_delete(self, mock_gcsio): def test_delete_error(self, mock_gcsio): # Prepare mocks. 
gcsio_mock = mock.MagicMock() - gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock + gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock exception = IOError('Failed') gcsio_mock.delete_batch.side_effect = exception gcsio_mock._status.return_value = {'size': 0, 'last_updated': 99999.0} From e6cd15a660ff3a635861e854ea221a42fe2f9302 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 11 May 2022 23:28:28 -0400 Subject: [PATCH 39/46] temp added more logging details --- sdks/python/scripts/run_integration_test.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/scripts/run_integration_test.sh b/sdks/python/scripts/run_integration_test.sh index b477e918d96c..7b42e6761291 100755 --- a/sdks/python/scripts/run_integration_test.sh +++ b/sdks/python/scripts/run_integration_test.sh @@ -277,8 +277,8 @@ fi echo ">>> RUNNING integration tests with pipeline options: $PIPELINE_OPTS" echo ">>> pytest options: $TEST_OPTS" echo ">>> collect markers: $COLLECT_MARKERS" -ARGS="-o junit_suite_name=$SUITE --junitxml=pytest_$SUITE.xml $TEST_OPTS" -# Handle markers as an independient argument from $TEST_OPTS to prevent errors in space separeted flags +ARGS="-o junit_suite_name=$SUITE -o log_cli=true -o log_level=INFO --junitxml=pytest_$SUITE.xml $TEST_OPTS" +# Handle markers as an independent argument from $TEST_OPTS to prevent errors in space separated flags if [ -z "$COLLECT_MARKERS" ]; then pytest $ARGS --test-pipeline-options="$PIPELINE_OPTS" else From 727ccc6fb615a12bb4b994e8e7ad2c41aa72b8a3 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Thu, 12 May 2022 06:48:10 -0400 Subject: [PATCH 40/46] fixed bug where None was being turning into a string --- sdks/python/apache_beam/internal/gcp/auth.py | 9 +++------ .../apache_beam/runners/dataflow/internal/apiclient.py | 2 +- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index d71b4e6543f3..e3b60dd70685 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -121,9 +121,6 @@ class _Credentials(object): _credentials_init = False _credentials = None - _delegate_accounts = None - _target_principal = None - @classmethod def get_service_credentials(cls, pipeline_options): with cls._credentials_lock: @@ -170,14 +167,14 @@ def _get_service_credentials(pipeline_options): @staticmethod def _add_impersonation_credentials(credentials, pipeline_options): - if not pipeline_options: - return credentials if isinstance(pipeline_options, PipelineOptions): gcs_options = pipeline_options.view_as(GoogleCloudOptions) impersonate_service_account = gcs_options.impersonate_service_account - else: + elif isinstance(pipeline_options, dict): impersonate_service_account = pipeline_options.get( 'impersonate_service_account') + else: + return credentials if impersonate_service_account: _LOGGER.info('Impersonating: %s', impersonate_service_account) impersonate_accounts = impersonate_service_account.split(',') diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index dc2b04c1e4a8..1a90e4655c0d 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -348,7 +348,7 @@ def __init__( # Don't pass impersonate_service_account through to the harness. 
      # Though impersonation should start a job, the workers should
      # not try to modify their credentials.
-      options_dict['impersonate_service_account'] = None
+      options_dict.popt('impersonate_service_account', None)
       self.proto.sdkPipelineOptions.additionalProperties.append(
           dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
               key='options', value=to_json_value(options_dict)))

From 156df850b4f54fc5b0182597b4c36f0e4918f1c1 Mon Sep 17 00:00:00 2001
From: Ryan Thompson
Date: Thu, 12 May 2022 06:48:55 -0400
Subject: [PATCH 41/46] typo fix

---
 sdks/python/apache_beam/runners/dataflow/internal/apiclient.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 1a90e4655c0d..e08729565294 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -348,7 +348,7 @@ def __init__(
       # Don't pass impersonate_service_account through to the harness.
       # Though impersonation should start a job, the workers should
       # not try to modify their credentials.
-      options_dict.popt('impersonate_service_account', None)
+      options_dict.pop('impersonate_service_account', None)
       self.proto.sdkPipelineOptions.additionalProperties.append(
           dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
               key='options', value=to_json_value(options_dict)))

From f6a06c5cd5517a19c56d2142f25a3b709cc50aaa Mon Sep 17 00:00:00 2001
From: Ryan Thompson
Date: Thu, 12 May 2022 09:18:26 -0400
Subject: [PATCH 42/46] reverted run_integration_test.sh; will make that change in a different CL

---
 sdks/python/scripts/run_integration_test.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sdks/python/scripts/run_integration_test.sh b/sdks/python/scripts/run_integration_test.sh
index 7b42e6761291..b477e918d96c 100755
--- a/sdks/python/scripts/run_integration_test.sh
+++ b/sdks/python/scripts/run_integration_test.sh
@@ -277,8 +277,8 @@ fi
 echo ">>> RUNNING integration tests with pipeline options: $PIPELINE_OPTS"
 echo ">>> pytest options: $TEST_OPTS"
 echo ">>> collect markers: $COLLECT_MARKERS"
-ARGS="-o junit_suite_name=$SUITE -o log_cli=true -o log_level=INFO --junitxml=pytest_$SUITE.xml $TEST_OPTS"
-# Handle markers as an independent argument from $TEST_OPTS to prevent errors in space separated flags
+ARGS="-o junit_suite_name=$SUITE --junitxml=pytest_$SUITE.xml $TEST_OPTS"
+# Handle markers as an independient argument from $TEST_OPTS to prevent errors in space separeted flags
 if [ -z "$COLLECT_MARKERS" ]; then
   pytest $ARGS --test-pipeline-options="$PIPELINE_OPTS"
 else

From 7488468e62216a55d13c2a41e90452a61035277c Mon Sep 17 00:00:00 2001
From: Ryan Thompson
Date: Thu, 12 May 2022 09:42:15 -0400
Subject: [PATCH 43/46] Valentyn's comments

---
 sdks/python/apache_beam/internal/gcp/auth.py         | 3 +++
 sdks/python/apache_beam/io/gcp/bigquery_tools.py     | 6 +-----
 sdks/python/apache_beam/runners/interactive/utils.py | 2 +-
 3 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py
index e3b60dd70685..27a3c40cd4b3 100644
--- a/sdks/python/apache_beam/internal/gcp/auth.py
+++ b/sdks/python/apache_beam/internal/gcp/auth.py
@@ -77,6 +77,9 @@ def get_service_credentials(pipeline_options):
   """For internal use only; no backwards-compatibility guarantees.

   Get credentials to access Google services.
+ Args: + pipeline_options: Pipeline options, used in creating credentials + like impersonated credentials. Returns: A ``google.auth.credentials.Credentials`` object or None if credentials diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index adcd0cc2103e..bb3b60273404 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -328,7 +328,7 @@ class BigQueryWrapper(object): def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None): self.client = client or bigquery.BigqueryV2( http=get_new_http(), - credentials=auth.get_service_credentials({}), + credentials=auth.get_service_credentials(None), response_encoding='utf8', additional_http_headers={ "user-agent": "apache-beam-%s" % apache_beam.__version__ @@ -1475,12 +1475,8 @@ def __init__(self, sink, test_bigquery_client=None, buffer_size=None): # If table schema did not define a project we default to executing project. if self.project_id is None and hasattr(sink, 'pipeline_options'): - self._pipeline_options = sink.pipeline_options self.project_id = ( sink.pipeline_options.view_as(GoogleCloudOptions).project) - else: - # Credentials rely on pipeline options to determine impersonation. - self._pipeline_options = {} assert self.project_id is not None diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py index 108a4e5e06cd..68b4fceaa8c3 100644 --- a/sdks/python/apache_beam/runners/interactive/utils.py +++ b/sdks/python/apache_beam/runners/interactive/utils.py @@ -452,7 +452,7 @@ def assert_bucket_exists(bucket_name): try: from apitools.base.py.exceptions import HttpError storage_client = storage.StorageV1( - credentials=auth.get_service_credentials({}), + credentials=auth.get_service_credentials(None), get_credentials=False, http=get_new_http(), response_encoding='utf8') From 11857a2b0a73e41ba77cba820d1ce30879edac34 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Thu, 12 May 2022 09:53:45 -0400 Subject: [PATCH 44/46] removed dict option for impersonated credentials --- sdks/python/apache_beam/internal/gcp/auth.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 27a3c40cd4b3..41ba5f1c6048 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -173,9 +173,6 @@ def _add_impersonation_credentials(credentials, pipeline_options): if isinstance(pipeline_options, PipelineOptions): gcs_options = pipeline_options.view_as(GoogleCloudOptions) impersonate_service_account = gcs_options.impersonate_service_account - elif isinstance(pipeline_options, dict): - impersonate_service_account = pipeline_options.get( - 'impersonate_service_account') else: return credentials if impersonate_service_account: From 1ec2b1d6887920727889f2b35de9b32e3fd3f2f7 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Thu, 12 May 2022 10:18:35 -0400 Subject: [PATCH 45/46] added dictionary back --- sdks/python/apache_beam/internal/gcp/auth.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 41ba5f1c6048..27a3c40cd4b3 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -173,6 +173,9 @@ def _add_impersonation_credentials(credentials, pipeline_options): if 
isinstance(pipeline_options, PipelineOptions): gcs_options = pipeline_options.view_as(GoogleCloudOptions) impersonate_service_account = gcs_options.impersonate_service_account + elif isinstance(pipeline_options, dict): + impersonate_service_account = pipeline_options.get( + 'impersonate_service_account') else: return credentials if impersonate_service_account: From caa3662a71177fb0e798ec912b46bdd2fc4205a1 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 12 May 2022 12:49:12 -0700 Subject: [PATCH 46/46] DO NOT MERGE: hacking postcommit gradle task --- sdks/python/apache_beam/examples/wordcount_it_test.py | 2 +- sdks/python/test-suites/dataflow/common.gradle | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py index 0af89283ea92..2030986d60b5 100644 --- a/sdks/python/apache_beam/examples/wordcount_it_test.py +++ b/sdks/python/apache_beam/examples/wordcount_it_test.py @@ -47,7 +47,7 @@ class WordCountIT(unittest.TestCase): def test_wordcount_it(self): self._run_wordcount_it(wordcount.run) - @pytest.mark.it_postcommit + @pytest.mark.it_postcommit_impersonation @pytest.mark.sickbay_direct @pytest.mark.sickbay_spark @pytest.mark.sickbay_flink diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle index fdc428001c3b..bfc672b00abf 100644 --- a/sdks/python/test-suites/dataflow/common.gradle +++ b/sdks/python/test-suites/dataflow/common.gradle @@ -115,7 +115,7 @@ task postCommitIT { "sdk_location": files(configurations.distTarBall.files).singleFile, "worker_jar": dataflowWorkerJar, "suite": "postCommitIT-df${pythonVersionSuffix}", - "collect": "it_postcommit" + "collect": "it_postcommit_impersonation" ] def cmdArgs = mapToArgString(argMap) exec {
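
The series above threads an impersonate_service_account pipeline option through credential creation and deliberately strips it from the options handed to the workers, so impersonation only applies at job submission time. The sketch below shows how a pipeline author might exercise the option once these patches are applied. It is illustrative only: the project and service account names are hypothetical, and the ordering convention (delegate accounts first, target principal last) is an assumption layered on top of the split(',') handling in patch 40 rather than something the diffs spell out.

# Minimal sketch, not part of the patch series: build options the way a user
# would on the command line, then inspect the comma-separated impersonation
# list. Assumes --impersonate_service_account is registered on
# GoogleCloudOptions, as the patches above imply.
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--project=my-project',  # hypothetical project id
    '--impersonate_service_account='
    'delegate@my-project.iam.gserviceaccount.com,'
    'target@my-project.iam.gserviceaccount.com',  # hypothetical accounts
])

gcp_options = options.view_as(GoogleCloudOptions)
accounts = gcp_options.impersonate_service_account.split(',')
# Assumed ordering: earlier accounts act as delegates, the last account is
# the principal the submitted job ultimately impersonates.
delegates, target_principal = accounts[:-1], accounts[-1]
print('target principal:', target_principal)
print('delegates:', delegates)

Because patch 40 pops impersonate_service_account out of the serialized options before they reach the Dataflow harness, the workers never see the flag; only the submitting client builds impersonated credentials.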