diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index be8bbbfed8a6..afbe70b9d239 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -28,6 +28,7 @@
 from hamcrest.core.core.allof import all_of

 from apache_beam.examples import wordcount
+from apache_beam.internal.gcp import auth
 from apache_beam.testing.load_tests.load_test_metrics_utils import InfluxDBMetricsPublisherOptions
 from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsReader
 from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher
@@ -47,6 +48,44 @@ class WordCountIT(unittest.TestCase):
   def test_wordcount_it(self):
     self._run_wordcount_it(wordcount.run)

+  @pytest.mark.it_postcommit
+  @pytest.mark.sickbay_direct
+  @pytest.mark.sickbay_spark
+  @pytest.mark.sickbay_flink
+  def test_wordcount_impersonation_it(self):
+    """Tests impersonation on Dataflow.
+
+    For testing impersonation, we use three ingredients:
+    - a principal to impersonate
+    - a Dataflow service account that only that principal is
+      allowed to launch jobs as
+    - a temp root that only the above two accounts have access to
+
+    Jenkins and Dataflow workers both run as the GCE default service account,
+    so we remove that account from all of the above.
+    """
+    # Credentials need to be reset or this test will fail, because
+    # credentials from a previous test would be used.
+    auth._Credentials._credentials_init = False
+
+    ACCOUNT_TO_IMPERSONATE = (
+        'allows-impersonation@apache-'
+        'beam-testing.iam.gserviceaccount.com')
+    RUNNER_ACCOUNT = (
+        'impersonation-dataflow-worker@'
+        'apache-beam-testing.iam.gserviceaccount.com')
+    TEMP_DIR = 'gs://impersonation-test-bucket/temp-it'
+    STAGING_LOCATION = 'gs://impersonation-test-bucket/staging-it'
+    extra_options = {
+        'impersonate_service_account': ACCOUNT_TO_IMPERSONATE,
+        'service_account_email': RUNNER_ACCOUNT,
+        'temp_location': TEMP_DIR,
+        'staging_location': STAGING_LOCATION
+    }
+    self._run_wordcount_it(wordcount.run, **extra_options)
+    # Reset credentials for future tests.
+    auth._Credentials._credentials_init = False
+
   @pytest.mark.it_postcommit
   @pytest.mark.it_validatescontainer
   def test_wordcount_fnapi_it(self):
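
Note: the direct reset of auth._Credentials._credentials_init above works because, as the auth.py diff below shows, Beam memoizes credentials process-wide behind a lock; without the reset, credentials cached by an earlier test would be reused and the impersonation options would never be consulted. A toy sketch of that memoization pattern (illustrative only, not Beam's exact class):

import threading


class _CredCache(object):
  """Toy stand-in for the process-wide credential memoization."""
  _lock = threading.Lock()
  _init = False
  _value = None

  @classmethod
  def get(cls, factory):
    with cls._lock:
      if not cls._init:  # Only the first caller pays the factory cost.
        cls._value = factory()
        cls._init = True
      return cls._value


# Flipping _init back to False (as the test does) forces the next
# get() to rebuild the cached value from scratch.
_CredCache._init = False
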
diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py
index 439264a9794b..27a3c40cd4b3 100644
--- a/sdks/python/apache_beam/internal/gcp/auth.py
+++ b/sdks/python/apache_beam/internal/gcp/auth.py
@@ -23,8 +23,12 @@
 import socket
 import threading

+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+
 # google.auth is only available when Beam is installed with the gcp extra.
 try:
+  from google.auth import impersonated_credentials
   import google.auth
   import google_auth_httplib2
   _GOOGLE_AUTH_AVAILABLE = True
@@ -40,6 +44,16 @@

 _LOGGER = logging.getLogger(__name__)

+CLIENT_SCOPES = [
+    'https://www.googleapis.com/auth/bigquery',
+    'https://www.googleapis.com/auth/cloud-platform',
+    'https://www.googleapis.com/auth/devstorage.full_control',
+    'https://www.googleapis.com/auth/userinfo.email',
+    'https://www.googleapis.com/auth/datastore',
+    'https://www.googleapis.com/auth/spanner.admin',
+    'https://www.googleapis.com/auth/spanner.data'
+]
+

 def set_running_in_gce(worker_executing_project):
   """For internal use only; no backwards-compatibility guarantees.
@@ -59,16 +73,19 @@ def set_running_in_gce(worker_executing_project):
   executing_project = worker_executing_project


-def get_service_credentials():
+def get_service_credentials(pipeline_options):
   """For internal use only; no backwards-compatibility guarantees.

   Get credentials to access Google services.

+  Args:
+    pipeline_options: Pipeline options, used in creating credentials
+      like impersonated credentials.
   Returns:
     A ``google.auth.credentials.Credentials`` object or None if credentials
     not found. Returned object is thread-safe.
   """
-  return _Credentials.get_service_credentials()
+  return _Credentials.get_service_credentials(pipeline_options)


 if _GOOGLE_AUTH_AVAILABLE:
@@ -108,10 +125,7 @@ class _Credentials(object):
   _credentials = None

   @classmethod
-  def get_service_credentials(cls):
-    if cls._credentials_init:
-      return cls._credentials
-
+  def get_service_credentials(cls, pipeline_options):
     with cls._credentials_lock:
       if cls._credentials_init:
         return cls._credentials
@@ -124,13 +138,13 @@ def get_service_credentials(cls):
       _LOGGER.info(
           "socket default timeout is %s seconds.", socket.getdefaulttimeout())

-      cls._credentials = cls._get_service_credentials()
+      cls._credentials = cls._get_service_credentials(pipeline_options)
       cls._credentials_init = True

     return cls._credentials

   @staticmethod
-  def _get_service_credentials():
+  def _get_service_credentials(pipeline_options):
     if not _GOOGLE_AUTH_AVAILABLE:
       _LOGGER.warning(
           'Unable to find default credentials because the google-auth library '
@@ -138,17 +152,10 @@ def _get_service_credentials():
           'is not available. Please install the gcp extra for apache-beam to '
           'use Google default credentials. Connecting anonymously.')
       return None
-    client_scopes = [
-        'https://www.googleapis.com/auth/bigquery',
-        'https://www.googleapis.com/auth/cloud-platform',
-        'https://www.googleapis.com/auth/devstorage.full_control',
-        'https://www.googleapis.com/auth/userinfo.email',
-        'https://www.googleapis.com/auth/datastore',
-        'https://www.googleapis.com/auth/spanner.admin',
-        'https://www.googleapis.com/auth/spanner.data'
-    ]
     try:
-      credentials, _ = google.auth.default(scopes=client_scopes)  # pylint: disable=c-extension-no-member
+      credentials, _ = google.auth.default(scopes=CLIENT_SCOPES)  # pylint: disable=c-extension-no-member
+      credentials = _Credentials._add_impersonation_credentials(
+          credentials, pipeline_options)
       credentials = _ApitoolsCredentialsAdapter(credentials)
       logging.debug(
           'Connecting using Google Application Default '
@@ -160,3 +167,26 @@ def _get_service_credentials():
           'Connecting anonymously.', e)
       return None
+
+  @staticmethod
+  def _add_impersonation_credentials(credentials, pipeline_options):
+    if isinstance(pipeline_options, PipelineOptions):
+      gcs_options = pipeline_options.view_as(GoogleCloudOptions)
+      impersonate_service_account = gcs_options.impersonate_service_account
+    elif isinstance(pipeline_options, dict):
+      impersonate_service_account = pipeline_options.get(
+          'impersonate_service_account')
+    else:
+      return credentials
+    if impersonate_service_account:
+      _LOGGER.info('Impersonating: %s', impersonate_service_account)
+      impersonate_accounts = impersonate_service_account.split(',')
+      target_principal = impersonate_accounts[-1]
+      delegate_to = impersonate_accounts[0:-1]
+      credentials = impersonated_credentials.Credentials(
+          source_credentials=credentials,
+          target_principal=target_principal,
+          delegates=delegate_to,
+          target_scopes=CLIENT_SCOPES,
+      )
+    return credentials
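
The delegation-chain handling in _add_impersonation_credentials treats the last account in the comma-separated list as the principal the job ultimately runs as, and any earlier accounts as intermediate delegates. A standalone sketch of the same wiring with google-auth (account names and scope are placeholders, not values from this change):

import google.auth
from google.auth import impersonated_credentials

# Hypothetical chain: 'delegate' is allowed to impersonate 'target'.
chain = ('delegate@example-project.iam.gserviceaccount.com,'
         'target@example-project.iam.gserviceaccount.com')
accounts = chain.split(',')

source_credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
credentials = impersonated_credentials.Credentials(
    source_credentials=source_credentials,
    target_principal=accounts[-1],  # the account to act as
    delegates=accounts[:-1],  # intermediate hops, in order
    target_scopes=['https://www.googleapis.com/auth/cloud-platform'])
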
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 89efa1ef6230..bb3b60273404 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -314,7 +314,7 @@ class BigQueryWrapper(object):

   The wrapper is used to organize all the BigQuery integration points and
   offer a common place where retry logic for failures can be controlled.
-  In addition it offers various functions used both in sources and sinks
+  In addition, it offers various functions used both in sources and sinks
   (e.g., find and create tables, query a table, etc.).
   """

@@ -328,7 +328,7 @@ class BigQueryWrapper(object):
   def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None):
     self.client = client or bigquery.BigqueryV2(
         http=get_new_http(),
-        credentials=auth.get_service_credentials(),
+        credentials=auth.get_service_credentials(None),
         response_encoding='utf8',
         additional_http_headers={
             "user-agent": "apache-beam-%s" % apache_beam.__version__
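
BigQueryWrapper has no pipeline options in scope here, so it passes None. That is safe because, after this change, _add_impersonation_credentials only acts on PipelineOptions instances and dicts and hands anything else back unchanged. A quick illustration against Beam's private _Credentials class (for exposition only; the fake credentials object is a placeholder):

from apache_beam.internal.gcp.auth import _Credentials

fake_credentials = object()  # stands in for real google-auth credentials
# None and an empty dict both leave the source credentials untouched.
assert _Credentials._add_impersonation_credentials(
    fake_credentials, None) is fake_credentials
assert _Credentials._add_impersonation_credentials(
    fake_credentials, {}) is fake_credentials
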
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index 90ecc1d0fcc9..11184cd34fd3 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -45,6 +45,10 @@ class GCSFileSystem(FileSystem):
   CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE  # Chuck size in batch operations
   GCS_PREFIX = 'gs://'

+  def __init__(self, pipeline_options):
+    super().__init__(pipeline_options)
+    self._pipeline_options = pipeline_options
+
   @classmethod
   def scheme(cls):
     """URI scheme for the FileSystem
@@ -127,12 +131,15 @@
       ``BeamIOError``: if listing fails, but not if no files were found.
     """
     try:
-      for path, (size, updated) in gcsio.GcsIO().list_prefix(
+      for path, (size, updated) in self._gcsIO().list_prefix(
           dir_or_prefix, with_metadata=True).items():
         yield FileMetadata(path, size, updated)
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("List operation failed", {dir_or_prefix: e})

+  def _gcsIO(self):
+    return gcsio.GcsIO(pipeline_options=self._pipeline_options)
+
   def _path_open(
       self,
       path,
@@ -143,7 +150,7 @@
     """
     compression_type = FileSystem._get_compression_type(path, compression_type)
     mime_type = CompressionTypes.mime_type(compression_type, mime_type)
-    raw_file = gcsio.GcsIO().open(path, mode, mime_type=mime_type)
+    raw_file = self._gcsIO().open(path, mode, mime_type=mime_type)
     if compression_type == CompressionTypes.UNCOMPRESSED:
       return raw_file
     return CompressedFile(raw_file, compression_type=compression_type)
@@ -206,9 +213,9 @@ def _copy_path(source, destination):
        raise ValueError('Destination %r must be GCS path.' % destination)
      # Use copy_tree if the path ends with / as it is a directory
      if source.endswith('/'):
-        gcsio.GcsIO().copytree(source, destination)
+        self._gcsIO().copytree(source, destination)
      else:
-        gcsio.GcsIO().copy(source, destination)
+        self._gcsIO().copy(source, destination)

     exceptions = {}
     for source, destination in zip(source_file_names, destination_file_names):
@@ -249,7 +256,7 @@ def rename(self, source_file_names, destination_file_names):
     # Execute GCS renames if any and return exceptions.
     exceptions = {}
     for batch in gcs_batches:
-      copy_statuses = gcsio.GcsIO().copy_batch(batch)
+      copy_statuses = self._gcsIO().copy_batch(batch)
       copy_succeeded = []
       for src, dest, exception in copy_statuses:
         if exception:
@@ -257,7 +264,7 @@ def rename(self, source_file_names, destination_file_names):
         else:
           copy_succeeded.append((src, dest))
       delete_batch = [src for src, dest in copy_succeeded]
-      delete_statuses = gcsio.GcsIO().delete_batch(delete_batch)
+      delete_statuses = self._gcsIO().delete_batch(delete_batch)
       for i, (src, exception) in enumerate(delete_statuses):
         dest = copy_succeeded[i][1]
         if exception:
@@ -274,7 +281,7 @@ def exists(self, path):

     Returns: boolean flag indicating if path exists
     """
-    return gcsio.GcsIO().exists(path)
+    return self._gcsIO().exists(path)

   def size(self, path):
     """Get size of path on the FileSystem.
@@ -287,7 +294,7 @@ def size(self, path):
     Raises:
       ``BeamIOError``: if path doesn't exist.
     """
-    return gcsio.GcsIO().size(path)
+    return self._gcsIO().size(path)

   def last_updated(self, path):
     """Get UNIX Epoch time in seconds on the FileSystem.
@@ -300,7 +307,7 @@ def last_updated(self, path):
     Raises:
       ``BeamIOError``: if path doesn't exist.
     """
-    return gcsio.GcsIO().last_updated(path)
+    return self._gcsIO().last_updated(path)

   def checksum(self, path):
     """Fetch checksum metadata of a file on the
@@ -315,7 +322,7 @@ def checksum(self, path):
       ``BeamIOError``: if path isn't a file or doesn't exist.
     """
     try:
-      return gcsio.GcsIO().checksum(path)
+      return self._gcsIO().checksum(path)
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("Checksum operation failed", {path: e})

@@ -332,7 +339,7 @@ def metadata(self, path):
       ``BeamIOError``: if path isn't a file or doesn't exist.
     """
     try:
-      file_metadata = gcsio.GcsIO()._status(path)
+      file_metadata = self._gcsIO()._status(path)
       return FileMetadata(
           path, file_metadata['size'], file_metadata['last_updated'])
     except Exception as e:  # pylint: disable=broad-except
@@ -353,7 +360,7 @@ def _delete_path(path):
       else:
         path_to_use = path
       match_result = self.match([path_to_use])[0]
-      statuses = gcsio.GcsIO().delete_batch(
+      statuses = self._gcsIO().delete_batch(
           [m.path for m in match_result.metadata_list])
       # pylint: disable=used-before-assignment
       failures = [e for (_, e) in statuses if e is not None]
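
With the _gcsIO() factory above, every GCS call builds its client from the options the filesystem was constructed with, so impersonation now flows through plain file operations as well. A usage sketch, assuming Beam is installed with the gcp extra (bucket, object, and account names are placeholders):

from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--impersonate_service_account='
    'target@example-project.iam.gserviceaccount.com'
])
fs = GCSFileSystem(pipeline_options=options)
# The underlying GcsIO client issues this request with the
# impersonated credentials derived from the options.
print(fs.exists('gs://example-bucket/some-object'))
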
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index b4d921ada234..49b0bdc9f6cf 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -81,7 +81,7 @@ def test_split(self):
   def test_match_multiples(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     gcsio_mock.list_prefix.return_value = {
         'gs://bucket/file1': (1, 99999.0), 'gs://bucket/file2': (2, 88888.0)
     }
@@ -99,7 +99,7 @@ def test_match_multiples_limit(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
     limit = 1
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     gcsio_mock.list_prefix.return_value = {'gs://bucket/file1': (1, 99999.0)}
     expected_results = set([FileMetadata('gs://bucket/file1', 1, 99999.0)])
     match_result = self.fs.match(['gs://bucket/'], [limit])[0]
@@ -112,7 +112,7 @@ def test_match_multiples_limit(self, mock_gcsio):
   def test_match_multiples_error(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     exception = IOError('Failed')
     gcsio_mock.list_prefix.side_effect = exception

@@ -128,7 +128,7 @@ def test_match_multiples_error(self, mock_gcsio):
   def test_match_multiple_patterns(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     gcsio_mock.list_prefix.side_effect = [
         {
             'gs://bucket/file1': (1, 99999.0)
@@ -146,7 +146,7 @@ def test_match_multiple_patterns(self, mock_gcsio):
   def test_create(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     # Issue file copy
     _ = self.fs.create('gs://bucket/from1', 'application/octet-stream')

@@ -157,7 +157,7 @@ def test_create(self, mock_gcsio):
   def test_open(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     # Issue file copy
     _ = self.fs.open('gs://bucket/from1', 'application/octet-stream')

@@ -168,7 +168,7 @@ def test_open(self, mock_gcsio):
   def test_copy_file(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     sources = ['gs://bucket/from1']
     destinations = ['gs://bucket/to1']

@@ -182,7 +182,7 @@ def test_copy_file(self, mock_gcsio):
   def test_copy_file_error(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     sources = ['gs://bucket/from1']
     destinations = ['gs://bucket/to1']

@@ -208,7 +208,7 @@ def test_copy_file_error(self, mock_gcsio):
   def test_copy_tree(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     sources = ['gs://bucket1/']
     destinations = ['gs://bucket2/']

@@ -222,7 +222,7 @@ def test_copy_tree(self, mock_gcsio):
   def test_rename(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     sources = [
         'gs://bucket/from1',
         'gs://bucket/from2',
@@ -262,7 +262,7 @@ def test_rename(self, mock_gcsio):
   def test_rename_error(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     sources = [
         'gs://bucket/from1',
         'gs://bucket/from2',
@@ -308,7 +308,7 @@ def test_rename_error(self, mock_gcsio):
   def test_delete(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     gcsio_mock._status.return_value = {'size': 0, 'last_updated': 99999.0}
     files = [
         'gs://bucket/from1',
@@ -324,7 +324,7 @@ def test_delete(self, mock_gcsio):
   def test_delete_error(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     exception = IOError('Failed')
     gcsio_mock.delete_batch.side_effect = exception
     gcsio_mock._status.return_value = {'size': 0, 'last_updated': 99999.0}
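
The test fakes must now accept the pipeline_options keyword, since _gcsIO() calls gcsio.GcsIO(pipeline_options=...); a bare lambda would raise TypeError. The same adjustment expressed with mock.patch.object, as a sketch:

import unittest.mock as mock

from apache_beam.io.gcp import gcsfilesystem

gcsio_mock = mock.MagicMock()
# The fake must mirror GcsIO's new signature or the call fails.
with mock.patch.object(gcsfilesystem.gcsio,
                       'GcsIO',
                       new=lambda pipeline_options=None: gcsio_mock):
  pass  # filesystem calls made here would hit gcsio_mock
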
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 599861b5f778..bf41ae646107 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -138,7 +138,7 @@ def get_or_create_default_gcs_bucket(options):
     return None

   bucket_name = default_gcs_bucket_name(project, region)
-  bucket = GcsIO().get_bucket(bucket_name)
+  bucket = GcsIO(pipeline_options=options).get_bucket(bucket_name)
   if bucket:
     return bucket
   else:
@@ -146,7 +146,8 @@ def get_or_create_default_gcs_bucket(options):
     _LOGGER.warning(
         'Creating default GCS bucket for project %s: gs://%s',
         project,
         bucket_name)
-    return GcsIO().create_bucket(bucket_name, project, location=region)
+    return GcsIO(pipeline_options=options).create_bucket(
+        bucket_name, project, location=region)

 class GcsIOError(IOError, retry.PermanentException):
@@ -156,10 +157,10 @@ class GcsIOError(IOError, retry.PermanentException):

 class GcsIO(object):
   """Google Cloud Storage I/O client."""
-  def __init__(self, storage_client=None):
+  def __init__(self, storage_client=None, pipeline_options=None):
     if storage_client is None:
       storage_client = storage.StorageV1(
-          credentials=auth.get_service_credentials(),
+          credentials=auth.get_service_credentials(pipeline_options),
           get_credentials=False,
           http=get_new_http(),
           response_encoding='utf8',
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index a4aa2d4aa858..260090461c8c 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -461,7 +461,7 @@ def test_delete(self):

   @mock.patch(
       'apache_beam.io.gcp.gcsio.auth.get_service_credentials',
-      wraps=lambda: None)
+      wraps=lambda pipeline_options: None)
   @mock.patch('apache_beam.io.gcp.gcsio.get_new_http')
   def test_user_agent_passed(self, get_new_http_mock, get_service_creds_mock):
     client = gcsio.GcsIO()
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index e02f6d799308..5aa29c0fd96e 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -748,9 +748,17 @@ def _add_argparse_args(cls, parser):
         '--enable_artifact_caching',
         default=False,
         action='store_true',
-        help=
-        'When true, artifacts will be cached across job submissions in the GCS '
-        'staging bucket')
+        help='When true, artifacts will be cached across job submissions in '
+        'the GCS staging bucket')
+    parser.add_argument(
+        '--impersonate_service_account',
+        default=None,
+        help='All API requests will be made as the given service account or '
+        'target service account in an impersonation delegation chain '
+        'instead of the currently selected account. You can specify '
+        'either a single service account as the impersonator, or a '
+        'comma-separated list of service accounts to create an '
+        'impersonation delegation chain.')

   def _create_default_gcs_bucket(self):
     try:
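
End to end, the new flag is all a pipeline author needs: the options layer parses it and the auth layer consumes it. A usage sketch (account names are placeholders; the last entry in the list is the account the job runs as):

from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--impersonate_service_account='
    'delegate@example-project.iam.gserviceaccount.com,'
    'target@example-project.iam.gserviceaccount.com'
])
gcp_options = options.view_as(GoogleCloudOptions)
print(gcp_options.impersonate_service_account)
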
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 21e3335c077e..e08729565294 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -345,6 +345,10 @@ def __init__(
         for k, v in sdk_pipeline_options.items() if v is not None
     }
     options_dict["pipelineUrl"] = proto_pipeline_staged_url
+    # Don't pass impersonate_service_account through to the harness.
+    # Though impersonation is used to start the job, the workers should
+    # not try to modify their own credentials.
+    options_dict.pop('impersonate_service_account', None)
     self.proto.sdkPipelineOptions.additionalProperties.append(
         dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
             key='options', value=to_json_value(options_dict)))
@@ -557,7 +561,7 @@ def __init__(self, options, root_staging_location=None):
     if self.google_cloud_options.no_auth:
       credentials = None
     else:
-      credentials = get_service_credentials()
+      credentials = get_service_credentials(options)

     http_client = get_new_http()
     self._client = dataflow.DataflowV1b3(
diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py
index cfc2a1a8637d..68b4fceaa8c3 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -452,7 +452,7 @@ def assert_bucket_exists(bucket_name):
   try:
     from apitools.base.py.exceptions import HttpError
     storage_client = storage.StorageV1(
-        credentials=auth.get_service_credentials(),
+        credentials=auth.get_service_credentials(None),
        get_credentials=False,
        http=get_new_http(),
        response_encoding='utf8')
diff --git a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
index d06b005a4f0d..f81e015ea591 100644
--- a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
+++ b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
@@ -209,7 +209,7 @@ def __init__(self, options):
     if self._google_cloud_options.no_auth:
       credentials = None
     else:
-      credentials = get_service_credentials()
+      credentials = get_service_credentials(options)
     self._storage_client = storage.StorageV1(
         url='https://www.googleapis.com/storage/v1',
         credentials=credentials,
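
Taken together, impersonation is applied only on the submission side: the Dataflow API client, the interactive utilities, and the container builder now hand their options (or None) to get_service_credentials, while apiclient.py pops the flag from the options forwarded to workers. A submission-side sketch, assuming Beam is installed with the gcp extra (the account name is a placeholder):

from apache_beam.internal.gcp.auth import get_service_credentials
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--impersonate_service_account='
    'target@example-project.iam.gserviceaccount.com'
])
# Returns impersonated credentials built from the options, or None
# when the google-auth library is not installed.
credentials = get_service_credentials(options)
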