diff --git a/docs/integrations/engines/bigquery.md b/docs/integrations/engines/bigquery.md index 2d776a42e7..4ab0918f93 100644 --- a/docs/integrations/engines/bigquery.md +++ b/docs/integrations/engines/bigquery.md @@ -71,15 +71,15 @@ This creates a gateway named `bigquery` and makes it your project's default gate It uses the [`oauth` authentication method](#authentication-methods), which does not specify a username or other information directly in the connection configuration. Other authentication methods are [described below](#authentication-methods). -In BigQuery, navigate to the dashboard and select the BigQuery project your SQLMesh project will use. From the Google Cloud dashboard, use the arrow to open the pop-up menu: +In BigQuery, navigate to the dashboard and select the BigQuery project your SQLMesh project will use. From the Google Cloud dashboard, use the arrow to open the pop-up menu: ![BigQuery Dashboard](./bigquery/bigquery-1.png) -Now we can identify the project ID needed in the `config.yaml` gateway specification above. Select the project that you want to work with, the project ID that you need to add to your yaml file is the ID label from the pop-up menu. +Now we can identify the project ID needed in the `config.yaml` gateway specification above. Select the project that you want to work with, the project ID that you need to add to your yaml file is the ID label from the pop-up menu. ![BigQuery Dashboard: selecting your project](./bigquery/bigquery-2.png) -For this guide, the Docs-Demo is the one we will use, thus the project ID for this example is `healthy-life-440919-s0`. +For this guide, the Docs-Demo is the one we will use, thus the project ID for this example is `healthy-life-440919-s0`. 
## Usage @@ -158,6 +158,7 @@ pip install "sqlmesh[bigquery]" | `client_secret` | OAuth 2.0 client secret | string | N | | `token_uri` | OAuth 2.0 authorization server's token endpoint URI | string | N | | `scopes` | The scopes used to obtain authorization | list | N | +| `impersonated_service_account` | The service account to impersonate | string | N | | `job_creation_timeout_seconds` | The maximum amount of time, in seconds, to wait for the underlying job to be created. | int | N | | `job_execution_timeout_seconds` | The maximum amount of time, in seconds, to wait for the underlying job to complete. | int | N | | `job_retries` | The number of times to retry the underlying job if it fails. (Default: `1`) | int | N | @@ -227,6 +228,10 @@ sqlmesh_airflow = SQLMeshAirflow( - Related Credential Configuration: - `keyfile_json` (Required) - `scopes` (Optional) +- [service-account-impersonation](https://google-auth.readthedocs.io/en/latest/reference/google.auth.impersonated_credentials.html) + - Related Credential Configuration: + - `impersonated_service_account` (Required) + - `scopes` (Optional) ## Permissions Required With any of the above connection methods, ensure these BigQuery permissions are enabled to allow SQLMesh to work correctly. diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 9e19d825f4..426cba9867 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -817,6 +817,7 @@ class BigQueryConnectionMethod(str, Enum): OAUTH_SECRETS = "oauth-secrets" SERVICE_ACCOUNT = "service-account" SERVICE_ACCOUNT_JSON = "service-account-json" + SERVICE_ACCOUNT_IMPERSONATION = "service-account-impersonation" class BigQueryPriority(str, Enum): @@ -861,8 +862,9 @@ class BigQueryConnectionConfig(ConnectionConfig): client_secret: t.Optional[str] = None token_uri: t.Optional[str] = None scopes: t.Tuple[str, ...] 
= ("https://www.googleapis.com/auth/bigquery",) - job_creation_timeout_seconds: t.Optional[int] = None + impersonated_service_account: t.Optional[str] = None # Extra Engine Config + job_creation_timeout_seconds: t.Optional[int] = None job_execution_timeout_seconds: t.Optional[int] = None job_retries: t.Optional[int] = 1 job_retry_deadline_seconds: t.Optional[int] = None @@ -924,6 +926,16 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]: creds = service_account.Credentials.from_service_account_info( self.keyfile_json, scopes=self.scopes ) + elif self.method == BigQueryConnectionMethod.SERVICE_ACCOUNT_IMPERSONATION: + from google.auth import impersonated_credentials + + default_creds, _ = google.auth.default() + + creds = impersonated_credentials.Credentials( + source_credentials=default_creds, + target_principal=self.impersonated_service_account, + target_scopes=self.scopes, + ) elif self.method == BigQueryConnectionMethod.OAUTH_SECRETS: creds = credentials.Credentials( token=self.token, diff --git a/sqlmesh/dbt/target.py b/sqlmesh/dbt/target.py index 5b3814a68f..7775888a9e 100644 --- a/sqlmesh/dbt/target.py +++ b/sqlmesh/dbt/target.py @@ -508,6 +508,8 @@ class BigQueryConfig(TargetConfig): client_secret: The BigQuery client secret token_uri: The BigQuery token URI scopes: The BigQuery scopes + impersonated_service_account: The service account to impersonate + job_creation_timeout_seconds: The maximum amount of time, in seconds, to wait for the underlying job to be created job_execution_timeout_seconds: The maximum amount of time, in seconds, to wait for the underlying job to complete timeout_seconds: Alias for job_execution_timeout_seconds job_retries: The number of times to retry the underlying job if it fails @@ -536,6 +538,8 @@ class BigQueryConfig(TargetConfig): "https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/drive", ) + impersonated_service_account: t.Optional[str] = None + job_creation_timeout_seconds: 
t.Optional[int] = None job_execution_timeout_seconds: t.Optional[int] = None timeout_seconds: t.Optional[int] = None # To support legacy config job_retries: t.Optional[int] = None @@ -596,6 +600,8 @@ def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig: client_secret=self.client_secret, token_uri=self.token_uri, scopes=self.scopes, + impersonated_service_account=self.impersonated_service_account, + job_creation_timeout_seconds=self.job_creation_timeout_seconds, job_execution_timeout_seconds=job_execution_timeout_seconds, job_retries=job_retries, job_retry_deadline_seconds=self.job_retry_deadline_seconds, diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index edc9a013f6..76dbc812ae 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -952,7 +952,7 @@ def test_plan_dlt(runner, tmp_path): def test_init_project_dialects(tmp_path): dialect_to_config = { "redshift": "# concurrent_tasks: 4\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # user: \n # password: \n # database: \n # host: \n # port: \n # source_address: \n # unix_sock: \n # ssl: \n # sslmode: \n # timeout: \n # tcp_keepalive: \n # application_name: \n # preferred_role: \n # principal_arn: \n # credentials_provider: \n # region: \n # cluster_identifier: \n # iam: \n # is_serverless: \n # serverless_acct_id: \n # serverless_work_group: \n # enable_merge: ", - "bigquery": "# concurrent_tasks: 1\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # method: oauth\n # project: \n # execution_project: \n # quota_project: \n # location: \n # keyfile: \n # keyfile_json: \n # token: \n # refresh_token: \n # client_id: \n # client_secret: \n # token_uri: \n # scopes: \n # job_creation_timeout_seconds: \n # job_execution_timeout_seconds: \n # job_retries: 1\n # job_retry_deadline_seconds: \n # priority: \n # maximum_bytes_billed: ", + "bigquery": "# concurrent_tasks: 1\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # method: 
oauth\n # project: \n # execution_project: \n # quota_project: \n # location: \n # keyfile: \n # keyfile_json: \n # token: \n # refresh_token: \n # client_id: \n # client_secret: \n # token_uri: \n # scopes: \n # impersonated_service_account: \n # job_creation_timeout_seconds: \n # job_execution_timeout_seconds: \n # job_retries: 1\n # job_retry_deadline_seconds: \n # priority: \n # maximum_bytes_billed: ", "snowflake": "account: \n # concurrent_tasks: 4\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # user: \n # password: \n # warehouse: \n # database: \n # role: \n # authenticator: \n # token: \n # application: Tobiko_SQLMesh\n # private_key: \n # private_key_path: \n # private_key_passphrase: \n # session_parameters: ", "databricks": "# concurrent_tasks: 1\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # server_hostname: \n # http_path: \n # access_token: \n # auth_type: \n # oauth_client_id: \n # oauth_client_secret: \n # catalog: \n # http_headers: \n # session_configuration: \n # databricks_connect_server_hostname: \n # databricks_connect_access_token: \n # databricks_connect_cluster_id: \n # databricks_connect_use_serverless: False\n # force_databricks_connect: False\n # disable_databricks_connect: False\n # disable_spark_session: False", "postgres": "host: \n user: \n password: \n port: \n database: \n # concurrent_tasks: 4\n # register_comments: True\n # pre_ping: True\n # pretty_sql: False\n # keepalives_idle: \n # connect_timeout: 10\n # role: \n # sslmode: ",