diff --git a/.circleci/continue_config.yml b/.circleci/continue_config.yml index c549c0ae78..9452eeb6d2 100644 --- a/.circleci/continue_config.yml +++ b/.circleci/continue_config.yml @@ -148,7 +148,7 @@ jobs: command: ./.circleci/test_migration.sh sushi "--gateway duckdb_persistent" - run: name: Run the migration test - sushi_dbt - command: ./.circleci/test_migration.sh sushi_dbt "--config migration_test_config" + command: ./.circleci/test_migration.sh sushi_dbt "--config migration_test_config" ui_style: docker: @@ -300,23 +300,23 @@ workflows: name: cloud_engine_<< matrix.engine >> context: - sqlmesh_cloud_database_integration - requires: - - engine_tests_docker + # requires: + # - engine_tests_docker matrix: parameters: engine: - - snowflake - - databricks - - redshift + # - snowflake + # - databricks + # - redshift - bigquery - - clickhouse-cloud - - athena - - fabric - - gcp-postgres - filters: - branches: - only: - - main + # - clickhouse-cloud + # - athena + # - fabric + # - gcp-postgres + # filters: + # branches: + # only: + # - main - ui_style - ui_test - vscode_test diff --git a/sqlmesh/core/engine_adapter/bigquery.py b/sqlmesh/core/engine_adapter/bigquery.py index 0dfa2325e8..cc89941605 100644 --- a/sqlmesh/core/engine_adapter/bigquery.py +++ b/sqlmesh/core/engine_adapter/bigquery.py @@ -10,6 +10,7 @@ from sqlmesh.core.dialect import to_schema from sqlmesh.core.engine_adapter.mixins import ( ClusteredByMixin, + GrantsFromInfoSchemaMixin, RowDiffMixin, TableAlterClusterByOperation, ) @@ -39,7 +40,7 @@ from google.cloud.bigquery.table import Table as BigQueryTable from sqlmesh.core._typing import SchemaName, SessionProperties, TableName - from sqlmesh.core.engine_adapter._typing import BigframeSession, DF, Query + from sqlmesh.core.engine_adapter._typing import BigframeSession, DCL, DF, GrantsConfig, Query from sqlmesh.core.engine_adapter.base import QueryOrDF @@ -54,7 +55,7 @@ @set_catalog() -class BigQueryEngineAdapter(ClusteredByMixin, RowDiffMixin): 
+class BigQueryEngineAdapter(ClusteredByMixin, RowDiffMixin, GrantsFromInfoSchemaMixin): """ BigQuery Engine Adapter using the `google-cloud-bigquery` library's DB API. """ @@ -64,6 +65,11 @@ class BigQueryEngineAdapter(ClusteredByMixin, RowDiffMixin): SUPPORTS_TRANSACTIONS = False SUPPORTS_MATERIALIZED_VIEWS = True SUPPORTS_CLONING = True + SUPPORTS_GRANTS = True + CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expression = exp.func("session_user") + SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = True + USE_CATALOG_IN_GRANTS = True + GRANT_INFORMATION_SCHEMA_TABLE_NAME = "OBJECT_PRIVILEGES" MAX_TABLE_COMMENT_LENGTH = 1024 MAX_COLUMN_COMMENT_LENGTH = 1024 SUPPORTS_QUERY_EXECUTION_TRACKING = True @@ -1297,6 +1303,103 @@ def _session_id(self) -> t.Any: def _session_id(self, value: t.Any) -> None: self._connection_pool.set_attribute("session_id", value) + def _get_current_schema(self) -> str: + raise NotImplementedError("BigQuery does not support current schema") + + def _get_bq_dataset_location(self, project: str, dataset: str) -> str: + return self._db_call(self.client.get_dataset, dataset_ref=f"{project}.{dataset}").location + + def _get_grant_expression(self, table: exp.Table) -> exp.Expression: + if not table.db: + raise ValueError( + f"Table {table.sql(dialect=self.dialect)} does not have a schema (dataset)" + ) + project = table.catalog or self.get_current_catalog() + if not project: + raise ValueError( + f"Table {table.sql(dialect=self.dialect)} does not have a catalog (project)" + ) + + dataset = table.db + table_name = table.name + location = self._get_bq_dataset_location(project, dataset) + + # https://cloud.google.com/bigquery/docs/information-schema-object-privileges + # OBJECT_PRIVILEGES is a project-level INFORMATION_SCHEMA view with regional qualifier + object_privileges_table = exp.to_table( + f"`{project}`.`region-{location}`.INFORMATION_SCHEMA.{self.GRANT_INFORMATION_SCHEMA_TABLE_NAME}", + dialect=self.dialect, + ) + return ( + exp.select("privilege_type", "grantee") + 
.from_(object_privileges_table) + .where( + exp.and_( + exp.column("object_schema").eq(exp.Literal.string(dataset)), + exp.column("object_name").eq(exp.Literal.string(table_name)), + # Filter out current_user + # BigQuery grantees format: "user:email" or "group:name" + exp.func("split", exp.column("grantee"), exp.Literal.string(":"))[ + exp.func("OFFSET", exp.Literal.number("1")) + ].neq(self.CURRENT_USER_OR_ROLE_EXPRESSION), + ) + ) + ) + + @staticmethod + def _grant_object_kind(table_type: DataObjectType) -> str: + if table_type == DataObjectType.VIEW: + return "VIEW" + return "TABLE" + + def _dcl_grants_config_expr( + self, + dcl_cmd: t.Type[DCL], + table: exp.Table, + grant_config: GrantsConfig, + table_type: DataObjectType = DataObjectType.TABLE, + ) -> t.List[exp.Expression]: + expressions: t.List[exp.Expression] = [] + if not grant_config: + return expressions + + # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-control-language + + def normalize_principal(p: str) -> str: + if ":" not in p: + raise ValueError(f"Principal '{p}' missing a prefix label") + + # allUsers and allAuthenticatedUsers are special groups that are case-sensitive and must start with "specialGroup:" + if p.endswith("allUsers") or p.endswith("allAuthenticatedUsers"): + if not p.startswith("specialGroup:"): + raise ValueError( + f"Special group principal '{p}' must start with 'specialGroup:' prefix label" + ) + return p + + label, principal = p.split(":", 1) + # always lowercase principals + return f"{label}:{principal.lower()}" + + object_kind = self._grant_object_kind(table_type) + for privilege, principals in grant_config.items(): + if not principals: + continue + + normalized_principals = [exp.Literal.string(normalize_principal(p)) for p in principals] + args: t.Dict[str, t.Any] = { + "privileges": [exp.GrantPrivilege(this=exp.to_identifier(privilege, quoted=True))], + "securable": table.copy(), + "principals": normalized_principals, + } + + if object_kind: + args["kind"]
= exp.Var(this=object_kind) + + expressions.append(dcl_cmd(**args)) # type: ignore[arg-type] + + return expressions + class _ErrorCounter: """ diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 107b93cb43..75f9204f63 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -1723,19 +1723,29 @@ def _apply_grants( return model_grants_target_layer = model.grants_target_layer + deployable_vde_dev_only = ( + is_snapshot_deployable and model.virtual_environment_mode.is_dev_only + ) + + # table_type is always a VIEW in the virtual layer unless model is deployable and VDE is dev_only + # in which case we fall back to the model's model_grants_table_type + if target_layer == GrantsTargetLayer.VIRTUAL and not deployable_vde_dev_only: + model_grants_table_type = DataObjectType.VIEW + else: + model_grants_table_type = model.grants_table_type if ( model_grants_target_layer.is_all or model_grants_target_layer == target_layer # Always apply grants in production when VDE is dev_only regardless of target_layer # since only physical tables are created in production - or (is_snapshot_deployable and model.virtual_environment_mode.is_dev_only) + or deployable_vde_dev_only ): logger.info(f"Applying grants for model {model.name} to table {table_name}") self.adapter.sync_grants_config( exp.to_table(table_name, dialect=self.adapter.dialect), grants_config, - model.grants_table_type, + model_grants_table_type, ) else: logger.debug( diff --git a/tests/core/engine_adapter/integration/__init__.py b/tests/core/engine_adapter/integration/__init__.py index 48aa8e35ea..49624154e4 100644 --- a/tests/core/engine_adapter/integration/__init__.py +++ b/tests/core/engine_adapter/integration/__init__.py @@ -758,6 +758,21 @@ def _get_create_user_or_role( # Creating an account-level group in Databricks requires making REST API calls so we are going to # use a pre-created group instead. 
We assume the suffix on the name is the unique id return "_".join(username.split("_")[:-1]), None + if self.dialect == "bigquery": + # BigQuery uses IAM service accounts that need to be pre-created + # Pre-created GCP service accounts: + # - sqlmesh-test-admin@{project-id}.iam.gserviceaccount.com + # - sqlmesh-test-analyst@{project-id}.iam.gserviceaccount.com + # - sqlmesh-test-etl-user@{project-id}.iam.gserviceaccount.com + # - sqlmesh-test-reader@{project-id}.iam.gserviceaccount.com + # - sqlmesh-test-user@{project-id}.iam.gserviceaccount.com + # - sqlmesh-test-writer@{project-id}.iam.gserviceaccount.com + role_name = ( + username.replace(f"_{self.test_id}", "").replace("test_", "").replace("_", "-") + ) + project_id = self.engine_adapter.get_current_catalog() + service_account = f"sqlmesh-test-{role_name}@{project_id}.iam.gserviceaccount.com" + return f"serviceAccount:{service_account}", None raise ValueError(f"User creation not supported for dialect: {self.dialect}") def _create_user_or_role(self, username: str, password: t.Optional[str] = None) -> str: @@ -791,20 +806,29 @@ def create_users_or_roles(self, *role_names: str) -> t.Iterator[t.Dict[str, str] for user_name in created_users: self._cleanup_user_or_role(user_name) + def get_select_privilege(self) -> str: + if self.dialect == "bigquery": + return "roles/bigquery.dataViewer" + return "SELECT" + def get_insert_privilege(self) -> str: if self.dialect == "databricks": # This would really be "MODIFY" but for the purposes of having this be unique from UPDATE # we return "MANAGE" instead return "MANAGE" + if self.dialect == "bigquery": + return "roles/bigquery.dataEditor" return "INSERT" def get_update_privilege(self) -> str: if self.dialect == "databricks": return "MODIFY" + if self.dialect == "bigquery": + return "roles/bigquery.dataOwner" return "UPDATE" def _cleanup_user_or_role(self, user_name: str) -> None: - """Helper function to clean up a PostgreSQL user and all their dependencies.""" + """Helper 
function to clean up a user/role and all their dependencies.""" try: if self.dialect in ["postgres", "redshift"]: self.engine_adapter.execute(f""" @@ -816,7 +840,8 @@ def _cleanup_user_or_role(self, user_name: str) -> None: self.engine_adapter.execute(f'DROP USER IF EXISTS "{user_name}"') elif self.dialect == "snowflake": self.engine_adapter.execute(f"DROP ROLE IF EXISTS {user_name}") - elif self.dialect == "databricks": + elif self.dialect in ["databricks", "bigquery"]: + # For Databricks and BigQuery, we use pre-created accounts that should not be deleted pass except Exception: pass diff --git a/tests/core/engine_adapter/integration/test_integration.py b/tests/core/engine_adapter/integration/test_integration.py index 3b3ac12a26..4720c4cba8 100644 --- a/tests/core/engine_adapter/integration/test_integration.py +++ b/tests/core/engine_adapter/integration/test_integration.py @@ -3843,29 +3843,30 @@ def test_sync_grants_config(ctx: TestContext) -> None: ) table = ctx.table("sync_grants_integration") + select_privilege = ctx.get_select_privilege() insert_privilege = ctx.get_insert_privilege() update_privilege = ctx.get_update_privilege() with ctx.create_users_or_roles("reader", "writer", "admin") as roles: ctx.engine_adapter.create_table(table, {"id": exp.DataType.build("INT")}) initial_grants = { - "SELECT": [roles["reader"]], + select_privilege: [roles["reader"]], insert_privilege: [roles["writer"]], } ctx.engine_adapter.sync_grants_config(table, initial_grants) current_grants = ctx.engine_adapter._get_current_grants_config(table) - assert set(current_grants.get("SELECT", [])) == {roles["reader"]} + assert set(current_grants.get(select_privilege, [])) == {roles["reader"]} assert set(current_grants.get(insert_privilege, [])) == {roles["writer"]} target_grants = { - "SELECT": [roles["writer"], roles["admin"]], + select_privilege: [roles["writer"], roles["admin"]], update_privilege: [roles["admin"]], } ctx.engine_adapter.sync_grants_config(table, target_grants) 
synced_grants = ctx.engine_adapter._get_current_grants_config(table) - assert set(synced_grants.get("SELECT", [])) == { + assert set(synced_grants.get(select_privilege, [])) == { roles["writer"], roles["admin"], } @@ -3880,18 +3881,19 @@ def test_grants_sync_empty_config(ctx: TestContext): ) table = ctx.table("grants_empty_test") + select_privilege = ctx.get_select_privilege() insert_privilege = ctx.get_insert_privilege() with ctx.create_users_or_roles("user") as roles: ctx.engine_adapter.create_table(table, {"id": exp.DataType.build("INT")}) initial_grants = { - "SELECT": [roles["user"]], + select_privilege: [roles["user"]], insert_privilege: [roles["user"]], } ctx.engine_adapter.sync_grants_config(table, initial_grants) initial_current_grants = ctx.engine_adapter._get_current_grants_config(table) - assert roles["user"] in initial_current_grants.get("SELECT", []) + assert roles["user"] in initial_current_grants.get(select_privilege, []) assert roles["user"] in initial_current_grants.get(insert_privilege, []) ctx.engine_adapter.sync_grants_config(table, {}) @@ -3912,22 +3914,30 @@ def test_grants_case_insensitive_grantees(ctx: TestContext): reader = roles["reader"] writer = roles["writer"] + select_privilege = ctx.get_select_privilege() - grants_config = {"SELECT": [reader, writer.upper()]} + if ctx.dialect == "bigquery": + # BigQuery labels are case sensitive, e.g. 
serviceAccount + label, grantee = writer.split(":", 1) + upper_case_writer = f"{label}:{grantee.upper()}" + else: + upper_case_writer = writer.upper() + + grants_config = {select_privilege: [reader, upper_case_writer]} ctx.engine_adapter.sync_grants_config(table, grants_config) # Grantees are still in lowercase current_grants = ctx.engine_adapter._get_current_grants_config(table) - assert reader in current_grants.get("SELECT", []) - assert writer in current_grants.get("SELECT", []) + assert reader in current_grants.get(select_privilege, []) + assert writer in current_grants.get(select_privilege, []) # Revoke writer - grants_config = {"SELECT": [reader.upper()]} + grants_config = {select_privilege: [reader.upper()]} ctx.engine_adapter.sync_grants_config(table, grants_config) current_grants = ctx.engine_adapter._get_current_grants_config(table) - assert reader in current_grants.get("SELECT", []) - assert writer not in current_grants.get("SELECT", []) + assert reader in current_grants.get(select_privilege, []) + assert writer not in current_grants.get(select_privilege, []) def test_grants_plan(ctx: TestContext, tmp_path: Path): @@ -3937,6 +3947,7 @@ def test_grants_plan(ctx: TestContext, tmp_path: Path): ) table = ctx.table("grant_model").sql(dialect="duckdb") + select_privilege = ctx.get_select_privilege() insert_privilege = ctx.get_insert_privilege() with ctx.create_users_or_roles("analyst", "etl_user") as roles: (tmp_path / "models").mkdir(exist_ok=True) @@ -3946,7 +3957,7 @@ def test_grants_plan(ctx: TestContext, tmp_path: Path): name {table}, kind FULL, grants ( - 'select' = ['{roles["analyst"]}'] + '{select_privilege}' = ['{roles["analyst"]}'] ), grants_target_layer 'all' ); @@ -3969,13 +3980,13 @@ def test_grants_plan(ctx: TestContext, tmp_path: Path): current_grants = ctx.engine_adapter._get_current_grants_config( exp.to_table(table_name, dialect=ctx.dialect) ) - assert current_grants == {"SELECT": [roles["analyst"]]} + assert current_grants == 
{select_privilege: [roles["analyst"]]} # Virtual layer (view) w/ grants virtual_grants = ctx.engine_adapter._get_current_grants_config( exp.to_table(view_name, dialect=ctx.dialect) ) - assert virtual_grants == {"SELECT": [roles["analyst"]]} + assert virtual_grants == {select_privilege: [roles["analyst"]]} # Update model with query change and new grants updated_model = load_sql_based_model( @@ -3985,7 +3996,7 @@ def test_grants_plan(ctx: TestContext, tmp_path: Path): name {table}, kind FULL, grants ( - 'select' = ['{roles["analyst"]}', '{roles["etl_user"]}'], + '{select_privilege}' = ['{roles["analyst"]}', '{roles["etl_user"]}'], '{insert_privilege}' = ['{roles["etl_user"]}'] ), grants_target_layer 'all' @@ -4010,17 +4021,21 @@ def test_grants_plan(ctx: TestContext, tmp_path: Path): exp.to_table(new_table_name, dialect=ctx.dialect) ) expected_final_grants = { - "SELECT": [roles["analyst"], roles["etl_user"]], + select_privilege: [roles["analyst"], roles["etl_user"]], insert_privilege: [roles["etl_user"]], } - assert set(final_grants.get("SELECT", [])) == set(expected_final_grants["SELECT"]) + assert set(final_grants.get(select_privilege, [])) == set( + expected_final_grants[select_privilege] + ) assert final_grants.get(insert_privilege, []) == expected_final_grants[insert_privilege] # Virtual layer should also have the updated grants updated_virtual_grants = ctx.engine_adapter._get_current_grants_config( exp.to_table(view_name, dialect=ctx.dialect) ) - assert set(updated_virtual_grants.get("SELECT", [])) == set(expected_final_grants["SELECT"]) + assert set(updated_virtual_grants.get(select_privilege, [])) == set( + expected_final_grants[select_privilege] + ) assert ( updated_virtual_grants.get(insert_privilege, []) == expected_final_grants[insert_privilege] diff --git a/tests/core/engine_adapter/test_bigquery.py b/tests/core/engine_adapter/test_bigquery.py index f195bbaa2a..7f07f6df1f 100644 --- a/tests/core/engine_adapter/test_bigquery.py +++ 
b/tests/core/engine_adapter/test_bigquery.py @@ -13,6 +13,7 @@ import sqlmesh.core.dialect as d from sqlmesh.core.engine_adapter import BigQueryEngineAdapter from sqlmesh.core.engine_adapter.bigquery import select_partitions_expr +from sqlmesh.core.engine_adapter.shared import DataObjectType from sqlmesh.core.node import IntervalUnit from sqlmesh.utils import AttributeDict from sqlmesh.utils.errors import SQLMeshError @@ -588,13 +589,14 @@ def _to_sql_calls(execute_mock: t.Any, identify: bool = True) -> t.List[str]: execute_mock = execute_mock.execute output = [] for call in execute_mock.call_args_list: - value = call[0][0] - sql = ( - value.sql(dialect="bigquery", identify=identify) - if isinstance(value, exp.Expression) - else str(value) - ) - output.append(sql) + values = ensure_list(call[0][0]) + for value in values: + sql = ( + value.sql(dialect="bigquery", identify=identify) + if isinstance(value, exp.Expression) + else str(value) + ) + output.append(sql) return output @@ -1213,3 +1215,168 @@ def test_scd_type_2_by_partitioning(adapter: BigQueryEngineAdapter): # Both calls should contain the partition logic (the scd logic is already covered by other tests) assert "PARTITION BY TIMESTAMP_TRUNC(`valid_from`, DAY)" in calls[0] assert "PARTITION BY TIMESTAMP_TRUNC(`valid_from`, DAY)" in calls[1] + + +def test_sync_grants_config(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture): + adapter = make_mocked_engine_adapter(BigQueryEngineAdapter) + relation = exp.to_table("project.dataset.test_table", dialect="bigquery") + new_grants_config = { + "roles/bigquery.dataViewer": ["user:analyst@example.com", "group:data-team@example.com"], + "roles/bigquery.dataEditor": ["user:admin@example.com"], + } + current_grants = [ + ("roles/bigquery.dataViewer", "user:old_analyst@example.com"), + ("roles/bigquery.admin", "user:old_admin@example.com"), + ] + + fetchall_mock = mocker.patch.object(adapter, "fetchall", return_value=current_grants) + execute_mock = 
mocker.patch.object(adapter, "execute") + mocker.patch.object(adapter, "get_current_catalog", return_value="project") + mocker.patch.object(adapter.client, "location", "us-central1") + + mock_dataset = mocker.Mock() + mock_dataset.location = "us-central1" + mocker.patch.object(adapter, "_db_call", return_value=mock_dataset) + + adapter.sync_grants_config(relation, new_grants_config) + + fetchall_mock.assert_called_once() + executed_query = fetchall_mock.call_args[0][0] + executed_sql = executed_query.sql(dialect="bigquery") + expected_sql = ( + "SELECT privilege_type, grantee FROM `project`.`region-us-central1`.`INFORMATION_SCHEMA.OBJECT_PRIVILEGES` AS OBJECT_PRIVILEGES " + "WHERE object_schema = 'dataset' AND object_name = 'test_table' AND SPLIT(grantee, ':')[OFFSET(1)] <> session_user()" + ) + assert executed_sql == expected_sql + + sql_calls = _to_sql_calls(execute_mock) + + assert len(sql_calls) == 4 + assert ( + "REVOKE `roles/bigquery.dataViewer` ON TABLE `project`.`dataset`.`test_table` FROM 'user:old_analyst@example.com'" + in sql_calls + ) + assert ( + "REVOKE `roles/bigquery.admin` ON TABLE `project`.`dataset`.`test_table` FROM 'user:old_admin@example.com'" + in sql_calls + ) + assert ( + "GRANT `roles/bigquery.dataViewer` ON TABLE `project`.`dataset`.`test_table` TO 'user:analyst@example.com', 'group:data-team@example.com'" + in sql_calls + ) + assert ( + "GRANT `roles/bigquery.dataEditor` ON TABLE `project`.`dataset`.`test_table` TO 'user:admin@example.com'" + in sql_calls + ) + + +def test_sync_grants_config_with_overlaps( + make_mocked_engine_adapter: t.Callable, mocker: MockerFixture +): + adapter = make_mocked_engine_adapter(BigQueryEngineAdapter) + relation = exp.to_table("project.dataset.test_table", dialect="bigquery") + new_grants_config = { + "roles/bigquery.dataViewer": [ + "user:analyst1@example.com", + "user:analyst2@example.com", + "user:analyst3@example.com", + ], + "roles/bigquery.dataEditor": ["user:analyst2@example.com", 
"user:editor@example.com"], + } + current_grants = [ + ("roles/bigquery.dataViewer", "user:analyst1@example.com"), # Keep + ("roles/bigquery.dataViewer", "user:old_analyst@example.com"), # Remove + ("roles/bigquery.dataEditor", "user:analyst2@example.com"), # Keep + ("roles/bigquery.admin", "user:admin@example.com"), # Remove + ] + + fetchall_mock = mocker.patch.object(adapter, "fetchall", return_value=current_grants) + execute_mock = mocker.patch.object(adapter, "execute") + mocker.patch.object(adapter, "get_current_catalog", return_value="project") + mocker.patch.object(adapter.client, "location", "us-central1") + + mock_dataset = mocker.Mock() + mock_dataset.location = "us-central1" + mocker.patch.object(adapter, "_db_call", return_value=mock_dataset) + + adapter.sync_grants_config(relation, new_grants_config) + + fetchall_mock.assert_called_once() + executed_query = fetchall_mock.call_args[0][0] + executed_sql = executed_query.sql(dialect="bigquery") + expected_sql = ( + "SELECT privilege_type, grantee FROM `project`.`region-us-central1`.`INFORMATION_SCHEMA.OBJECT_PRIVILEGES` AS OBJECT_PRIVILEGES " + "WHERE object_schema = 'dataset' AND object_name = 'test_table' AND SPLIT(grantee, ':')[OFFSET(1)] <> session_user()" + ) + assert executed_sql == expected_sql + + sql_calls = _to_sql_calls(execute_mock) + + assert len(sql_calls) == 4 + assert ( + "REVOKE `roles/bigquery.dataViewer` ON TABLE `project`.`dataset`.`test_table` FROM 'user:old_analyst@example.com'" + in sql_calls + ) + assert ( + "REVOKE `roles/bigquery.admin` ON TABLE `project`.`dataset`.`test_table` FROM 'user:admin@example.com'" + in sql_calls + ) + assert ( + "GRANT `roles/bigquery.dataViewer` ON TABLE `project`.`dataset`.`test_table` TO 'user:analyst2@example.com', 'user:analyst3@example.com'" + in sql_calls + ) + assert ( + "GRANT `roles/bigquery.dataEditor` ON TABLE `project`.`dataset`.`test_table` TO 'user:editor@example.com'" + in sql_calls + ) + + +@pytest.mark.parametrize( + "table_type, 
expected_keyword", + [ + (DataObjectType.TABLE, "TABLE"), + (DataObjectType.VIEW, "VIEW"), + (DataObjectType.MATERIALIZED_VIEW, "TABLE"), + ], +) +def test_sync_grants_config_object_kind( + make_mocked_engine_adapter: t.Callable, + mocker: MockerFixture, + table_type: DataObjectType, + expected_keyword: str, +) -> None: + adapter = make_mocked_engine_adapter(BigQueryEngineAdapter) + relation = exp.to_table("project.dataset.test_object", dialect="bigquery") + + mocker.patch.object(adapter, "fetchall", return_value=[]) + execute_mock = mocker.patch.object(adapter, "execute") + mocker.patch.object(adapter, "get_current_catalog", return_value="project") + mocker.patch.object(adapter.client, "location", "us-central1") + + mock_dataset = mocker.Mock() + mock_dataset.location = "us-central1" + mocker.patch.object(adapter, "_db_call", return_value=mock_dataset) + + adapter.sync_grants_config( + relation, {"roles/bigquery.dataViewer": ["user:test@example.com"]}, table_type + ) + + executed_exprs = execute_mock.call_args[0][0] + sql_calls = [expr.sql(dialect="bigquery") for expr in executed_exprs] + assert sql_calls == [ + f"GRANT `roles/bigquery.dataViewer` ON {expected_keyword} project.dataset.test_object TO 'user:test@example.com'" + ] + + +def test_sync_grants_config_no_schema( + make_mocked_engine_adapter: t.Callable, mocker: MockerFixture +): + adapter = make_mocked_engine_adapter(BigQueryEngineAdapter) + relation = exp.to_table("test_table", dialect="bigquery") + new_grants_config = { + "roles/bigquery.dataViewer": ["user:analyst@example.com"], + "roles/bigquery.dataEditor": ["user:editor@example.com"], + } + + with pytest.raises(ValueError, match="Table test_table does not have a schema \\(dataset\\)"): + adapter.sync_grants_config(relation, new_grants_config)