Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions .circleci/continue_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ jobs:
command: ./.circleci/test_migration.sh sushi "--gateway duckdb_persistent"
- run:
name: Run the migration test - sushi_dbt
command: ./.circleci/test_migration.sh sushi_dbt "--config migration_test_config"
command: ./.circleci/test_migration.sh sushi_dbt "--config migration_test_config"

ui_style:
docker:
Expand Down Expand Up @@ -300,23 +300,23 @@ workflows:
name: cloud_engine_<< matrix.engine >>
context:
- sqlmesh_cloud_database_integration
requires:
- engine_tests_docker
# requires:
# - engine_tests_docker
matrix:
parameters:
engine:
- snowflake
- databricks
- redshift
# - snowflake
# - databricks
# - redshift
- bigquery
- clickhouse-cloud
- athena
- fabric
- gcp-postgres
filters:
branches:
only:
- main
# - clickhouse-cloud
# - athena
# - fabric
# - gcp-postgres
# filters:
# branches:
# only:
# - main
- ui_style
- ui_test
- vscode_test
Expand Down
107 changes: 105 additions & 2 deletions sqlmesh/core/engine_adapter/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from sqlmesh.core.dialect import to_schema
from sqlmesh.core.engine_adapter.mixins import (
ClusteredByMixin,
GrantsFromInfoSchemaMixin,
RowDiffMixin,
TableAlterClusterByOperation,
)
Expand Down Expand Up @@ -39,7 +40,7 @@
from google.cloud.bigquery.table import Table as BigQueryTable

from sqlmesh.core._typing import SchemaName, SessionProperties, TableName
from sqlmesh.core.engine_adapter._typing import BigframeSession, DF, Query
from sqlmesh.core.engine_adapter._typing import BigframeSession, DCL, DF, GrantsConfig, Query
from sqlmesh.core.engine_adapter.base import QueryOrDF


Expand All @@ -54,7 +55,7 @@


@set_catalog()
class BigQueryEngineAdapter(ClusteredByMixin, RowDiffMixin):
class BigQueryEngineAdapter(ClusteredByMixin, RowDiffMixin, GrantsFromInfoSchemaMixin):
"""
BigQuery Engine Adapter using the `google-cloud-bigquery` library's DB API.
"""
Expand All @@ -64,6 +65,11 @@ class BigQueryEngineAdapter(ClusteredByMixin, RowDiffMixin):
SUPPORTS_TRANSACTIONS = False
SUPPORTS_MATERIALIZED_VIEWS = True
SUPPORTS_CLONING = True
SUPPORTS_GRANTS = True
CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expression = exp.func("session_user")
SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = True
USE_CATALOG_IN_GRANTS = True
GRANT_INFORMATION_SCHEMA_TABLE_NAME = "OBJECT_PRIVILEGES"
MAX_TABLE_COMMENT_LENGTH = 1024
MAX_COLUMN_COMMENT_LENGTH = 1024
SUPPORTS_QUERY_EXECUTION_TRACKING = True
Expand Down Expand Up @@ -1297,6 +1303,103 @@ def _session_id(self) -> t.Any:
def _session_id(self, value: t.Any) -> None:
self._connection_pool.set_attribute("session_id", value)

def _get_current_schema(self) -> str:
    # BigQuery has no session-level "current schema" (dataset) concept, so any
    # grants code path that needs one must not be reached on this adapter.
    raise NotImplementedError("BigQuery does not support current schema")

def _get_bq_dataset_location(self, project: str, dataset: str) -> str:
return self._db_call(self.client.get_dataset, dataset_ref=f"{project}.{dataset}").location
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know if dataset_ref needs to be properly quoted? Or are you assuming the strings passed in will have quotes if needed?

Copy link
Contributor Author

@newtonapple newtonapple Sep 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I checked their API. I don't think we need to quote it.


