Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
# Things to lazy import in form 'name': 'path.to.module'
# Accessing `airflow.<name>` triggers an import of the mapped module only on
# first use, keeping `import airflow` itself cheap.
__lazy_imports = {
    'DAG': 'airflow.models.dag',
    'Dataset': 'airflow.models.dataset',
    'XComArg': 'airflow.models.xcom_arg',
    'AirflowException': 'airflow.exceptions',
}
Expand Down
69 changes: 69 additions & 0 deletions airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add Dataset model

Revision ID: 0038cd0c28b4
Revises: 44b7034f6bdc
Create Date: 2022-06-22 14:37:20.880672

"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy import Integer, String

from airflow.migrations.db_types import TIMESTAMP
from airflow.utils.sqlalchemy import ExtendedJSON

revision = '0038cd0c28b4'
down_revision = '44b7034f6bdc'
branch_labels = None
depends_on = None
airflow_version = '2.4.0'


def upgrade():
    """Apply Add Dataset model"""
    # Build the uri column separately for readability: a 3000-char string,
    # with a MySQL-specific variant using a case-sensitive latin1 collation.
    uri_column = sa.Column(
        'uri',
        String(length=3000).with_variant(
            String(
                length=3000,
                # latin1 allows for more indexed length in mysql
                # and this field should only be ascii chars
                collation='latin1_general_cs',
            ),
            'mysql',
        ),
        nullable=False,
    )
    op.create_table(
        'dataset',
        sa.Column('id', Integer, primary_key=True, autoincrement=True),
        uri_column,
        sa.Column('extra', ExtendedJSON, nullable=True),
        sa.Column('created_at', TIMESTAMP, nullable=False),
        sa.Column('updated_at', TIMESTAMP, nullable=False),
        sqlite_autoincrement=True,  # ensures PK values not reused
    )
    # Uniqueness is enforced through a named index so it can be referenced
    # explicitly in later migrations.
    op.create_index('idx_uri_unique', 'dataset', ['uri'], unique=True)


def downgrade():
    """Unapply Add Dataset model"""
    # Dropping the table also removes its index (idx_uri_unique).
    table_name = 'dataset'
    op.drop_table(table_name)
2 changes: 2 additions & 0 deletions airflow/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from airflow.models.dagpickle import DagPickle
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning
from airflow.models.dataset import Dataset
from airflow.models.db_callback_request import DbCallbackRequest
from airflow.models.errors import ImportError
from airflow.models.log import Log
Expand Down Expand Up @@ -58,6 +59,7 @@
"DagPickle",
"DagRun",
"DagTag",
"Dataset",
"DbCallbackRequest",
"ImportError",
"Log",
Expand Down
75 changes: 75 additions & 0 deletions airflow/models/dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from urllib.parse import urlparse

from sqlalchemy import Column, Index, Integer, String

from airflow.models.base import Base
from airflow.utils import timezone
from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime


class Dataset(Base):
    """
    A table to store datasets.

    :param uri: a string that uniquely identifies the dataset; must be
        ascii-only, and must not use the reserved ``airflow`` scheme
    :param extra: JSON field for arbitrary extra info
    """

    id = Column(Integer, primary_key=True, autoincrement=True)
    uri = Column(
        String(length=3000).with_variant(
            String(
                length=3000,
                # latin1 allows for more indexed length in mysql
                # and this field should only be ascii chars
                collation='latin1_general_cs',
            ),
            'mysql',
        ),
        nullable=False,
    )
    extra = Column(ExtendedJSON, nullable=True)
    created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
    updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)

    __tablename__ = "dataset"
    __table_args__ = (
        # Uniqueness of uri is enforced via this named index (rather than
        # unique=True on the column) so the constraint has a stable name
        # that migrations can reference.
        Index('idx_uri_unique', uri, unique=True),
        {'sqlite_autoincrement': True},  # ensures PK values not reused
    )

    def __init__(self, uri: str, **kwargs):
        # The uri must be ascii so it fits the latin1-collated MySQL column
        # (see the `uri` Column definition above).
        try:
            uri.encode('ascii')
        except UnicodeEncodeError:
            raise ValueError('URI must be ascii')
        parsed = urlparse(uri)
        if parsed.scheme and parsed.scheme.lower() == 'airflow':
            raise ValueError("Scheme `airflow` is reserved.")
        super().__init__(uri=uri, **kwargs)

    def __eq__(self, other):
        # Datasets are identified solely by uri. Return NotImplemented for
        # foreign types instead of raising AttributeError on objects that
        # have no `uri`, so Python can fall back to the other operand.
        if isinstance(other, self.__class__):
            return self.uri == other.uri
        return NotImplemented

    def __hash__(self):
        # Hash must agree with __eq__, which compares only uri.
        return hash(self.uri)

    def __repr__(self):
        return f"{self.__class__.__name__}(uri={self.uri!r}, extra={self.extra!r})"
4 changes: 3 additions & 1 deletion docs/apache-airflow/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=================================+===================+===================+==============================================================+
| ``44b7034f6bdc`` (head) | ``424117c37d18`` | ``2.4.0`` | compare types between ORM and DB |
| ``0038cd0c28b4`` (head) | ``44b7034f6bdc`` | ``2.4.0`` | Add Dataset model |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``44b7034f6bdc`` | ``424117c37d18`` | ``2.4.0`` | compare types between ORM and DB |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``424117c37d18`` | ``f5fcbda3e651`` | ``2.4.0`` | Add DagWarning model |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
Expand Down
3 changes: 3 additions & 0 deletions tests/utils/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ def test_database_schema_and_sqlalchemy_model_are_in_sync(self):
# Ignore flask-session table/index
lambda t: (t[0] == 'remove_table' and t[1].name == 'session'),
lambda t: (t[0] == 'remove_index' and t[1].name == 'session_id'),
# sqlite sequence is used for autoincrementing columns created with `sqlite_autoincrement` option
lambda t: (t[0] == 'remove_table' and t[1].name == 'sqlite_sequence'),
]

for ignore in ignores:
diff = [d for d in diff if not ignore(d)]

Expand Down
3 changes: 2 additions & 1 deletion tests/utils/test_db_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,9 @@ def test_no_models_missing(self):
all_models.update({class_.__tablename__: class_})
exclusion_list = {
'variable', # leave alone
'dataset', # not good way to know if "stale"
'trigger', # self-maintaining
'task_map', # TODO: add datetime column to TaskMap so we can include it here
'task_map', # keys to TI, so no need
'serialized_dag', # handled through FK to Dag
'log_template', # not a significant source of data; age not indicative of staleness
'dag_tag', # not a significant source of data; age not indicative of staleness,
Expand Down