Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions sqlmesh/core/engine_adapter/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import pandas as pd

from sqlmesh.core._typing import SchemaName, TableName
from sqlmesh.core.engine_adapter._typing import DCL, GrantsConfig
from sqlmesh.core.engine_adapter.base import QueryOrDF, Query

logger = logging.getLogger(__name__)
Expand All @@ -46,6 +47,7 @@ class RedshiftEngineAdapter(
# Redshift doesn't support comments for VIEWs WITH NO SCHEMA BINDING (which we always use)
COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED
SUPPORTS_REPLACE_TABLE = False
SUPPORTS_GRANTS = True

SCHEMA_DIFFER_KWARGS = {
"parameterized_type_defaults": {
Expand Down Expand Up @@ -163,6 +165,96 @@ def _fetch_native_df(
result = [tuple(row) for row in fetcheddata]
return pd.DataFrame(result, columns=columns)

@staticmethod
def _grant_object_kind(table_type: DataObjectType) -> str:
if table_type == DataObjectType.VIEW:
return "VIEW"
if table_type == DataObjectType.MATERIALIZED_VIEW:
return "MATERIALIZED VIEW"
return "TABLE"

def _dcl_grants_config_expr(
self,
dcl_cmd: t.Type[DCL],
table: exp.Table,
grant_config: GrantsConfig,
table_type: DataObjectType = DataObjectType.TABLE,
) -> t.List[exp.Expression]:
expressions: t.List[exp.Expression] = []
if not grant_config:
return expressions

object_kind = self._grant_object_kind(table_type)
for privilege, principals in grant_config.items():
if not principals:
continue

args: t.Dict[str, t.Any] = {
"privileges": [exp.GrantPrivilege(this=exp.Var(this=privilege))],
"securable": table.copy(),
"principals": principals,
}

if object_kind:
args["kind"] = exp.Var(this=object_kind)

expressions.append(dcl_cmd(**args)) # type: ignore[arg-type]

return expressions

def _apply_grants_config_expr(
self,
table: exp.Table,
grant_config: GrantsConfig,
table_type: DataObjectType = DataObjectType.TABLE,
) -> t.List[exp.Expression]:
return self._dcl_grants_config_expr(exp.Grant, table, grant_config, table_type)

def _revoke_grants_config_expr(
self,
table: exp.Table,
grant_config: GrantsConfig,
table_type: DataObjectType = DataObjectType.TABLE,
) -> t.List[exp.Expression]:
return self._dcl_grants_config_expr(exp.Revoke, table, grant_config, table_type)

def _get_current_grants_config(self, table: exp.Table) -> GrantsConfig:
"""Returns current grants for a Redshift table as a dictionary."""
table_schema = table.db or self.get_current_schema()
table_name = table.name
current_user = exp.func("current_user")

grant_expr = (
exp.select("privilege_type", "grantee")
.from_(exp.table_("table_privileges", db="information_schema"))
.where(
exp.and_(
exp.column("table_schema").eq(exp.Literal.string(table_schema)),
exp.column("table_name").eq(exp.Literal.string(table_name)),
exp.column("grantor").eq(current_user),
exp.column("grantee").neq(current_user),
)
)
)

results = self.fetchall(grant_expr)

grants_dict: GrantsConfig = {}
for privilege_raw, grantee_raw in results:
if privilege_raw is None or grantee_raw is None:
continue

privilege = str(privilege_raw)
grantee = str(grantee_raw)
if not privilege or not grantee:
continue

grants_dict.setdefault(privilege, [])
if grantee not in grants_dict[privilege]:
grants_dict[privilege].append(grantee)

return grants_dict

def _create_table_from_source_queries(
self,
table_name: TableName,
Expand Down
8 changes: 6 additions & 2 deletions tests/core/engine_adapter/integration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ def _get_create_user_or_role(
self, username: str, password: t.Optional[str] = None
) -> t.Tuple[str, t.Optional[str]]:
password = password or random_id()
if self.dialect == "postgres":
if self.dialect in ["postgres", "redshift"]:
return username, f"CREATE USER \"{username}\" WITH PASSWORD '{password}'"
if self.dialect == "snowflake":
return username, f"CREATE ROLE {username}"
Expand All @@ -777,6 +777,10 @@ def create_users_or_roles(self, *role_names: str) -> t.Iterator[t.Dict[str, str]
self.add_test_suffix(f"test_{role_name}"), dialect=self.dialect
).sql(dialect=self.dialect)
password = random_id()
if self.dialect == "redshift":
password += (
"A" # redshift requires passwords to have at least one uppercase letter
)
user_name = self._create_user_or_role(user_name, password)
created_users.append(user_name)
roles[role_name] = user_name
Expand All @@ -802,7 +806,7 @@ def get_update_privilege(self) -> str:
def _cleanup_user_or_role(self, user_name: str) -> None:
"""Helper function to clean up a PostgreSQL user and all their dependencies."""
try:
if self.dialect == "postgres":
if self.dialect in ["postgres", "redshift"]:
self.engine_adapter.execute(f"""
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
Expand Down
152 changes: 151 additions & 1 deletion tests/core/engine_adapter/test_redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from sqlglot import parse_one

from sqlmesh.core.engine_adapter import RedshiftEngineAdapter
from sqlmesh.core.engine_adapter.shared import DataObject
from sqlmesh.core.engine_adapter.shared import DataObject, DataObjectType
from sqlmesh.utils.errors import SQLMeshError
from tests.core.engine_adapter import to_sql_calls

Expand Down Expand Up @@ -83,6 +83,156 @@ def test_varchar_size_workaround(make_mocked_engine_adapter: t.Callable, mocker:
]


def test_sync_grants_config(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture):
adapter = make_mocked_engine_adapter(RedshiftEngineAdapter)
relation = exp.to_table("test_schema.test_table", dialect="redshift")
new_grants_config = {"SELECT": ["user1", "user2"], "INSERT": ["user3"]}

current_grants = [("SELECT", "old_user"), ("UPDATE", "legacy_user")]
fetchall_mock = mocker.patch.object(adapter, "fetchall", return_value=current_grants)

adapter.sync_grants_config(relation, new_grants_config)

fetchall_mock.assert_called_once()
executed_query = fetchall_mock.call_args[0][0]
executed_sql = executed_query.sql(dialect="redshift")
expected_sql = (
"SELECT privilege_type, grantee FROM information_schema.table_privileges "
"WHERE table_schema = 'test_schema' AND table_name = 'test_table' "
"AND grantor = CURRENT_USER AND grantee <> CURRENT_USER"
)
assert executed_sql == expected_sql

sql_calls = to_sql_calls(adapter)
assert len(sql_calls) == 4
assert 'REVOKE SELECT ON TABLE "test_schema"."test_table" FROM old_user' in sql_calls
assert 'REVOKE UPDATE ON TABLE "test_schema"."test_table" FROM legacy_user' in sql_calls
assert 'GRANT SELECT ON TABLE "test_schema"."test_table" TO user1, user2' in sql_calls
assert 'GRANT INSERT ON TABLE "test_schema"."test_table" TO user3' in sql_calls


def test_sync_grants_config_with_overlaps(
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
):
adapter = make_mocked_engine_adapter(RedshiftEngineAdapter)
relation = exp.to_table("test_schema.test_table", dialect="redshift")
new_grants_config = {
"SELECT": ["user_shared", "user_new"],
"INSERT": ["user_shared", "user_writer"],
}

current_grants = [
("SELECT", "user_shared"),
("SELECT", "user_legacy"),
("INSERT", "user_shared"),
]
fetchall_mock = mocker.patch.object(adapter, "fetchall", return_value=current_grants)

adapter.sync_grants_config(relation, new_grants_config)

fetchall_mock.assert_called_once()
executed_query = fetchall_mock.call_args[0][0]
executed_sql = executed_query.sql(dialect="redshift")
expected_sql = (
"SELECT privilege_type, grantee FROM information_schema.table_privileges "
"WHERE table_schema = 'test_schema' AND table_name = 'test_table' "
"AND grantor = CURRENT_USER AND grantee <> CURRENT_USER"
)
assert executed_sql == expected_sql

sql_calls = to_sql_calls(adapter)
assert len(sql_calls) == 3
assert 'REVOKE SELECT ON TABLE "test_schema"."test_table" FROM user_legacy' in sql_calls
assert 'GRANT SELECT ON TABLE "test_schema"."test_table" TO user_new' in sql_calls
assert 'GRANT INSERT ON TABLE "test_schema"."test_table" TO user_writer' in sql_calls


@pytest.mark.parametrize(
"table_type, expected_keyword",
[
(DataObjectType.TABLE, "TABLE"),
(DataObjectType.VIEW, "VIEW"),
(DataObjectType.MATERIALIZED_VIEW, "MATERIALIZED VIEW"),
],
)
def test_sync_grants_config_object_kind(
make_mocked_engine_adapter: t.Callable,
mocker: MockerFixture,
table_type: DataObjectType,
expected_keyword: str,
) -> None:
adapter = make_mocked_engine_adapter(RedshiftEngineAdapter)
relation = exp.to_table("test_schema.test_object", dialect="redshift")

mocker.patch.object(adapter, "fetchall", return_value=[])

adapter.sync_grants_config(relation, {"SELECT": ["user_test"]}, table_type)

sql_calls = to_sql_calls(adapter)
assert sql_calls == [
f'GRANT SELECT ON {expected_keyword} "test_schema"."test_object" TO user_test'
]


def test_sync_grants_config_quotes(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture):
adapter = make_mocked_engine_adapter(RedshiftEngineAdapter)
relation = exp.to_table('"TestSchema"."TestTable"', dialect="redshift")
new_grants_config = {"SELECT": ["user1", "user2"], "INSERT": ["user3"]}

current_grants = [("SELECT", "user_old"), ("UPDATE", "user_legacy")]
fetchall_mock = mocker.patch.object(adapter, "fetchall", return_value=current_grants)

adapter.sync_grants_config(relation, new_grants_config)

fetchall_mock.assert_called_once()
executed_query = fetchall_mock.call_args[0][0]
executed_sql = executed_query.sql(dialect="redshift")
expected_sql = (
"SELECT privilege_type, grantee FROM information_schema.table_privileges "
"WHERE table_schema = 'TestSchema' AND table_name = 'TestTable' "
"AND grantor = CURRENT_USER AND grantee <> CURRENT_USER"
)
assert executed_sql == expected_sql

sql_calls = to_sql_calls(adapter)
assert len(sql_calls) == 4
assert 'REVOKE SELECT ON TABLE "TestSchema"."TestTable" FROM user_old' in sql_calls
assert 'REVOKE UPDATE ON TABLE "TestSchema"."TestTable" FROM user_legacy' in sql_calls
assert 'GRANT SELECT ON TABLE "TestSchema"."TestTable" TO user1, user2' in sql_calls
assert 'GRANT INSERT ON TABLE "TestSchema"."TestTable" TO user3' in sql_calls


def test_sync_grants_config_no_schema(
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
):
adapter = make_mocked_engine_adapter(RedshiftEngineAdapter)
relation = exp.to_table("test_table", dialect="redshift")
new_grants_config = {"SELECT": ["user1"], "INSERT": ["user2"]}

current_grants = [("UPDATE", "user_old")]
fetchall_mock = mocker.patch.object(adapter, "fetchall", return_value=current_grants)
get_schema_mock = mocker.patch.object(adapter, "get_current_schema", return_value="public")

adapter.sync_grants_config(relation, new_grants_config)

get_schema_mock.assert_called_once()

executed_query = fetchall_mock.call_args[0][0]
executed_sql = executed_query.sql(dialect="redshift")
expected_sql = (
"SELECT privilege_type, grantee FROM information_schema.table_privileges "
"WHERE table_schema = 'public' AND table_name = 'test_table' "
"AND grantor = CURRENT_USER AND grantee <> CURRENT_USER"
)
assert executed_sql == expected_sql

sql_calls = to_sql_calls(adapter)
assert len(sql_calls) == 3
assert 'REVOKE UPDATE ON TABLE "test_table" FROM user_old' in sql_calls
assert 'GRANT SELECT ON TABLE "test_table" TO user1' in sql_calls
assert 'GRANT INSERT ON TABLE "test_table" TO user2' in sql_calls


def test_create_table_from_query_exists_no_if_not_exists(
adapter: t.Callable, mocker: MockerFixture
):
Expand Down