From 776b4dca919bf01cdc9d08dad2ea8b80b897f2ff Mon Sep 17 00:00:00 2001 From: eakmanrq <6326532+eakmanrq@users.noreply.github.com> Date: Thu, 25 Sep 2025 09:31:10 -0700 Subject: [PATCH] feat: redshift grant support --- sqlmesh/core/engine_adapter/redshift.py | 92 +++++++++++ .../engine_adapter/integration/__init__.py | 8 +- tests/core/engine_adapter/test_redshift.py | 152 +++++++++++++++++- 3 files changed, 249 insertions(+), 3 deletions(-) diff --git a/sqlmesh/core/engine_adapter/redshift.py b/sqlmesh/core/engine_adapter/redshift.py index 7979268473..5c23b4b8e6 100644 --- a/sqlmesh/core/engine_adapter/redshift.py +++ b/sqlmesh/core/engine_adapter/redshift.py @@ -28,6 +28,7 @@ import pandas as pd from sqlmesh.core._typing import SchemaName, TableName + from sqlmesh.core.engine_adapter._typing import DCL, GrantsConfig from sqlmesh.core.engine_adapter.base import QueryOrDF, Query logger = logging.getLogger(__name__) @@ -46,6 +47,7 @@ class RedshiftEngineAdapter( # Redshift doesn't support comments for VIEWs WITH NO SCHEMA BINDING (which we always use) COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED SUPPORTS_REPLACE_TABLE = False + SUPPORTS_GRANTS = True SCHEMA_DIFFER_KWARGS = { "parameterized_type_defaults": { @@ -163,6 +165,96 @@ def _fetch_native_df( result = [tuple(row) for row in fetcheddata] return pd.DataFrame(result, columns=columns) + @staticmethod + def _grant_object_kind(table_type: DataObjectType) -> str: + if table_type == DataObjectType.VIEW: + return "VIEW" + if table_type == DataObjectType.MATERIALIZED_VIEW: + return "MATERIALIZED VIEW" + return "TABLE" + + def _dcl_grants_config_expr( + self, + dcl_cmd: t.Type[DCL], + table: exp.Table, + grant_config: GrantsConfig, + table_type: DataObjectType = DataObjectType.TABLE, + ) -> t.List[exp.Expression]: + expressions: t.List[exp.Expression] = [] + if not grant_config: + return expressions + + object_kind = self._grant_object_kind(table_type) + for privilege, principals in grant_config.items(): + if not principals: + continue + + args: t.Dict[str, t.Any] = { + "privileges": [exp.GrantPrivilege(this=exp.Var(this=privilege))], + "securable": table.copy(), + "principals": principals, + } + + if object_kind: + args["kind"] = exp.Var(this=object_kind) + + expressions.append(dcl_cmd(**args)) # type: ignore[arg-type] + + return expressions + + def _apply_grants_config_expr( + self, + table: exp.Table, + grant_config: GrantsConfig, + table_type: DataObjectType = DataObjectType.TABLE, + ) -> t.List[exp.Expression]: + return self._dcl_grants_config_expr(exp.Grant, table, grant_config, table_type) + + def _revoke_grants_config_expr( + self, + table: exp.Table, + grant_config: GrantsConfig, + table_type: DataObjectType = DataObjectType.TABLE, + ) -> t.List[exp.Expression]: + return self._dcl_grants_config_expr(exp.Revoke, table, grant_config, table_type) + + def _get_current_grants_config(self, table: exp.Table) -> GrantsConfig: + """Returns current grants for a Redshift table as a dictionary.""" + table_schema = table.db or self.get_current_schema() + table_name = table.name + current_user = exp.func("current_user") + + grant_expr = ( + exp.select("privilege_type", "grantee") + .from_(exp.table_("table_privileges", db="information_schema")) + .where( + exp.and_( + exp.column("table_schema").eq(exp.Literal.string(table_schema)), + exp.column("table_name").eq(exp.Literal.string(table_name)), + exp.column("grantor").eq(current_user), + exp.column("grantee").neq(current_user), + ) + ) + ) + + results = self.fetchall(grant_expr) + + grants_dict: GrantsConfig = {} + for privilege_raw, grantee_raw in results: + if privilege_raw is None or grantee_raw is None: + continue + + privilege = str(privilege_raw) + grantee = str(grantee_raw) + if not privilege or not grantee: + continue + + grants_dict.setdefault(privilege, []) + if grantee not in grants_dict[privilege]: + grants_dict[privilege].append(grantee) + + return grants_dict + def _create_table_from_source_queries( self, table_name: TableName, diff --git a/tests/core/engine_adapter/integration/__init__.py b/tests/core/engine_adapter/integration/__init__.py index 0c53edd405..48aa8e35ea 100644 --- a/tests/core/engine_adapter/integration/__init__.py +++ b/tests/core/engine_adapter/integration/__init__.py @@ -750,7 +750,7 @@ def _get_create_user_or_role( self, username: str, password: t.Optional[str] = None ) -> t.Tuple[str, t.Optional[str]]: password = password or random_id() - if self.dialect == "postgres": + if self.dialect in ["postgres", "redshift"]: return username, f"CREATE USER \"{username}\" WITH PASSWORD '{password}'" if self.dialect == "snowflake": return username, f"CREATE ROLE {username}" @@ -777,6 +777,10 @@ def create_users_or_roles(self, *role_names: str) -> t.Iterator[t.Dict[str, str] self.add_test_suffix(f"test_{role_name}"), dialect=self.dialect ).sql(dialect=self.dialect) password = random_id() + if self.dialect == "redshift": + password += ( + "A" # redshift requires passwords to have at least one uppercase letter + ) user_name = self._create_user_or_role(user_name, password) created_users.append(user_name) roles[role_name] = user_name @@ -802,7 +806,7 @@ def get_update_privilege(self) -> str: def _cleanup_user_or_role(self, user_name: str) -> None: """Helper function to clean up a PostgreSQL user and all their dependencies.""" try: - if self.dialect == "postgres": + if self.dialect in ["postgres", "redshift"]: self.engine_adapter.execute(f""" SELECT pg_terminate_backend(pid) FROM pg_stat_activity diff --git a/tests/core/engine_adapter/test_redshift.py b/tests/core/engine_adapter/test_redshift.py index c5e3dfff17..27a2adb1ea 100644 --- a/tests/core/engine_adapter/test_redshift.py +++ b/tests/core/engine_adapter/test_redshift.py @@ -9,7 +9,7 @@ from sqlglot import parse_one from sqlmesh.core.engine_adapter import RedshiftEngineAdapter -from sqlmesh.core.engine_adapter.shared import DataObject +from sqlmesh.core.engine_adapter.shared import DataObject, DataObjectType from sqlmesh.utils.errors import SQLMeshError from tests.core.engine_adapter import to_sql_calls @@ -83,6 +83,156 @@ def test_varchar_size_workaround(make_mocked_engine_adapter: t.Callable, mocker: ] +def test_sync_grants_config(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture): + adapter = make_mocked_engine_adapter(RedshiftEngineAdapter) + relation = exp.to_table("test_schema.test_table", dialect="redshift") + new_grants_config = {"SELECT": ["user1", "user2"], "INSERT": ["user3"]} + + current_grants = [("SELECT", "old_user"), ("UPDATE", "legacy_user")] + fetchall_mock = mocker.patch.object(adapter, "fetchall", return_value=current_grants) + + adapter.sync_grants_config(relation, new_grants_config) + + fetchall_mock.assert_called_once() + executed_query = fetchall_mock.call_args[0][0] + executed_sql = executed_query.sql(dialect="redshift") + expected_sql = ( + "SELECT privilege_type, grantee FROM information_schema.table_privileges " + "WHERE table_schema = 'test_schema' AND table_name = 'test_table' " + "AND grantor = CURRENT_USER AND grantee <> CURRENT_USER" + ) + assert executed_sql == expected_sql + + sql_calls = to_sql_calls(adapter) + assert len(sql_calls) == 4 + assert 'REVOKE SELECT ON TABLE "test_schema"."test_table" FROM old_user' in sql_calls + assert 'REVOKE UPDATE ON TABLE "test_schema"."test_table" FROM legacy_user' in sql_calls + assert 'GRANT SELECT ON TABLE "test_schema"."test_table" TO user1, user2' in sql_calls + assert 'GRANT INSERT ON TABLE "test_schema"."test_table" TO user3' in sql_calls + + +def test_sync_grants_config_with_overlaps( + make_mocked_engine_adapter: t.Callable, mocker: MockerFixture +): + adapter = make_mocked_engine_adapter(RedshiftEngineAdapter) + relation = exp.to_table("test_schema.test_table", dialect="redshift") + new_grants_config = { + "SELECT": ["user_shared", "user_new"], + "INSERT": ["user_shared", "user_writer"], + } + + current_grants = [ + ("SELECT", "user_shared"), + ("SELECT", "user_legacy"), + ("INSERT", "user_shared"), + ] + fetchall_mock = mocker.patch.object(adapter, "fetchall", return_value=current_grants) + + adapter.sync_grants_config(relation, new_grants_config) + + fetchall_mock.assert_called_once() + executed_query = fetchall_mock.call_args[0][0] + executed_sql = executed_query.sql(dialect="redshift") + expected_sql = ( + "SELECT privilege_type, grantee FROM information_schema.table_privileges " + "WHERE table_schema = 'test_schema' AND table_name = 'test_table' " + "AND grantor = CURRENT_USER AND grantee <> CURRENT_USER" + ) + assert executed_sql == expected_sql + + sql_calls = to_sql_calls(adapter) + assert len(sql_calls) == 3 + assert 'REVOKE SELECT ON TABLE "test_schema"."test_table" FROM user_legacy' in sql_calls + assert 'GRANT SELECT ON TABLE "test_schema"."test_table" TO user_new' in sql_calls + assert 'GRANT INSERT ON TABLE "test_schema"."test_table" TO user_writer' in sql_calls + + +@pytest.mark.parametrize( + "table_type, expected_keyword", + [ + (DataObjectType.TABLE, "TABLE"), + (DataObjectType.VIEW, "VIEW"), + (DataObjectType.MATERIALIZED_VIEW, "MATERIALIZED VIEW"), + ], +) +def test_sync_grants_config_object_kind( + make_mocked_engine_adapter: t.Callable, + mocker: MockerFixture, + table_type: DataObjectType, + expected_keyword: str, +) -> None: + adapter = make_mocked_engine_adapter(RedshiftEngineAdapter) + relation = exp.to_table("test_schema.test_object", dialect="redshift") + + mocker.patch.object(adapter, "fetchall", return_value=[]) + + adapter.sync_grants_config(relation, {"SELECT": ["user_test"]}, table_type) + + sql_calls = to_sql_calls(adapter) + assert sql_calls == [ + f'GRANT SELECT ON {expected_keyword} "test_schema"."test_object" TO user_test' + ] + + +def test_sync_grants_config_quotes(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture): + adapter = make_mocked_engine_adapter(RedshiftEngineAdapter) + relation = exp.to_table('"TestSchema"."TestTable"', dialect="redshift") + new_grants_config = {"SELECT": ["user1", "user2"], "INSERT": ["user3"]} + + current_grants = [("SELECT", "user_old"), ("UPDATE", "user_legacy")] + fetchall_mock = mocker.patch.object(adapter, "fetchall", return_value=current_grants) + + adapter.sync_grants_config(relation, new_grants_config) + + fetchall_mock.assert_called_once() + executed_query = fetchall_mock.call_args[0][0] + executed_sql = executed_query.sql(dialect="redshift") + expected_sql = ( + "SELECT privilege_type, grantee FROM information_schema.table_privileges " + "WHERE table_schema = 'TestSchema' AND table_name = 'TestTable' " + "AND grantor = CURRENT_USER AND grantee <> CURRENT_USER" + ) + assert executed_sql == expected_sql + + sql_calls = to_sql_calls(adapter) + assert len(sql_calls) == 4 + assert 'REVOKE SELECT ON TABLE "TestSchema"."TestTable" FROM user_old' in sql_calls + assert 'REVOKE UPDATE ON TABLE "TestSchema"."TestTable" FROM user_legacy' in sql_calls + assert 'GRANT SELECT ON TABLE "TestSchema"."TestTable" TO user1, user2' in sql_calls + assert 'GRANT INSERT ON TABLE "TestSchema"."TestTable" TO user3' in sql_calls + + +def test_sync_grants_config_no_schema( + make_mocked_engine_adapter: t.Callable, mocker: MockerFixture +): + adapter = make_mocked_engine_adapter(RedshiftEngineAdapter) + relation = exp.to_table("test_table", dialect="redshift") + new_grants_config = {"SELECT": ["user1"], "INSERT": ["user2"]} + + current_grants = [("UPDATE", "user_old")] + fetchall_mock = mocker.patch.object(adapter, "fetchall", return_value=current_grants) + get_schema_mock = mocker.patch.object(adapter, "get_current_schema", return_value="public") + + adapter.sync_grants_config(relation, new_grants_config) + + get_schema_mock.assert_called_once() + + executed_query = fetchall_mock.call_args[0][0] + executed_sql = executed_query.sql(dialect="redshift") + expected_sql = ( + "SELECT privilege_type, grantee FROM information_schema.table_privileges " + "WHERE table_schema = 'public' AND table_name = 'test_table' " + "AND grantor = CURRENT_USER AND grantee <> CURRENT_USER" + ) + assert executed_sql == expected_sql + + sql_calls = to_sql_calls(adapter) + assert len(sql_calls) == 3 + assert 'REVOKE UPDATE ON TABLE "test_table" FROM user_old' in sql_calls + assert 'GRANT SELECT ON TABLE "test_table" TO user1' in sql_calls + assert 'GRANT INSERT ON TABLE "test_table" TO user2' in sql_calls + + def test_create_table_from_query_exists_no_if_not_exists( adapter: t.Callable, mocker: MockerFixture ):