diff --git a/airflow/__init__.py b/airflow/__init__.py
index cbbb03dd1be29..826ffa80f2640 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -57,6 +57,7 @@
 # Things to lazy import in form 'name': 'path.to.module'
 __lazy_imports = {
     'DAG': 'airflow.models.dag',
+    'Dataset': 'airflow.models.dataset',
     'XComArg': 'airflow.models.xcom_arg',
     'AirflowException': 'airflow.exceptions',
 }
diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
new file mode 100644
index 0000000000000..838f8780b0e70
--- /dev/null
+++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add Dataset model
+
+Revision ID: 0038cd0c28b4
+Revises: 44b7034f6bdc
+Create Date: 2022-06-22 14:37:20.880672
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy import Integer, String
+
+from airflow.migrations.db_types import TIMESTAMP
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+revision = '0038cd0c28b4'
+down_revision = '44b7034f6bdc'
+branch_labels = None
+depends_on = None
+airflow_version = '2.4.0'
+
+
+def upgrade():
+    """Apply Add Dataset model"""
+    op.create_table(
+        'dataset',
+        sa.Column('id', Integer, primary_key=True, autoincrement=True),
+        sa.Column(
+            'uri',
+            String(length=3000).with_variant(
+                String(
+                    length=3000,
+                    # latin1 allows for more indexed length in mysql
+                    # and this field should only be ascii chars
+                    collation='latin1_general_cs',
+                ),
+                'mysql',
+            ),
+            nullable=False,
+        ),
+        sa.Column('extra', ExtendedJSON, nullable=True),
+        sa.Column('created_at', TIMESTAMP, nullable=False),
+        sa.Column('updated_at', TIMESTAMP, nullable=False),
+        sqlite_autoincrement=True,  # ensures PK values not reused
+    )
+    op.create_index('idx_uri_unique', 'dataset', ['uri'], unique=True)
+
+
+def downgrade():
+    """Unapply Add Dataset model"""
+    op.drop_table('dataset')
diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index 4cb19eecdeda2..ecf14576eef6d 100644
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -26,6 +26,7 @@
 from airflow.models.dagpickle import DagPickle
 from airflow.models.dagrun import DagRun
 from airflow.models.dagwarning import DagWarning
+from airflow.models.dataset import Dataset
 from airflow.models.db_callback_request import DbCallbackRequest
 from airflow.models.errors import ImportError
 from airflow.models.log import Log
@@ -58,6 +59,7 @@
     "DagPickle",
     "DagRun",
     "DagTag",
+    "Dataset",
     "DbCallbackRequest",
     "ImportError",
     "Log",
diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
new file mode 100644
index 0000000000000..49f5dfd1f66be
--- /dev/null
+++ b/airflow/models/dataset.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from urllib.parse import urlparse
+
+from sqlalchemy import Column, Index, Integer, String
+
+from airflow.models.base import Base
+from airflow.utils import timezone
+from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
+
+
+class Dataset(Base):
+    """
+    A table to store datasets.
+
+    :param uri: a string that uniquely identifies the dataset
+    :param extra: JSON field for arbitrary extra info
+    """
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    uri = Column(
+        String(length=3000).with_variant(
+            String(
+                length=3000,
+                # latin1 allows for more indexed length in mysql
+                # and this field should only be ascii chars
+                collation='latin1_general_cs',
+            ),
+            'mysql',
+        ),
+        nullable=False,
+    )
+    extra = Column(ExtendedJSON, nullable=True)
+    created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
+    updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
+
+    __tablename__ = "dataset"
+    __table_args__ = (
+        Index('idx_uri_unique', uri, unique=True),
+        {'sqlite_autoincrement': True},  # ensures PK values not reused
+    )
+
+    def __init__(self, uri: str, **kwargs):
+        try:
+            uri.encode('ascii')
+        except UnicodeEncodeError:
+            raise ValueError('URI must be ascii')
+        parsed = urlparse(uri)
+        if parsed.scheme and parsed.scheme.lower() == 'airflow':
+            raise ValueError("Scheme `airflow` is reserved.")
+        super().__init__(uri=uri, **kwargs)
+
+    def __eq__(self, other):
+        return self.uri == other.uri
+
+    def __hash__(self):
+        return hash(self.uri)
+
+    def __repr__(self):
+        return f"{self.__class__.__name__}(uri={self.uri!r}, extra={self.extra!r})"
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index b45e6c2de79cd..e5a5b13fa6c96 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -27,7 +27,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | Revision ID                     | Revises ID        | Airflow Version   | Description                                                  |
 +=================================+===================+===================+==============================================================+
-| ``44b7034f6bdc`` (head)         | ``424117c37d18``  | ``2.4.0``         | compare types between ORM and DB                             |
+| ``0038cd0c28b4`` (head)         | ``44b7034f6bdc``  | ``2.4.0``         | Add Dataset model                                            |
++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
+| ``44b7034f6bdc``                | ``424117c37d18``  | ``2.4.0``         | compare types between ORM and DB                             |
 +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | ``424117c37d18``                | ``f5fcbda3e651``  | ``2.4.0``         | Add DagWarning model                                         |
 +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
diff --git a/tests/utils/test_db.py b/tests/utils/test_db.py
index 60511f242862c..b6fe56b2d7795 100644
--- a/tests/utils/test_db.py
+++ b/tests/utils/test_db.py
@@ -76,7 +76,10 @@ def test_database_schema_and_sqlalchemy_model_are_in_sync(self):
             # Ignore flask-session table/index
             lambda t: (t[0] == 'remove_table' and t[1].name == 'session'),
             lambda t: (t[0] == 'remove_index' and t[1].name == 'session_id'),
+            # sqlite sequence is used for autoincrementing columns created with `sqlite_autoincrement` option
+            lambda t: (t[0] == 'remove_table' and t[1].name == 'sqlite_sequence'),
         ]
+
         for ignore in ignores:
             diff = [d for d in diff if not ignore(d)]
diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py
index 430e21215d811..5c0554591902d 100644
--- a/tests/utils/test_db_cleanup.py
+++ b/tests/utils/test_db_cleanup.py
@@ -252,8 +252,9 @@ def test_no_models_missing(self):
         all_models.update({class_.__tablename__: class_})
         exclusion_list = {
             'variable',  # leave alone
+            'dataset',  # no good way to know if "stale"
             'trigger',  # self-maintaining
-            'task_map',  # TODO: add datetime column to TaskMap so we can include it here
+            'task_map',  # keys to TI, so no need
             'serialized_dag',  # handled through FK to Dag
             'log_template',  # not a significant source of data; age not indicative of staleness
             'dag_tag',  # not a significant source of data; age not indicative of staleness,
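
Reviewer note: as a quick illustration of the validation rules the new `airflow/models/dataset.py` enforces, here is a minimal sketch of constructing `Dataset` objects. The URIs and the `extra` payload below are made-up examples, not values used anywhere in this change; the behavior shown follows directly from `Dataset.__init__`, `__eq__`, and `__hash__` above.

    from airflow.models.dataset import Dataset

    # Any ascii URI with a non-reserved scheme is accepted as-is.
    ds = Dataset(uri='s3://my-bucket/daily-export.csv', extra={'team': 'data-eng'})
    print(ds)  # Dataset(uri='s3://my-bucket/daily-export.csv', extra={'team': 'data-eng'})

    # Equality and hashing are keyed on the URI alone, so two instances
    # with the same URI compare equal and collapse in a set.
    assert ds == Dataset(uri='s3://my-bucket/daily-export.csv')
    assert len({ds, Dataset(uri='s3://my-bucket/daily-export.csv')}) == 1

    # Non-ascii URIs are rejected up front, matching the latin1
    # collation used for the MySQL column variant.
    try:
        Dataset(uri='s3://bücket/obj')
    except ValueError as err:
        print(err)  # URI must be ascii

    # The `airflow` scheme is reserved for internal use.
    try:
        Dataset(uri='airflow://anything')
    except ValueError as err:
        print(err)  # Scheme `airflow` is reserved.

One note on the column definition: the 3000-character `uri` column uses the `latin1_general_cs` collation only on MySQL so that the unique index `idx_uri_unique` stays within MySQL's index length limits (one byte per character rather than up to four under utf8mb4), which is safe because the constructor rejects anything outside ascii anyway.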