def _get_grant_expression(self, table: exp.Table) -> exp.Expression:
    """Build the query that lists grants on ``table``.

    BigQuery exposes OBJECT_PRIVILEGES as a project-level INFORMATION_SCHEMA
    view addressed with a regional qualifier, so the dataset's location is
    looked up first.
    https://cloud.google.com/bigquery/docs/information-schema-object-privileges

    Raises:
        ValueError: If the table is missing a dataset (schema) or a
            project (catalog) cannot be determined.
    """
    if not table.db:
        raise ValueError(
            f"Table {table.sql(dialect=self.dialect)} does not have a schema (dataset)"
        )
    project = table.catalog or self.get_current_catalog()
    if not project:
        raise ValueError(
            f"Table {table.sql(dialect=self.dialect)} does not have a catalog (project)"
        )

    dataset_name = table.db
    target_name = table.name
    region = self._get_bq_dataset_location(project, dataset_name)

    privileges_view = exp.to_table(
        f"`{project}`.`region-{region}`.INFORMATION_SCHEMA.{self.GRANT_INFORMATION_SCHEMA_TABLE_NAME}",
        dialect=self.dialect,
    )

    # BigQuery grantees look like "user:email" or "group:name"; skip rows whose
    # principal part (after the label) matches the session user.
    principal_part = exp.func("split", exp.column("grantee"), exp.Literal.string(":"))[
        exp.func("OFFSET", exp.Literal.number("1"))
    ]
    predicate = exp.and_(
        exp.column("object_schema").eq(exp.Literal.string(dataset_name)),
        exp.column("object_name").eq(exp.Literal.string(target_name)),
        principal_part.neq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
    )
    return exp.select("privilege_type", "grantee").from_(privileges_view).where(predicate)

@staticmethod
def _grant_object_kind(table_type: DataObjectType) -> str:
    """Map a data object type to the securable kind used in DCL statements."""
    return "VIEW" if table_type == DataObjectType.VIEW else "TABLE"

def _dcl_grants_config_expr(
    self,
    dcl_cmd: t.Type[DCL],
    table: exp.Table,
    grant_config: GrantsConfig,
    table_type: DataObjectType = DataObjectType.TABLE,
) -> t.List[exp.Expression]:
    """Build one DCL (GRANT/REVOKE) expression per privilege in ``grant_config``.

    Args:
        dcl_cmd: The DCL expression class to instantiate.
        table: The securable the statements apply to.
        grant_config: Mapping of privilege -> list of principal strings.
        table_type: Whether the securable is a TABLE or a VIEW.

    Returns:
        A list of DCL expressions; privileges with no principals are skipped.

    Raises:
        ValueError: If a principal is missing its "label:" prefix, or a
            special group principal is mislabeled.
    """
    expressions: t.List[exp.Expression] = []
    if not grant_config:
        return expressions

    # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-control-language

    def normalize_principal(p: str) -> str:
        if ":" not in p:
            raise ValueError(f"Principal '{p}' missing a prefix label")

        label, principal = p.split(":", 1)

        # allUsers and allAuthenticatedUsers are special groups that are case-sensitive
        # and must start with the "specialGroup:" prefix label.
        # Fix: compare the principal part exactly instead of calling endswith() on the
        # whole string, which wrongly matched principals like "group:smallUsers".
        if principal in ("allUsers", "allAuthenticatedUsers"):
            if label != "specialGroup":
                raise ValueError(
                    f"Special group principal '{p}' must start with 'specialGroup:' prefix label"
                )
            return p

        # always lowercase principals
        return f"{label}:{principal.lower()}"

    object_kind = self._grant_object_kind(table_type)
    for privilege, principals in grant_config.items():
        if not principals:
            continue

        normalized_principals = [exp.Literal.string(normalize_principal(p)) for p in principals]
        args: t.Dict[str, t.Any] = {
            "privileges": [exp.GrantPrivilege(this=exp.to_identifier(privilege, quoted=True))],
            "securable": table.copy(),
            "principals": normalized_principals,
        }

        if object_kind:
            args["kind"] = exp.Var(this=object_kind)

        expressions.append(dcl_cmd(**args))  # type: ignore[arg-type]

    return expressions


