From 0af31c26c3c001270e5f1d97d449858a0d8b0914 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Mon, 9 Mar 2020 22:00:04 +0000 Subject: [PATCH 1/3] [dataset] columns and metrics API (nested) --- requirements.txt | 2 +- setup.py | 2 +- superset/commands/exceptions.py | 5 + superset/connectors/sqla/models.py | 8 +- superset/dao/__init__.py | 16 +++ superset/dao/base.py | 71 ++++++++++++ superset/datasets/api.py | 58 +++++++++- superset/datasets/commands/create.py | 22 +++- superset/datasets/commands/delete.py | 13 ++- superset/datasets/commands/exceptions.py | 30 +++++ superset/datasets/commands/refresh.py | 61 +++++++++++ superset/datasets/commands/update.py | 17 +++ superset/datasets/dao.py | 116 +++++++++++++------- superset/datasets/schemas.py | 49 ++++++++- superset/views/base_api.py | 1 + tests/dataset_api_tests.py | 133 +++++++++++++++++++++-- 16 files changed, 541 insertions(+), 63 deletions(-) create mode 100644 superset/dao/__init__.py create mode 100644 superset/dao/base.py create mode 100644 superset/datasets/commands/refresh.py diff --git a/requirements.txt b/requirements.txt index 68ef5a35bbd0..81c3485e8ccd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,7 +21,7 @@ croniter==0.3.31 cryptography==2.8 decorator==4.4.1 # via retry defusedxml==0.6.0 # via python3-openid -flask-appbuilder==2.2.4 +flask-appbuilder==2.3.0rc3 flask-babel==1.0.0 # via flask-appbuilder flask-caching==1.8.0 flask-compress==1.4.0 diff --git a/setup.py b/setup.py index 915ee6cbbf2d..7c81e0d0ce6e 100644 --- a/setup.py +++ b/setup.py @@ -76,7 +76,7 @@ def get_git_sha(): "croniter>=0.3.28", "cryptography>=2.4.2", "flask>=1.1.0, <2.0.0", - "flask-appbuilder>=2.2.4, <2.3.0", + "flask-appbuilder==2.3.0rc3", "flask-caching", "flask-compress", "flask-talisman", diff --git a/superset/commands/exceptions.py b/superset/commands/exceptions.py index 83b3e1df4e50..202887678521 100644 --- a/superset/commands/exceptions.py +++ b/superset/commands/exceptions.py @@ -34,6 +34,11 @@ def 
__init__(self, message: str = "", exception: Optional[Exception] = None): def exception(self): return self._exception + def __repr__(self): + if self._exception: + return repr(self._exception) + return super().__repr__() + class CommandInvalidError(CommandException): """ Common base class for Command Invalid errors. """ diff --git a/superset/connectors/sqla/models.py b/superset/connectors/sqla/models.py index f249d842a99c..9f3a06037109 100644 --- a/superset/connectors/sqla/models.py +++ b/superset/connectors/sqla/models.py @@ -1061,7 +1061,7 @@ def mutator(df: pd.DataFrame) -> None: def get_sqla_table_object(self) -> Table: return self.database.get_table(self.table_name, schema=self.schema) - def fetch_metadata(self) -> None: + def fetch_metadata(self, commit=True) -> None: """Fetches the metadata for the table and merges it in""" try: table = self.get_sqla_table_object() @@ -1074,7 +1074,6 @@ def fetch_metadata(self) -> None: ).format(self.table_name) ) - M = SqlMetric metrics = [] any_date_col = None db_engine_spec = self.database.db_engine_spec @@ -1111,7 +1110,7 @@ def fetch_metadata(self) -> None: any_date_col = col.name metrics.append( - M( + SqlMetric( metric_name="count", verbose_name="COUNT(*)", metric_type="count", @@ -1122,7 +1121,8 @@ def fetch_metadata(self) -> None: self.main_dttm_col = any_date_col self.add_missing_metrics(metrics) db.session.merge(self) - db.session.commit() + if commit: + db.session.commit() @classmethod def import_obj(cls, i_datasource, import_time=None) -> int: diff --git a/superset/dao/__init__.py b/superset/dao/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/superset/dao/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/superset/dao/base.py b/superset/dao/base.py new file mode 100644 index 000000000000..0852748f54c6 --- /dev/null +++ b/superset/dao/base.py @@ -0,0 +1,71 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from typing import Dict, Optional + +from flask_appbuilder.models.sqla import Model +from sqlalchemy.exc import SQLAlchemyError + +from superset.commands.exceptions import ( + CreateFailedError, + DeleteFailedError, + UpdateFailedError, +) +from superset.extensions import db + + +def generic_create(model_cls: Model, properties: Dict, commit=True) -> Optional[Model]: + """ + Instantiate model_cls, set properties on it and add it to the session + """ + model = model_cls() + for key, value in properties.items(): + setattr(model, key, value) + try: + db.session.add(model) + if commit: + db.session.commit() + except SQLAlchemyError as e: # pragma: no cover + db.session.rollback() + raise CreateFailedError(exception=e) + return model + + +def generic_update(model: Model, properties: Dict, commit=True) -> Optional[Model]: + """ + Set properties on an existing model and merge it into the session + """ + for key, value in properties.items(): + setattr(model, key, value) + try: + db.session.merge(model) + if commit: + db.session.commit() + except SQLAlchemyError as e: # pragma: no cover + db.session.rollback() + raise UpdateFailedError(exception=e) + return model + + +def generic_delete(model: Model, commit=True): + try: + db.session.delete(model) + if commit: + db.session.commit() + except SQLAlchemyError as e: # pragma: no cover + db.session.rollback() + raise DeleteFailedError(exception=e) + return model diff --git a/superset/datasets/api.py b/superset/datasets/api.py index 64821dbfd63c..bbf3beb530c3 100644 --- a/superset/datasets/api.py +++ b/superset/datasets/api.py @@ -30,8 +30,10 @@ DatasetForbiddenError, DatasetInvalidError, DatasetNotFoundError, + DatasetRefreshFailedError, DatasetUpdateFailedError, ) +from superset.datasets.commands.refresh import RefreshDatasetCommand from superset.datasets.commands.update import UpdateDatasetCommand from superset.datasets.schemas import DatasetPostSchema, DatasetPutSchema from superset.views.base import DatasourceFilter @@ -49,10 +51,12 @@ class DatasetRestApi(BaseSupersetModelRestApi): allow_browser_login = 
True class_permission_name = "TableModelView" - include_route_methods = RouteMethod.REST_MODEL_VIEW_CRUD_SET | {RouteMethod.RELATED} + include_route_methods = ( + RouteMethod.REST_MODEL_VIEW_CRUD_SET | {RouteMethod.RELATED} | {"refresh"} + ) list_columns = [ - "database_name", + "database.database_name", "changed_by.username", "changed_on", "table_name", @@ -75,6 +79,8 @@ class DatasetRestApi(BaseSupersetModelRestApi): "template_params", "owners.id", "owners.username", + "columns", + "metrics", ] add_model_schema = DatasetPostSchema() edit_model_schema = DatasetPutSchema() @@ -93,6 +99,8 @@ class DatasetRestApi(BaseSupersetModelRestApi): "is_sqllab_view", "template_params", "owners", + "columns", + "metrics", ] openapi_spec_tag = "Datasets" @@ -263,3 +271,49 @@ def delete(self, pk: int) -> Response: # pylint: disable=arguments-differ except DatasetDeleteFailedError as e: logger.error(f"Error deleting model {self.__class__.__name__}: {e}") return self.response_422(message=str(e)) + + @expose("/<pk>/refresh", methods=["PUT"]) + @protect() + @safe + def refresh(self, pk: int) -> Response: # pylint: disable=invalid-name + """Refresh a Dataset + --- + put: + description: >- + Refreshes and updates columns of a dataset + parameters: + - in: path + schema: + type: integer + name: pk + responses: + 200: + description: Dataset refreshed + content: + application/json: + schema: + type: object + properties: + message: + type: string + 401: + $ref: '#/components/responses/401' + 403: + $ref: '#/components/responses/403' + 404: + $ref: '#/components/responses/404' + 422: + $ref: '#/components/responses/422' + 500: + $ref: '#/components/responses/500' + """ + try: + RefreshDatasetCommand(g.user, pk).run() + return self.response(200, message="OK") + except DatasetNotFoundError: + return self.response_404() + except DatasetForbiddenError: + return self.response_403() + except DatasetRefreshFailedError as e: + logger.error(f"Error refreshing dataset {self.__class__.__name__}: {e}") + 
return self.response_422(message=str(e)) diff --git a/superset/datasets/commands/create.py b/superset/datasets/commands/create.py index 344770b951d0..d74cc3685766 100644 --- a/superset/datasets/commands/create.py +++ b/superset/datasets/commands/create.py @@ -19,6 +19,7 @@ from flask_appbuilder.security.sqla.models import User from marshmallow import ValidationError +from sqlalchemy.exc import SQLAlchemyError from superset.commands.base import BaseCommand from superset.commands.exceptions import CreateFailedError @@ -31,6 +32,7 @@ TableNotFoundValidationError, ) from superset.datasets.dao import DatasetDAO +from superset.extensions import db, security_manager logger = logging.getLogger(__name__) @@ -43,9 +45,23 @@ def __init__(self, user: User, data: Dict): def run(self): self.validate() try: - dataset = DatasetDAO.create(self._properties) - except CreateFailedError as e: - logger.exception(e.exception) + # Creates SqlaTable (Dataset) + dataset = DatasetDAO.create(self._properties, commit=False) + # Updates columns and metrics from the dataset + dataset.fetch_metadata(commit=False) + # Add datasource access permission + security_manager.add_permission_view_menu( + "datasource_access", dataset.get_perm() + ) + # Add schema access permission if exists + if dataset.schema: + security_manager.add_permission_view_menu( + "schema_access", dataset.schema_perm + ) + db.session.commit() + except (SQLAlchemyError, CreateFailedError) as e: + logger.exception(e) + db.session.rollback() raise DatasetCreateFailedError() return dataset diff --git a/superset/datasets/commands/delete.py b/superset/datasets/commands/delete.py index d61c56a0e06d..d99a84a9a85f 100644 --- a/superset/datasets/commands/delete.py +++ b/superset/datasets/commands/delete.py @@ -18,6 +18,7 @@ from typing import Optional from flask_appbuilder.security.sqla.models import User +from sqlalchemy.exc import SQLAlchemyError from superset.commands.base import BaseCommand from superset.commands.exceptions import 
DeleteFailedError @@ -29,6 +30,7 @@ ) from superset.datasets.dao import DatasetDAO from superset.exceptions import SupersetSecurityException +from superset.extensions import db, security_manager from superset.views.base import check_ownership logger = logging.getLogger(__name__) @@ -43,9 +45,14 @@ def __init__(self, user: User, model_id: int): def run(self): self.validate() try: - dataset = DatasetDAO.delete(self._model) - except DeleteFailedError as e: - logger.exception(e.exception) + dataset = DatasetDAO.delete(self._model, commit=False) + security_manager.del_permission_view_menu( + "datasource_access", dataset.get_perm() + ) + db.session.commit() + except (SQLAlchemyError, DeleteFailedError) as e: + logger.exception(e) + db.session.rollback() raise DatasetDeleteFailedError() return dataset diff --git a/superset/datasets/commands/exceptions.py b/superset/datasets/commands/exceptions.py index a6d0ed7deda3..3034a4ecd1ce 100644 --- a/superset/datasets/commands/exceptions.py +++ b/superset/datasets/commands/exceptions.py @@ -57,6 +57,32 @@ def __init__(self, table_name: str): ) + +class DatasetUpdateColumnNotFoundValidationError(ValidationError): + """ + Marshmallow validation error when dataset column for update does not exist + """ + + def __init__(self): + # One call only: a nested call would pass its None result as the message + super().__init__( + _("One or more columns do not exist"), + field_names=["columns"], + ) + + +class DatasetUpdateMetricNotFoundValidationError(ValidationError): + """ + Marshmallow validation error when dataset metric for update does not exist + """ + + def __init__(self): + # One call only: a nested call would pass its None result as the message + super().__init__( + _("One or more metrics do not exist"), + field_names=["metrics"], + ) + + class TableNotFoundValidationError(ValidationError): """ Marshmallow validation error when a table does not exist on the database @@ -99,5 +125,9 @@ class DatasetDeleteFailedError(DeleteFailedError): message = _("Dataset could not be deleted.") +class DatasetRefreshFailedError(UpdateFailedError): + message 
= _("Dataset could not be updated.") + + class DatasetForbiddenError(ForbiddenError): message = _("Changing this dataset is forbidden") diff --git a/superset/datasets/commands/refresh.py b/superset/datasets/commands/refresh.py new file mode 100644 index 000000000000..a10c9292789e --- /dev/null +++ b/superset/datasets/commands/refresh.py @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import logging +from typing import Optional + +from flask_appbuilder.security.sqla.models import User + +from superset.commands.base import BaseCommand +from superset.connectors.sqla.models import SqlaTable +from superset.datasets.commands.exceptions import ( + DatasetForbiddenError, + DatasetNotFoundError, + DatasetRefreshFailedError, +) +from superset.datasets.dao import DatasetDAO +from superset.exceptions import SupersetSecurityException +from superset.views.base import check_ownership + +logger = logging.getLogger(__name__) + + +class RefreshDatasetCommand(BaseCommand): + def __init__(self, user: User, model_id: int): + self._actor = user + self._model_id = model_id + self._model: Optional[SqlaTable] = None + + def run(self): + self.validate() + try: + # Updates columns and metrics and commits them (default commit=True) + self._model.fetch_metadata() + except Exception as e: + logger.exception(e) + raise DatasetRefreshFailedError() + return self._model + + def validate(self) -> None: + # Validate/populate model exists + self._model = DatasetDAO.find_by_id(self._model_id) + if not self._model: + raise DatasetNotFoundError() + # Check ownership + try: + check_ownership(self._model) + except SupersetSecurityException: + raise DatasetForbiddenError() diff --git a/superset/datasets/commands/update.py b/superset/datasets/commands/update.py index b3deeab2ebd9..3c7aba190548 100644 --- a/superset/datasets/commands/update.py +++ b/superset/datasets/commands/update.py @@ -30,7 +30,9 @@ DatasetForbiddenError, DatasetInvalidError, DatasetNotFoundError, + DatasetUpdateColumnNotFoundValidationError, DatasetUpdateFailedError, + DatasetUpdateMetricNotFoundValidationError, ) from superset.datasets.dao import DatasetDAO from superset.exceptions import SupersetSecurityException @@ -84,6 +86,21 @@ def validate(self) -> None: self._properties["owners"] = owners except ValidationError as e: exceptions.append(e) + + # Validate if columns for update exist + columns = 
self._properties.get("columns") + if columns: + columns_ids = [column.get("id") for column in columns if "id" in column] + if not DatasetDAO.validate_columns_exist(self._model_id, columns_ids): + exceptions.append(DatasetUpdateColumnNotFoundValidationError()) + + # Validate if metrics for update exist + metrics = self._properties.get("metrics") + if metrics: + metrics_ids = [metric.get("id") for metric in metrics if "id" in metric] + if not DatasetDAO.validate_metrics_exist(self._model_id, metrics_ids): + exceptions.append(DatasetUpdateMetricNotFoundValidationError()) + if exceptions: exception = DatasetInvalidError() exception.add_list(exceptions) diff --git a/superset/datasets/dao.py b/superset/datasets/dao.py index 7e08ce8c0c99..916a1230ac61 100644 --- a/superset/datasets/dao.py +++ b/superset/datasets/dao.py @@ -15,18 +15,14 @@ # specific language governing permissions and limitations # under the License. import logging -from typing import Dict, Optional +from typing import Dict, List, Optional from flask import current_app from flask_appbuilder.models.sqla.interface import SQLAInterface from sqlalchemy.exc import SQLAlchemyError -from superset.commands.exceptions import ( - CreateFailedError, - DeleteFailedError, - UpdateFailedError, -) -from superset.connectors.sqla.models import SqlaTable +from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn +from superset.dao.base import generic_create, generic_delete, generic_update from superset.extensions import db from superset.models.core import Database from superset.views.base import DatasourceFilter @@ -78,6 +74,24 @@ def validate_update_uniqueness( ) return not db.session.query(dataset_query.exists()).scalar() + @staticmethod + def validate_columns_exist(dataset_id: int, columns_ids: List[int]) -> bool: + dataset_query = ( + db.session.query(TableColumn.id).filter( + TableColumn.table_id == dataset_id, TableColumn.id.in_(columns_ids) + ) + ).all() + return len(columns_ids) == 
len(dataset_query) + + @staticmethod + def validate_metrics_exist(dataset_id: int, metrics_ids: List[int]) -> bool: + dataset_query = ( + db.session.query(SqlMetric.id).filter( + SqlMetric.table_id == dataset_id, SqlMetric.id.in_(metrics_ids) + ) + ).all() + return len(metrics_ids) == len(dataset_query) + @staticmethod def find_by_id(model_id: int) -> SqlaTable: data_model = SQLAInterface(SqlaTable, db.session) @@ -87,39 +101,67 @@ def find_by_id(model_id: int) -> SqlaTable: @staticmethod def create(properties: Dict, commit=True) -> Optional[SqlaTable]: - model = SqlaTable() - for key, value in properties.items(): - setattr(model, key, value) - try: - db.session.add(model) - if commit: - db.session.commit() - except SQLAlchemyError as e: # pragma: no cover - db.session.rollback() - raise CreateFailedError(exception=e) - return model + return generic_create(SqlaTable, properties, commit=commit) @staticmethod def update(model: SqlaTable, properties: Dict, commit=True) -> Optional[SqlaTable]: - for key, value in properties.items(): - setattr(model, key, value) - try: - db.session.merge(model) - if commit: - db.session.commit() - except SQLAlchemyError as e: # pragma: no cover - db.session.rollback() - raise UpdateFailedError(exception=e) - return model + """ + Updates a Dataset model on the metadata DB + """ + if "columns" in properties: + new_columns = list() + for column in properties.get("columns", []): + if column.get("id"): + column_obj = db.session.query(TableColumn).get(column.get("id")) + column_obj = DatasetDAO.update_column( + column_obj, column, commit=commit + ) + else: + column_obj = DatasetDAO.create_column(column, commit=commit) + new_columns.append(column_obj) + properties["columns"] = new_columns + + if "metrics" in properties: + new_metrics = list() + for metric in properties.get("metrics", []): + if metric.get("id"): + metric_obj = db.session.query(SqlMetric).get(metric.get("id")) + metric_obj = DatasetDAO.update_metric( + metric_obj, metric, 
commit=commit + ) + else: + metric_obj = DatasetDAO.create_metric(metric, commit=commit) + new_metrics.append(metric_obj) + properties["metrics"] = new_metrics + + return generic_update(model, properties, commit=commit) @staticmethod def delete(model: SqlaTable, commit=True): - try: - db.session.delete(model) - if commit: - db.session.commit() - except SQLAlchemyError as e: # pragma: no cover - logger.error(f"Failed to delete dataset: {e}") - db.session.rollback() - raise DeleteFailedError(exception=e) - return model + return generic_delete(model, commit=commit) + + @staticmethod + def update_column( + model: TableColumn, properties: Dict, commit=True + ) -> Optional[TableColumn]: + return generic_update(model, properties, commit=commit) + + @staticmethod + def create_column(properties: Dict, commit=True) -> Optional[TableColumn]: + """ + Creates a Dataset column (TableColumn) on the metadata DB + """ + return generic_create(TableColumn, properties, commit=commit) + + @staticmethod + def update_metric( + model: SqlMetric, properties: Dict, commit=True + ) -> Optional[SqlMetric]: + return generic_update(model, properties, commit=commit) + + @staticmethod + def create_metric(properties: Dict, commit=True) -> Optional[SqlMetric]: + """ + Creates a Dataset metric (SqlMetric) on the metadata DB + """ + return generic_create(SqlMetric, properties, commit=commit) diff --git a/superset/datasets/schemas.py b/superset/datasets/schemas.py index 370550da619c..6d03230adf23 100644 --- a/superset/datasets/schemas.py +++ b/superset/datasets/schemas.py @@ -14,11 +14,54 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
+import re -from marshmallow import fields, Schema +from flask_babel import lazy_gettext as _ +from marshmallow import fields, Schema, ValidationError from marshmallow.validate import Length +def validate_python_date_format(value): + regex = re.compile( + r""" + ^( + epoch_s|epoch_ms| + (?P%Y(-%m(-%d)?)?)([\sT](?P