Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions docs/integrations/engines/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ pip install "sqlmesh[bigquery]"
| `refresh_token` | OAuth 2.0 refresh token | string | N |
| `client_id` | OAuth 2.0 client ID | string | N |
| `client_secret` | OAuth 2.0 client secret | string | N |
| `token_uri` | OAuth 2.0 authorization server's toke endpoint URI | string | N |
| `token_uri` | OAuth 2.0 authorization server's token endpoint URI | string | N |
| `scopes` | The scopes used to obtain authorization | list | N |
| `impersonated_service_account` | The service account to impersonate | string | N |
| `impersonated_service_account` | If set, SQLMesh will attempt to impersonate this service account | string | N |
| `job_creation_timeout_seconds` | The maximum amount of time, in seconds, to wait for the underlying job to be created. | int | N |
| `job_execution_timeout_seconds` | The maximum amount of time, in seconds, to wait for the underlying job to complete. | int | N |
| `job_retries` | The number of times to retry the underlying job if it fails. (Default: `1`) | int | N |
Expand Down Expand Up @@ -228,10 +228,13 @@ sqlmesh_airflow = SQLMeshAirflow(
- Related Credential Configuration:
- `keyfile_json` (Required)
- `scopes` (Optional)
- [service-account-impersonation](https://google-auth.readthedocs.io/en/latest/reference/google.auth.impersonated_credentials.html)
- Related Credential Configuration:
- `impersonated_service_account` (Required)
- `scopes` (Optional)

If the `impersonated_service_account` argument is set, SQLMesh will:

1. Authenticate user account credentials with one of the methods above
2. Attempt to impersonate the service account with those credentials

The user account must have [sufficient permissions to impersonate the service account](https://cloud.google.com/docs/authentication/use-service-account-impersonation).

## Permissions Required
With any of the above connection methods, ensure these BigQuery permissions are enabled to allow SQLMesh to work correctly.
Expand Down
19 changes: 8 additions & 11 deletions sqlmesh/core/config/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,6 @@ class BigQueryConnectionMethod(str, Enum):
OAUTH_SECRETS = "oauth-secrets"
SERVICE_ACCOUNT = "service-account"
SERVICE_ACCOUNT_JSON = "service-account-json"
SERVICE_ACCOUNT_IMPERSONATION = "service-account-impersonation"


class BigQueryPriority(str, Enum):
Expand Down Expand Up @@ -913,6 +912,7 @@ def _engine_adapter(self) -> t.Type[EngineAdapter]:
def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
"""The static connection kwargs for this connection"""
import google.auth
from google.auth import impersonated_credentials
from google.api_core import client_info, client_options
from google.oauth2 import credentials, service_account

Expand All @@ -926,16 +926,6 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
creds = service_account.Credentials.from_service_account_info(
self.keyfile_json, scopes=self.scopes
)
elif self.method == BigQueryConnectionMethod.SERVICE_ACCOUNT_IMPERSONATION:
from google.auth import impersonated_credentials

default_creds, _ = google.auth.default()

creds = impersonated_credentials.Credentials(
source_credentials=default_creds,
target_principal=self.impersonated_service_account,
target_scopes=self.scopes,
)
elif self.method == BigQueryConnectionMethod.OAUTH_SECRETS:
creds = credentials.Credentials(
token=self.token,
Expand All @@ -948,6 +938,13 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
else:
raise ConfigError("Invalid BigQuery Connection Method")

if self.impersonated_service_account:
creds = impersonated_credentials.Credentials(
source_credentials=creds,
target_principal=self.impersonated_service_account,
target_scopes=self.scopes,
)

options = client_options.ClientOptions(quota_project_id=self.quota_project)
project = self.execution_project or self.project or None

Expand Down