Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions docs/integrations/engines/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ This creates a gateway named `bigquery` and makes it your project's default gate

It uses the [`oauth` authentication method](#authentication-methods), which does not specify a username or other information directly in the connection configuration. Other authentication methods are [described below](#authentication-methods).

In BigQuery, navigate to the dashboard and select the BigQuery project your SQLMesh project will use. From the Google Cloud dashboard, use the arrow to open the pop-up menu:
In BigQuery, navigate to the dashboard and select the BigQuery project your SQLMesh project will use. From the Google Cloud dashboard, use the arrow to open the pop-up menu:

![BigQuery Dashboard](./bigquery/bigquery-1.png)

Now we can identify the project ID needed in the `config.yaml` gateway specification above. Select the project that you want to work with, the project ID that you need to add to your yaml file is the ID label from the pop-up menu.
Now we can identify the project ID needed in the `config.yaml` gateway specification above. Select the project that you want to work with; the project ID that you need to add to your yaml file is the ID label from the pop-up menu.

![BigQuery Dashboard: selecting your project](./bigquery/bigquery-2.png)

For this guide, the Docs-Demo is the one we will use, thus the project ID for this example is `healthy-life-440919-s0`.
For this guide, we will use the Docs-Demo project; thus, the project ID for this example is `healthy-life-440919-s0`.

## Usage

Expand Down Expand Up @@ -158,6 +158,7 @@ pip install "sqlmesh[bigquery]"
| `client_secret` | OAuth 2.0 client secret | string | N |
| `token_uri` | OAuth 2.0 authorization server's token endpoint URI | string | N |
| `scopes` | The scopes used to obtain authorization | list | N |
| `impersonated_service_account` | The service account to impersonate | string | N |
| `job_creation_timeout_seconds` | The maximum amount of time, in seconds, to wait for the underlying job to be created. | int | N |
| `job_execution_timeout_seconds` | The maximum amount of time, in seconds, to wait for the underlying job to complete. | int | N |
| `job_retries` | The number of times to retry the underlying job if it fails. (Default: `1`) | int | N |
Expand Down Expand Up @@ -227,6 +228,10 @@ sqlmesh_airflow = SQLMeshAirflow(
- Related Credential Configuration:
- `keyfile_json` (Required)
- `scopes` (Optional)
- [service-account-impersonation](https://google-auth.readthedocs.io/en/latest/reference/google.auth.impersonated_credentials.html)
- Related Credential Configuration:
- `impersonated_service_account` (Required)
- `scopes` (Optional)

## Permissions Required
With any of the above connection methods, ensure these BigQuery permissions are enabled to allow SQLMesh to work correctly.
Expand Down
14 changes: 13 additions & 1 deletion sqlmesh/core/config/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,7 @@ class BigQueryConnectionMethod(str, Enum):
OAUTH_SECRETS = "oauth-secrets"
SERVICE_ACCOUNT = "service-account"
SERVICE_ACCOUNT_JSON = "service-account-json"
SERVICE_ACCOUNT_IMPERSONATION = "service-account-impersonation"


class BigQueryPriority(str, Enum):
Expand Down Expand Up @@ -861,8 +862,9 @@ class BigQueryConnectionConfig(ConnectionConfig):
client_secret: t.Optional[str] = None
token_uri: t.Optional[str] = None
scopes: t.Tuple[str, ...] = ("https://www.googleapis.com/auth/bigquery",)
job_creation_timeout_seconds: t.Optional[int] = None
impersonated_service_account: t.Optional[str] = None
# Extra Engine Config
job_creation_timeout_seconds: t.Optional[int] = None
job_execution_timeout_seconds: t.Optional[int] = None
job_retries: t.Optional[int] = 1
job_retry_deadline_seconds: t.Optional[int] = None
Expand Down Expand Up @@ -924,6 +926,16 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
creds = service_account.Credentials.from_service_account_info(
self.keyfile_json, scopes=self.scopes
)
elif self.method == BigQueryConnectionMethod.SERVICE_ACCOUNT_IMPERSONATION:
from google.auth import impersonated_credentials

default_creds, _ = google.auth.default()

creds = impersonated_credentials.Credentials(
source_credentials=default_creds,
target_principal=self.impersonated_service_account,
target_scopes=self.scopes,
)
elif self.method == BigQueryConnectionMethod.OAUTH_SECRETS:
creds = credentials.Credentials(
token=self.token,
Expand Down
6 changes: 6 additions & 0 deletions sqlmesh/dbt/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,8 @@ class BigQueryConfig(TargetConfig):
client_secret: The BigQuery client secret
token_uri: The BigQuery token URI
scopes: The BigQuery scopes
impersonated_service_account: The service account to impersonate
job_creation_timeout_seconds: The maximum amount of time, in seconds, to wait for the underlying job to be created
job_execution_timeout_seconds: The maximum amount of time, in seconds, to wait for the underlying job to complete
timeout_seconds: Alias for job_execution_timeout_seconds
job_retries: The number of times to retry the underlying job if it fails
Expand Down Expand Up @@ -536,6 +538,8 @@ class BigQueryConfig(TargetConfig):
"https://www.googleapis.com/auth/cloud-platform",
"https://www.googleapis.com/auth/drive",
)
impersonated_service_account: t.Optional[str] = None
job_creation_timeout_seconds: t.Optional[int] = None
job_execution_timeout_seconds: t.Optional[int] = None
timeout_seconds: t.Optional[int] = None # To support legacy config
job_retries: t.Optional[int] = None
Expand Down Expand Up @@ -596,6 +600,8 @@ def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
client_secret=self.client_secret,
token_uri=self.token_uri,
scopes=self.scopes,
impersonated_service_account=self.impersonated_service_account,
job_creation_timeout_seconds=self.job_creation_timeout_seconds,
job_execution_timeout_seconds=job_execution_timeout_seconds,
job_retries=job_retries,
job_retry_deadline_seconds=self.job_retry_deadline_seconds,
Expand Down
2 changes: 1 addition & 1 deletion tests/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ def test_plan_dlt(runner, tmp_path):
def test_init_project_dialects(tmp_path):
dialect_to_config = {
"redshift": "# concurrent_tasks: 4\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # user: \n # password: \n # database: \n # host: \n # port: \n # source_address: \n # unix_sock: \n # ssl: \n # sslmode: \n # timeout: \n # tcp_keepalive: \n # application_name: \n # preferred_role: \n # principal_arn: \n # credentials_provider: \n # region: \n # cluster_identifier: \n # iam: \n # is_serverless: \n # serverless_acct_id: \n # serverless_work_group: \n # enable_merge: ",
"bigquery": "# concurrent_tasks: 1\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # method: oauth\n # project: \n # execution_project: \n # quota_project: \n # location: \n # keyfile: \n # keyfile_json: \n # token: \n # refresh_token: \n # client_id: \n # client_secret: \n # token_uri: \n # scopes: \n # job_creation_timeout_seconds: \n # job_execution_timeout_seconds: \n # job_retries: 1\n # job_retry_deadline_seconds: \n # priority: \n # maximum_bytes_billed: ",
"bigquery": "# concurrent_tasks: 1\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # method: oauth\n # project: \n # execution_project: \n # quota_project: \n # location: \n # keyfile: \n # keyfile_json: \n # token: \n # refresh_token: \n # client_id: \n # client_secret: \n # token_uri: \n # scopes: \n # impersonated_service_account: \n # job_creation_timeout_seconds: \n # job_execution_timeout_seconds: \n # job_retries: 1\n # job_retry_deadline_seconds: \n # priority: \n # maximum_bytes_billed: ",
"snowflake": "account: \n # concurrent_tasks: 4\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # user: \n # password: \n # warehouse: \n # database: \n # role: \n # authenticator: \n # token: \n # application: Tobiko_SQLMesh\n # private_key: \n # private_key_path: \n # private_key_passphrase: \n # session_parameters: ",
"databricks": "# concurrent_tasks: 1\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # server_hostname: \n # http_path: \n # access_token: \n # auth_type: \n # oauth_client_id: \n # oauth_client_secret: \n # catalog: \n # http_headers: \n # session_configuration: \n # databricks_connect_server_hostname: \n # databricks_connect_access_token: \n # databricks_connect_cluster_id: \n # databricks_connect_use_serverless: False\n # force_databricks_connect: False\n # disable_databricks_connect: False\n # disable_spark_session: False",
"postgres": "host: \n user: \n password: \n port: \n database: \n # concurrent_tasks: 4\n # register_comments: True\n # pre_ping: True\n # pretty_sql: False\n # keepalives_idle: \n # connect_timeout: 10\n # role: \n # sslmode: ",
Expand Down