class _ErrorCounter:
"""
Expand Down
14 changes: 12 additions & 2 deletions sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1723,19 +1723,29 @@ def _apply_grants(
return

model_grants_target_layer = model.grants_target_layer
deployable_vde_dev_only = (
is_snapshot_deployable and model.virtual_environment_mode.is_dev_only
)

# table_type is always a VIEW in the virtual layer unless model is deployable and VDE is dev_only
# in which case we fall back to the model's model_grants_table_type
if target_layer == GrantsTargetLayer.VIRTUAL and not deployable_vde_dev_only:
model_grants_table_type = DataObjectType.VIEW
else:
model_grants_table_type = model.grants_table_type

if (
model_grants_target_layer.is_all
or model_grants_target_layer == target_layer
# Always apply grants in production when VDE is dev_only regardless of target_layer
# since only physical tables are created in production
or (is_snapshot_deployable and model.virtual_environment_mode.is_dev_only)
or deployable_vde_dev_only
):
logger.info(f"Applying grants for model {model.name} to table {table_name}")
self.adapter.sync_grants_config(
exp.to_table(table_name, dialect=self.adapter.dialect),
grants_config,
model.grants_table_type,
model_grants_table_type,
)
else:
logger.debug(
Expand Down
29 changes: 27 additions & 2 deletions tests/core/engine_adapter/integration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,21 @@ def _get_create_user_or_role(
# Creating an account-level group in Databricks requires making REST API calls so we are going to
# use a pre-created group instead. We assume the suffix on the name is the unique id
return "_".join(username.split("_")[:-1]), None
if self.dialect == "bigquery":
# BigQuery uses IAM service accounts that need to be pre-created
# Pre-created GCP service accounts:
# - sqlmesh-test-admin@{project-id}.iam.gserviceaccount.com
# - sqlmesh-test-analyst@{project-id}.iam.gserviceaccount.com
# - sqlmesh-test-etl-user@{project-id}.iam.gserviceaccount.com
# - sqlmesh-test-reader@{project-id}.iam.gserviceaccount.com
# - sqlmesh-test-user@{project-id}.iam.gserviceaccount.com
# - sqlmesh-test-writer@{project-id}.iam.gserviceaccount.com
role_name = (
username.replace(f"_{self.test_id}", "").replace("test_", "").replace("_", "-")
)
project_id = self.engine_adapter.get_current_catalog()
service_account = f"sqlmesh-test-{role_name}@{project_id}.iam.gserviceaccount.com"
return f"serviceAccount:{service_account}", None
raise ValueError(f"User creation not supported for dialect: {self.dialect}")

def _create_user_or_role(self, username: str, password: t.Optional[str] = None) -> str:
Expand Down Expand Up @@ -791,20 +806,29 @@ def create_users_or_roles(self, *role_names: str) -> t.Iterator[t.Dict[str, str]
for user_name in created_users:
self._cleanup_user_or_role(user_name)

def get_select_privilege(self) -> str:
    """Return the dialect-appropriate privilege string for read access."""
    # BigQuery expresses grants as IAM roles rather than SQL privileges.
    return "roles/bigquery.dataViewer" if self.dialect == "bigquery" else "SELECT"

def get_insert_privilege(self) -> str:
    """Return the dialect-appropriate privilege string for insert access."""
    # Databricks: this would really be "MODIFY", but to keep it distinct from
    # UPDATE for test purposes we use "MANAGE" instead.
    overrides = {
        "databricks": "MANAGE",
        "bigquery": "roles/bigquery.dataEditor",
    }
    return overrides.get(self.dialect, "INSERT")

def get_update_privilege(self) -> str:
    """Return the dialect-appropriate privilege string for update access."""
    overrides = {
        "databricks": "MODIFY",
        "bigquery": "roles/bigquery.dataOwner",
    }
    return overrides.get(self.dialect, "UPDATE")

def _cleanup_user_or_role(self, user_name: str) -> None:
"""Helper function to clean up a PostgreSQL user and all their dependencies."""
"""Helper function to clean up a user/role and all their dependencies."""
try:
if self.dialect in ["postgres", "redshift"]:
self.engine_adapter.execute(f"""
Expand All @@ -816,7 +840,8 @@ def _cleanup_user_or_role(self, user_name: str) -> None:
self.engine_adapter.execute(f'DROP USER IF EXISTS "{user_name}"')
elif self.dialect == "snowflake":
self.engine_adapter.execute(f"DROP ROLE IF EXISTS {user_name}")
elif self.dialect == "databricks":
elif self.dialect in ["databricks", "bigquery"]:
# For Databricks and BigQuery, we use pre-created accounts that should not be deleted
pass
except Exception:
pass
Expand Down
Loading