From 0930e25ec04897d05fd198558d8db464568af055 Mon Sep 17 00:00:00 2001 From: Igor Shmulyan Date: Wed, 5 Nov 2025 15:11:03 +0200 Subject: [PATCH 1/2] feat(db): add dynamic schema support for athena --- superset/db_engine_specs/athena.py | 38 +++++++++++++ .../unit_tests/db_engine_specs/test_athena.py | 57 +++++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/superset/db_engine_specs/athena.py b/superset/db_engine_specs/athena.py index a3abfdf10971..c77bd937f2f6 100644 --- a/superset/db_engine_specs/athena.py +++ b/superset/db_engine_specs/athena.py @@ -21,6 +21,7 @@ from flask_babel import gettext as __ from sqlalchemy import types +from sqlalchemy.engine.url import URL from superset.constants import TimeGrain from superset.db_engine_specs.base import BaseEngineSpec @@ -38,6 +39,7 @@ class AthenaEngineSpec(BaseEngineSpec): disable_ssh_tunneling = True # Athena doesn't support IS true/false syntax, use = true/false instead use_equality_for_boolean_filters = True + supports_dynamic_schema = True _time_grain_expressions = { None: "{col}", @@ -92,3 +94,39 @@ def _mutate_label(label: str) -> str: :return: Conditionally mutated label """ return label.lower() + + @classmethod + def adjust_engine_params( + cls, + uri: URL, + connect_args: dict[str, Any], + catalog: str | None = None, + schema: str | None = None, + ) -> tuple[URL, dict[str, Any]]: + """ + Adjust the SQLAlchemy URI for Athena with a provided schema. + + For AWS Athena the SQLAlchemy URI looks like this: + + awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}&... + """ + if not schema: + return uri, connect_args + + uri = uri.set(database=schema) + return uri, connect_args + + @classmethod + def get_schema_from_engine_params( + cls, + sqlalchemy_uri: URL, + connect_args: dict[str, Any], + ) -> str | None: + """ + Return the configured schema. + + For AWS Athena the SQLAlchemy URI looks like this: + + awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}&... + """ + return sqlalchemy_uri.database diff --git a/tests/unit_tests/db_engine_specs/test_athena.py b/tests/unit_tests/db_engine_specs/test_athena.py index 9e571eceb216..a1181674a0fe 100644 --- a/tests/unit_tests/db_engine_specs/test_athena.py +++ b/tests/unit_tests/db_engine_specs/test_athena.py @@ -20,6 +20,7 @@ from typing import Optional import pytest +from sqlalchemy.engine.url import make_url from superset.errors import ErrorLevel, SupersetError, SupersetErrorType from tests.unit_tests.db_engine_specs.utils import assert_convert_dttm @@ -120,3 +121,59 @@ def test_handle_boolean_filter() -> None: str(result_false.compile(compile_kwargs={"literal_binds": True})) == "test_col = false" ) + + +def test_adjust_engine_params() -> None: + """ + Test `adjust_engine_params`. + + The method can be used to adjust the schema dynamically. + """ + from superset.db_engine_specs.athena import AthenaEngineSpec + + url = make_url( + "awsathena+rest://athena.us-east-1.amazonaws.com:443/default?s3_staging_dir=s3%3A%2F%2Fathena-staging" + ) + + uri = AthenaEngineSpec.adjust_engine_params(url, {})[0] + assert ( + str(uri) + == "awsathena+rest://athena.us-east-1.amazonaws.com:443/default?s3_staging_dir=s3%3A%2F%2Fathena-staging" + ) + + uri = AthenaEngineSpec.adjust_engine_params( + url, + {}, + schema="new_schema", + )[0] + assert ( + str(uri) + == "awsathena+rest://athena.us-east-1.amazonaws.com:443/new_schema?s3_staging_dir=s3%3A%2F%2Fathena-staging" + ) + + +def test_get_schema_from_engine_params() -> None: + """ + Test the ``get_schema_from_engine_params`` method. + """ + from superset.db_engine_specs.athena import AthenaEngineSpec + + assert ( + AthenaEngineSpec.get_schema_from_engine_params( + make_url( + "awsathena+rest://athena.us-east-1.amazonaws.com:443/default?s3_staging_dir=s3%3A%2F%2Fathena-staging" + ), + {}, + ) + == "default" + ) + + assert ( + AthenaEngineSpec.get_schema_from_engine_params( + make_url( + "awsathena+rest://athena.us-east-1.amazonaws.com:443?s3_staging_dir=s3%3A%2F%2Fathena-staging" + ), + {}, + ) + is None + ) From 6848dd4e05826830c3b104d9a69b29f1193407b0 Mon Sep 17 00:00:00 2001 From: Igor Shmulyan Date: Wed, 5 Nov 2025 18:05:05 +0200 Subject: [PATCH 2/2] consider catalog parameter in URI adjustment --- superset/db_engine_specs/athena.py | 14 ++++++----- .../unit_tests/db_engine_specs/test_athena.py | 23 +++++++++++++++---- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/superset/db_engine_specs/athena.py b/superset/db_engine_specs/athena.py index c77bd937f2f6..002790da0ef6 100644 --- a/superset/db_engine_specs/athena.py +++ b/superset/db_engine_specs/athena.py @@ -104,16 +104,18 @@ def adjust_engine_params( schema: str | None = None, ) -> tuple[URL, dict[str, Any]]: """ - Adjust the SQLAlchemy URI for Athena with a provided schema. + Adjust the SQLAlchemy URI for Athena with a provided catalog and schema. For AWS Athena the SQLAlchemy URI looks like this: - awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}&... + awsathena+rest://athena.{region_name}.amazonaws.com:443/{schema_name}?catalog_name={catalog_name}&s3_staging_dir={s3_staging_dir} """ - if not schema: - return uri, connect_args + if catalog: + uri = uri.update_query_dict({"catalog_name": catalog}) + + if schema: + uri = uri.set(database=schema) - uri = uri.set(database=schema) return uri, connect_args @classmethod @@ -127,6 +129,6 @@ def get_schema_from_engine_params( For AWS Athena the SQLAlchemy URI looks like this: - awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}&... + awsathena+rest://athena.{region_name}.amazonaws.com:443/{schema_name}?catalog_name={catalog_name}&s3_staging_dir={s3_staging_dir} """ return sqlalchemy_uri.database diff --git a/tests/unit_tests/db_engine_specs/test_athena.py b/tests/unit_tests/db_engine_specs/test_athena.py index a1181674a0fe..d205572d2cb3 100644 --- a/tests/unit_tests/db_engine_specs/test_athena.py +++ b/tests/unit_tests/db_engine_specs/test_athena.py @@ -131,24 +131,37 @@ def test_adjust_engine_params() -> None: """ from superset.db_engine_specs.athena import AthenaEngineSpec - url = make_url( - "awsathena+rest://athena.us-east-1.amazonaws.com:443/default?s3_staging_dir=s3%3A%2F%2Fathena-staging" - ) + url = make_url("awsathena+rest://athena.us-east-1.amazonaws.com:443/default") uri = AthenaEngineSpec.adjust_engine_params(url, {})[0] + assert str(uri) == "awsathena+rest://athena.us-east-1.amazonaws.com:443/default" + + uri = AthenaEngineSpec.adjust_engine_params( + url, + {}, + schema="new_schema", + )[0] + assert str(uri) == "awsathena+rest://athena.us-east-1.amazonaws.com:443/new_schema" + + uri = AthenaEngineSpec.adjust_engine_params( + url, + {}, + catalog="new_catalog", + )[0] assert ( str(uri) - == "awsathena+rest://athena.us-east-1.amazonaws.com:443/default?s3_staging_dir=s3%3A%2F%2Fathena-staging" + == "awsathena+rest://athena.us-east-1.amazonaws.com:443/default?catalog_name=new_catalog" ) uri = AthenaEngineSpec.adjust_engine_params( url, {}, + catalog="new_catalog", schema="new_schema", )[0] assert ( str(uri) - == "awsathena+rest://athena.us-east-1.amazonaws.com:443/new_schema?s3_staging_dir=s3%3A%2F%2Fathena-staging" + == "awsathena+rest://athena.us-east-1.amazonaws.com:443/new_schema?catalog_name=new_catalog" )