From 36bb43a1943d69e352cd05066133dc210dbfb83f Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Wed, 5 Mar 2025 12:27:14 -0600 Subject: [PATCH 1/2] Separate BQ user auth from account impersonation --- sqlmesh/core/config/connection.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 426cba9867..3191fb2430 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -817,7 +817,6 @@ class BigQueryConnectionMethod(str, Enum): OAUTH_SECRETS = "oauth-secrets" SERVICE_ACCOUNT = "service-account" SERVICE_ACCOUNT_JSON = "service-account-json" - SERVICE_ACCOUNT_IMPERSONATION = "service-account-impersonation" class BigQueryPriority(str, Enum): @@ -926,16 +925,6 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]: creds = service_account.Credentials.from_service_account_info( self.keyfile_json, scopes=self.scopes ) - elif self.method == BigQueryConnectionMethod.SERVICE_ACCOUNT_IMPERSONATION: - from google.auth import impersonated_credentials - - default_creds, _ = google.auth.default() - - creds = impersonated_credentials.Credentials( - source_credentials=default_creds, - target_principal=self.impersonated_service_account, - target_scopes=self.scopes, - ) elif self.method == BigQueryConnectionMethod.OAUTH_SECRETS: creds = credentials.Credentials( token=self.token, @@ -948,6 +937,15 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]: else: raise ConfigError("Invalid BigQuery Connection Method") + if self.impersonated_service_account: + from google.auth import impersonated_credentials + + creds = impersonated_credentials.Credentials( + source_credentials=creds, + target_principal=self.impersonated_service_account, + target_scopes=self.scopes, + ) + options = client_options.ClientOptions(quota_project_id=self.quota_project) project = self.execution_project or self.project or None From e9495f40ac0a8f8582678f31f8612372a6fd2ba4 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Wed, 5 Mar 2025 15:20:25 -0600 Subject: [PATCH 2/2] Move import --- docs/integrations/engines/bigquery.md | 15 +++++++++------ sqlmesh/core/config/connection.py | 3 +-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/docs/integrations/engines/bigquery.md b/docs/integrations/engines/bigquery.md index 4ab0918f93..d6bcbea616 100644 --- a/docs/integrations/engines/bigquery.md +++ b/docs/integrations/engines/bigquery.md @@ -156,9 +156,9 @@ pip install "sqlmesh[bigquery]" | `refresh_token` | OAuth 2.0 refresh token | string | N | | `client_id` | OAuth 2.0 client ID | string | N | | `client_secret` | OAuth 2.0 client secret | string | N | -| `token_uri` | OAuth 2.0 authorization server's toke endpoint URI | string | N | +| `token_uri` | OAuth 2.0 authorization server's token endpoint URI | string | N | | `scopes` | The scopes used to obtain authorization | list | N | -| `impersonated_service_account` | The service account to impersonate | string | N | +| `impersonated_service_account` | If set, SQLMesh will attempt to impersonate this service account | string | N | | `job_creation_timeout_seconds` | The maximum amount of time, in seconds, to wait for the underlying job to be created. | int | N | | `job_execution_timeout_seconds` | The maximum amount of time, in seconds, to wait for the underlying job to complete. | int | N | | `job_retries` | The number of times to retry the underlying job if it fails. (Default: `1`) | int | N | @@ -228,10 +228,13 @@ sqlmesh_airflow = SQLMeshAirflow( - Related Credential Configuration: - `keyfile_json` (Required) - `scopes` (Optional) -- [service-account-impersonation](https://google-auth.readthedocs.io/en/latest/reference/google.auth.impersonated_credentials.html) - - Related Credential Configuration: - - `impersonated_service_account` (Required) - - `scopes` (Optional) + +If the `impersonated_service_account` argument is set, SQLMesh will: + +1. Authenticate user account credentials with one of the methods above +2. Attempt to impersonate the service account with those credentials + +The user account must have [sufficient permissions to impersonate the service account](https://cloud.google.com/docs/authentication/use-service-account-impersonation). ## Permissions Required With any of the above connection methods, ensure these BigQuery permissions are enabled to allow SQLMesh to work correctly. diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 3191fb2430..133ee3311a 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -912,6 +912,7 @@ def _engine_adapter(self) -> t.Type[EngineAdapter]: def _static_connection_kwargs(self) -> t.Dict[str, t.Any]: """The static connection kwargs for this connection""" import google.auth + from google.auth import impersonated_credentials from google.api_core import client_info, client_options from google.oauth2 import credentials, service_account @@ -938,8 +939,6 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]: raise ConfigError("Invalid BigQuery Connection Method") if self.impersonated_service_account: - from google.auth import impersonated_credentials - creds = impersonated_credentials.Credentials( source_credentials=creds, target_principal=self.impersonated_service_account,